Go Gin SSE 多协程编程实践 1

回顾

在上文中,Go Gin SSE 多协程编程实践 0 CodeSection0,我们把每个 sse 发送到了一个单独的协程进行处理。同时也把两个 http handler 接收到的消息发送到了 stream。

继续改进代码

分发消息

分离的协程现有两个 handle,一个用于接收新的 sse 流,一个用于接收 handler 的消息,我们使用 select 从多个 channel 读取数据。

var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)

go func() {
	var handleContainer = make(map[UID]map[SID]chan any)

	for {
		select {
		case handle, ok := <-handles:
			if !ok {
				return
			}
			if _, ok := handleContainer[handle.UID]; !ok {
				handleContainer[handle.UID] = make(map[SID]chan any)
			}
			handleContainer[handle.UID][handle.SID] = handle.handle
		case msg, ok := <-stream:
			//todo 进行发送逻辑
		}
	}
}()

使用 for select 随机从两个 channel 接收数据。要么新增 sse handle,要么对 sse 发送消息,这两个路径永远不会同时发生。协程内部的 handle 容器是可变状态,传统的多线程会把它放入临界区,再对其修改。而 go 则是将持有它变更的临界区(channel 传递的消息)发送到所在的协程(一种抽象理解),因此对它的读写总是通过 channel 传递的指令顺序访问进行的。
补全发送逻辑,这里直接用广播的方式。

var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)

go func() {
	var handleContainer = make(map[UID]map[SID]chan any)

	for {
		select {
		case handle, ok := <-handles:
			if !ok {
				return
			}
			if _, ok := handleContainer[handle.UID]; !ok {
				handleContainer[handle.UID] = make(map[SID]chan any)
			}
			handleContainer[handle.UID][handle.SID] = handle.handle
		case msg, ok := <-stream:
			if !ok {
				return
			}
			for _, m := range handleContainer {
				for _, handle := range m {
					handle <- msg
				}
			}
		}
	}
}()

现在每条消息都会发到所有流。

管理生命周期

现在的代码,会对所有 sse 创建管道来传递消息,但是当 sse 关闭时,我们的 sse 句柄依旧生存,这显然是不合理的。管理协程在发送多个消息后,会因为管道没有消费者而阻塞。而那些产生的消息的 http handler 也会因为 stream 管道填满而阻塞(也就是你请求发送的端点时会卡住)。那么怎么做才能正确的管理句柄的生命周期。一个句柄因用户建立 sse 而创建,也应该因为 sse 断开而销毁。直接在断开时关闭管道吗?

