Go Gin SSE 多协程编程实践 1
回顾
在上文中,Go Gin SSE 多协程编程实践 0 CodeSection0,我们把每个 sse 发送到了一个单独的协程进行处理。同时也把两个 http handler 接收到的消息发送到了 stream。
继续改进代码
分发消息
分离的协程现有两个 handle,一个用于接收新的 sse 流,一个用于接收 handler 的消息,我们使用 select 从多个 channel 读取数据。
var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)
go func() {
var handleContainer = make(map[UID]map[SID]chan any)
for {
select {
case handle, ok := <-handles:
if !ok {
return
}
if _, ok := handleContainer[handle.UID]; !ok {
handleContainer[handle.UID] = make(map[SID]chan any)
}
handleContainer[handle.UID][handle.SID] = handle.handle
case msg, ok := <-stream:
//todo 进行发送逻辑
}
}
}()
使用 for select 随机从两个 channel 接收数据。要么新增 sse handle,要么对 sse 发送消息,这两个路径永远不会同时发生。协程内部的 handle 容器是可变状态,传统的多线程会把它放入临界区,再对其修改。而 go 则是将持有它变更的临界区(channel 传递的消息)发送到所在的协程(一种抽象理解),因此对它的读写总是通过 channel 传递的指令顺序访问进行的。
补全发送逻辑,这里直接用广播的方式。
var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)
go func() {
var handleContainer = make(map[UID]map[SID]chan any)
for {
select {
case handle, ok := <-handles:
if !ok {
return
}
if _, ok := handleContainer[handle.UID]; !ok {
handleContainer[handle.UID] = make(map[SID]chan any)
}
handleContainer[handle.UID][handle.SID] = handle.handle
case msg, ok := <-stream:
if !ok {
return
}
for _, m := range handleContainer {
for _, handle := range m {
handle <- msg
}
}
}
}
}()
现在每条消息都会发到所有流。
管理生命周期
现在的代码,会对所有 sse 创建管道来传递消息,但是当 sse 关闭时,我们的 sse 句柄依旧生存,这显然是不合理的。管理协程在发送多个消息后,会因为管道没有消费者而阻塞。而那些产生的消息的 http handler 也会因为 stream 管道填满而阻塞(也就是你请求发送的端点时会卡住)。那么怎么做才能正确的管理句柄的生命周期。一个句柄因用户建立 sse 而创建,也应该因为 sse 断开而销毁。直接在断开时关闭管道吗?
r.GET("/sse", func(c *gin.Context) {
var query = &struct {
Id string `form:"id"`
}{}
err := c.ShouldBind(query)
if err != nil {
return
}
uid, err := uuid.Parse(query.Id)
if err != nil {
return
}
c.SSEvent("message", "hello world")
c.Writer.Flush()
var handle = make(chan any, 1)
handles <- &SSEHandle{
UID: UID(uid),
SID: SID(uuid.New()),
handle: handle,
}
c.Stream(func(w io.Writer) bool {
msg, ok := <-handle
if ok {
c.SSEvent("message", msg)
c.Writer.Flush()
}
return ok
})
close(handle)
})
这显然是一种怪异的代码,且不说这样做会带来什么后果。以事物的关系来看,我们在
var handle = make(chan any, 1)
handles <- &SSEHandle{
UID: UID(uid),
SID: SID(uuid.New()),
handle: handle,
}
谁持有谁关闭
这里已经把 handle 的所有权从 sse 协程发送到了管理协程,是的,管理协程持有这些 handle 的所有权,显然 handle 是不能从 sse 一侧关闭的,这不符合事物的关系。
更科学的解释,管理协程持有写权限,如果向一个关闭的管道写数据,我们的程序会 panic,干净的 go 代码中协程一定要由写侧关闭。那么就需要把 close(handle) 移动到管理协程,sse 端通知管理端请关闭我的句柄,我要退出了。
增加一个结构用于通知管理协程关闭 sse 句柄
type SSEExit struct {
UID UID
SID SID
}
接受 sse 断开的关闭请求。删除掉协程本地的记录,并关闭掉句柄 channel。
var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)
var exitSignal = make(chan *SSEExit, 1)//->又创建了一个句柄传递关闭请求
go func() {
var handleContainer = make(map[UID]map[SID]chan any)
for {
select {
case handle, ok := <-handles:
if !ok {
return
}
if _, ok := handleContainer[handle.UID]; !ok {
handleContainer[handle.UID] = make(map[SID]chan any)
}
handleContainer[handle.UID][handle.SID] = handle.handle
case msg, ok := <-stream:
if !ok {
return
}
for _, m := range handleContainer {
for _, handle := range m {
handle <- msg
}
}
case exit, ok := <-exitSignal:
//当sse关闭后,sse一侧向管理端说,我退出了,请不要给我发消息了。
if !ok {
if _, ok := handleContainer[exit.UID]; ok {
close(handleContainer[exit.UID][exit.SID])
delete(handleContainer[exit.UID], exit.SID)
}
}
}
}
}()
Sse 一侧,流结束后,handler 即将退出,向管理协程发送关闭句柄的请求。
r.GET("/sse", func(c *gin.Context) {
var query = &struct {
Id string `form:"id"`
}{}
err := c.ShouldBind(query)
if err != nil {
return
}
uid, err := uuid.Parse(query.Id)
if err != nil {
return
}
c.SSEvent("message", "hello world")
c.Writer.Flush()
var handle = make(chan any, 1)
var sid = SID(uuid.New())
handles <- &SSEHandle{
UID: UID(uid),
SID: sid,
handle: handle,
}
c.Stream(func(w io.Writer) bool {
msg, ok := <-handle
if ok {
c.SSEvent("message", msg)
c.Writer.Flush()
}
return ok
})
//->SID和UID定位了一个链接
exitSignal <- &SSEExit{
UID: UID(uid),
SID: sid,
}
})
暂停一下
现在我们不仅能广播消息,还能正确的管理资源的生命周期。但是目前的代码不够干燥,管理协程提供了多个句柄。代码仍然有很大的调整空间。
截至目前的代码