Go Gin SSE 多协程编程实践 0
文章除了会介绍 SSE 基础示例外,也会讲解一些 go 多协程开发,代码构建的技术。
什么是 SSE
Server Sent Events (SSE) 是一种允许服务器向客户端推送更新的技术。与传统的请求/响应模式不同,SSE 通过建立一个持久的连接,允许服务器实时地将新的数据推送到客户端。
SSE 基于 HTTP 协议,所以在返回的响应类型上遵循标准格式,但是具有特殊的 Header。
- Content-Type: 这个标头的值将会被设置为 text/event-stream,表明这是一个 SSE 连接。
- Cache-Control: 这个标头将会被设置为 no-cache,来防止中间服务器缓存这个连接。
- Connection: 这个标头将一般被设置为 keep-alive,保证连接的持久性。
同时,数据本身也是特殊的文本格式,有着一定的模式,这里不再关心,具体细节将被类库实现,因为文章要讲的是如何基于 SSE 建立应用,而不是如何建立 SSE 类库。
使用
Go 的 Gin 框架已经实现了 SSE。这是代码片段。
package main
import (
"github.com/gin-gonic/gin"
"io"
"time"
)
func main() {
r := gin.Default()
r.GET("/sse", func(c *gin.Context) {
ticker := time.NewTicker(time.Second)
c.Stream(func(w io.Writer) bool {
_, ok := <-ticker.C
if ok {
c.SSEvent("message", "ping")
}
return ok
})
})
_ = r.Run()
}
启动这个最小示例,用你喜欢的方式请求一下这个端点,可以看到每秒一次的 ping 文本消息。
实践
以业务需求为例,客户端需要接收某种提示后弹窗确认,这个提示信息来自于其他用户。
这里有两种消息,提示 H 和确认 C。一个身份标识 UID 和一个消息标识 MID。
开始改造代码。
增加消息结构
type MsgH struct {
MID MID `json:"id"`
}
type MsgC struct {
MID MID `json:"id"`
}
type MID string
type UID string
增加消息路由
r.POST("/H", func(context *gin.Context) {
// 接受H类型信息MsgH
})
r.POST("/C", func(context *gin.Context) {
// 接受C类型信息MsgC
})
传递数据
显然我们需要跨协程传递数据,将两个接受消息 handler 协程的数据发送到建立 sse 长连接的协程。在 go 的世界我们用 channel 来实现。
在使用 channel 时,我们总应该思考这个 channel 的性质。那什么是 channel 的性质?
也就是发送者和接收者的数量。mpsc 多生产者单消费者,这是 go 的 channel。所以在这个场景里,我们很容易想到,两个 handler 就是生产者,sse 路由是消费者(是这样吗?)
var stream = make(chan any, 2)
r.GET("/sse", func(c *gin.Context) {
c.SSEvent("message", "hello world")
c.Writer.Flush()
c.Stream(func(w io.Writer) bool {
msg, ok := <-stream
if ok {
c.SSEvent("message", msg)
c.Writer.Flush()
}
return ok
})
})
r.POST("/H", func(context *gin.Context) {
var msg = new(MsgH)
err := context.BindJSON(&msg)
if err != nil {
return
}
stream <- msg
})
r.POST("/C", func(context *gin.Context) {
var msg = new(MsgC)
err := context.BindJSON(&msg)
if err != nil {
return
}
stream <- msg
})
正确分析数据流
当只有一个连接时,这个示例能够工作,发送的 C 消息和 H 消息都会出现那个唯一的 SSE 流中。
而当另一个连接建立后,你会发现你发送的 C 和 H 会随机出现在两个流中。没错,在上面的代码中,所有建立的 sse 流(handler 闭包)都引用了同一个 channel ,每个协程都在尝试从唯一的管道接受数据,而我们的管道是 mpsc,消息只会被其中一个协程读到,并不是广播的。对于上面的代码,如果我们希望客户端们发的信息广播到所有 sse 流,那就需要一个多生产者多消费者管道。Go 没有这个东西。一般的解决办法是为每个 sse 流建立一个句柄(actor 模型的讲法,如果把每个 sse 流视作 actor,那么给这个 actor 发送消息的管道一般叫 handle)。符合直觉的改法是在创建流时同时创建消息管道(创建 actor 同时创建 handle)。
r.GET("/sse", func(c *gin.Context) {
c.SSEvent("message", "hello world")
//不再从外部引入
var handle = make(chan any,1)
c.Writer.Flush()
c.Stream(func(w io.Writer) bool {
msg, ok := <-handle
if ok {
c.SSEvent("message", msg)
c.Writer.Flush()
}
return ok
})
})
重新建模
好现在每个 sse 流都有了自己句柄,我们可以为每个流发送消息了。什么,句柄还在 sse 的协程里,没法发。顺着这个线索,我们可以想象,怎么把 handle 转移出来。当然还是 channel
var handles = make(chan chan any, 8)
r.GET("/sse", func(c *gin.Context) {
c.SSEvent("message", "hello world")
c.Writer.Flush()
var handle = make(chan any, 1)
handles <- handle
c.Stream(func(w io.Writer) bool {
msg, ok := <-handle
if ok {
c.SSEvent("message", msg)
c.Writer.Flush()
}
return ok
})
})
让 Handle 携带元信息
现在有了多个 handle 的生产者,我们还需要一个消费者。那么谁来消费这个 handle 流?
可以想象,我们要广播消息,所以需要一个东西存这些 sse 的 handle。我们有多个 UID,可能每个 UID 会建立多个 sse 流。而每个 sse 流又拥有自己的生命周期,是独特的。
所以新增一个类型,让 handle 额外携带 sse 流的元信息。
type MID string
type UID string
type SID string //-> 每个sse流自己编号
// SSE句柄
type SSEHandle struct {
UID UID
SID SID
handle chan any
}
分离协程作为 actor
想从 handles 读数据还要另一个协程。因为 main 将为 http server 阻塞,等待协程数据时也是阻塞的。所以从主线程分离一个协程去读 handles 发来的 handle 信息,这个协程内部保存了一个可变状态,就是所有 sse 流的信息。这个协程成为了 sse handle 管理者。这也是一个 actor。而 handles 则是它的 handle。所有的 sse 协程会向它发送自己的 handle
var handles = make(chan *SSEHandle, 8) //-> 每个SSE handle 都带了元信息
go func() {
var handleContainer = make(map[UID]map[SID]chan any, 8)
for handle := range handles {
if _, ok := handleContainer[handle.UID]; !ok {
handleContainer[handle.UID] = make(map[SID]chan any)
}
handleContainer[handle.UID][handle.SID] = handle.handle
}
}
暂停一下
Go 虽然是静态类型语言,但是可以用 any 来传递多种类型数据。因此我们可以预见,这个 handles 还可以传递其他类型数据。那么还有哪些数据会传递?这些数据进入这个协程会被怎么处理。这些我们下一章再来实现。
截至目前的所有代码