r.GET("/sse", func(c *gin.Context) {
	var query = &struct {
		Id string `form:"id"`
	}{}

	err := c.ShouldBind(query)
	if err != nil {
		return
	}

	uid, err := uuid.Parse(query.Id)
	if err != nil {
		return
	}

	c.SSEvent("message", "hello world")
	c.Writer.Flush()

	var handle = make(chan any, 1)
	handles <- &SSEHandle{
		UID:    UID(uid),
		SID:    SID(uuid.New()),
		handle: handle,
	}

	c.Stream(func(w io.Writer) bool {
		msg, ok := <-handle
		if ok {
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
		return ok
	})
	
	close(handle)
})

这显然是一种怪异的代码,且不说这样做会带来什么后果。以事物的关系来看,我们在

var handle = make(chan any, 1)
handles <- &SSEHandle{
	UID:    UID(uid),
	SID:    SID(uuid.New()),
	handle: handle,
}

谁持有谁关闭

这里已经把 handle 的所有权从 sse 协程发送到了管理协程,是的,管理协程持有这些 handle 的所有权,显然 handle 是不能从 sse 一侧关闭的,这不符合事物的关系。
更科学的解释,管理协程持有写权限,如果向一个关闭的管道写数据,我们的程序会 panic,干净的 go 代码中协程一定要由写侧关闭。那么就需要把 close(handle) 移动到管理协程,sse 端通知管理端请关闭我的句柄,我要退出了。
增加一个结构用于通知管理协程关闭 sse 句柄

type SSEExit struct {
	UID UID
	SID SID
}

接受 sse 断开的关闭请求。删除掉协程本地的记录,并关闭掉句柄 channel。

var stream = make(chan any, 2)
var handles = make(chan *SSEHandle, 8)
var exitSignal = make(chan *SSEExit, 1)//->又创建了一个句柄传递关闭请求

go func() {
	var handleContainer = make(map[UID]map[SID]chan any)

	for {
		select {
		case handle, ok := <-handles:
			if !ok {
				return
			}
			if _, ok := handleContainer[handle.UID]; !ok {
				handleContainer[handle.UID] = make(map[SID]chan any)
			}
			handleContainer[handle.UID][handle.SID] = handle.handle
		case msg, ok := <-stream:
			if !ok {
				return
			}
			for _, m := range handleContainer {
				for _, handle := range m {
					handle <- msg
				}
			}
		case exit, ok := <-exitSignal:
			//当sse关闭后,sse一侧向管理端说,我退出了,请不要给我发消息了。
			if !ok {
				if _, ok := handleContainer[exit.UID]; ok {
					close(handleContainer[exit.UID][exit.SID])
					delete(handleContainer[exit.UID], exit.SID)
				}
			}
		}
	}
}()

Sse 一侧,流结束后,handler 即将退出,向管理协程发送关闭句柄的请求。

r.GET("/sse", func(c *gin.Context) {
	var query = &struct {
		Id string `form:"id"`
	}{}

	err := c.ShouldBind(query)
	if err != nil {
		return
	}

	uid, err := uuid.Parse(query.Id)
	if err != nil {
		return
	}

	c.SSEvent("message", "hello world")
	c.Writer.Flush()

	var handle = make(chan any, 1)
	var sid = SID(uuid.New()) 
	handles <- &SSEHandle{
		UID:    UID(uid),
		SID:    sid,
		handle: handle,
	}

	c.Stream(func(w io.Writer) bool {
		msg, ok := <-handle
		if ok {
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
		return ok
	})
	//->SID和UID定位了一个链接
	exitSignal <- &SSEExit{
		UID: UID(uid),
		SID: sid,
	}
})

暂停一下

现在我们不仅能广播消息,还能正确的管理资源的生命周期。但是目前的代码不够干燥,管理协程提供了多个句柄。代码仍然有很大的调整空间。
截至目前的代码

package main

import (
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"io"
)

type MsgH struct {
	MID string `json:"id"`
}

type MsgC struct {
	MID string `json:"id"`
}

type MID uuid.UUID
type UID uuid.UUID
type SID uuid.UUID
type SSEHandle struct {
	UID    UID
	SID    SID
	handle chan any
}

type SSEExit struct {
	UID UID
	SID SID
}

func main() {
	r := gin.Default()

	var stream = make(chan any, 2)
	var handles = make(chan *SSEHandle, 8)
	var exitSignal = make(chan *SSEExit, 1)

	go func() {
		var handleContainer = make(map[UID]map[SID]chan any)

		for {
			select {
			case handle, ok := <-handles:
				if !ok {
					return
				}
				if _, ok := handleContainer[handle.UID]; !ok {
					handleContainer[handle.UID] = make(map[SID]chan any)
				}
				handleContainer[handle.UID][handle.SID] = handle.handle
			case msg, ok := <-stream:
				if !ok {
					return
				}
				for _, m := range handleContainer {
					for _, handle := range m {
						handle <- msg
					}
				}
			case exit, ok := <-exitSignal:
				if !ok {
					if _, ok := handleContainer[exit.UID]; ok {
						close(handleContainer[exit.UID][exit.SID])
						delete(handleContainer[exit.UID], exit.SID)
					}
				}
			}
		}
	}()

	r.GET("/sse", func(c *gin.Context) {
		var query = &struct {
			Id string `form:"id"`
		}{}

		err := c.ShouldBind(query)
		if err != nil {
			return
		}

		uid, err := uuid.Parse(query.Id)
		if err != nil {
			return
		}

		c.SSEvent("message", "hello world")
		c.Writer.Flush()

		var handle = make(chan any, 1)
		var sid = SID(uuid.New())
		handles <- &SSEHandle{
			UID:    UID(uid),
			SID:    sid,
			handle: handle,
		}

		c.Stream(func(w io.Writer) bool {
			msg, ok := <-handle
			if ok {
				c.SSEvent("message", msg)
				c.Writer.Flush()
			}
			return ok
		})

		exitSignal <- &SSEExit{
			UID: UID(uid),
			SID: sid,
		}
	})

	r.POST("/H", func(context *gin.Context) {
		var msg = new(MsgH)
		err := context.BindJSON(&msg)
		if err != nil {
			return
		}
		stream <- msg
	})

	r.POST("/C", func(context *gin.Context) {
		var msg = new(MsgC)
		err := context.BindJSON(&msg)
		if err != nil {
			return
		}
		stream <- msg
	})

	_ = r.Run()
}