Go Gin SSE 多协程编程实践 0

文章除了会介绍 SSE 基础示例外,也会讲解一些 go 多协程开发,代码构建的技术。

什么是 SSE

Server Sent Events (SSE) 是一种允许服务器向客户端推送更新的技术。与传统的请求/响应模式不同,SSE 通过建立一个持久的连接,允许服务器实时地将新的数据推送到客户端。
SSE 基于 HTTP 协议,所以在返回的响应类型上遵循标准格式,但是具有特殊的 Header。

使用

Go 的 Gin 框架已经实现了 SSE。这是代码片段。


package main  
  
import (  
    "github.com/gin-gonic/gin"  
    "io"
    "time"
)  
  
func main() {  
    r := gin.Default()  
    r.GET("/sse", func(c *gin.Context) {
    	ticker := time.NewTicker(time.Second)  
    	c.Stream(func(w io.Writer) bool {
    		_, ok := <-ticker.C  
    		if ok {
    			c.SSEvent("message", "ping")  
    		}          
          return ok  
       })  
    })    
    _ = r.Run()  
}

启动这个最小示例,用你喜欢的方式请求一下这个端点,可以看到每秒一次的 ping 文本消息。

实践

以业务需求为例,客户端需要接收某种提示后弹窗确认,这个提示信息来自于其他用户。
这里有两种消息,提示 H 和确认 C。一个身份标识 UID 和一个消息标识 MID
开始改造代码。

增加消息结构

type MsgH struct {
	MID MID `json:"id"`
}

type MsgC struct {
	MID MID `json:"id"`
}

type MID string
type UID string

增加消息路由

r.POST("/H", func(context *gin.Context) {
// 接受H类型信息MsgH
})

r.POST("/C", func(context *gin.Context) {
// 接受C类型信息MsgC
})

传递数据

显然我们需要跨协程传递数据,将两个接受消息 handler 协程的数据发送到建立 sse 长连接的协程。在 go 的世界我们用 channel 来实现。
在使用 channel 时,我们总应该思考这个 channel 的性质。那什么是 channel 的性质?
也就是发送者和接收者的数量。mpsc 多生产者单消费者,这是 go 的 channel。所以在这个场景里,我们很容易想到,两个 handler 就是生产者,sse 路由是消费者(是这样吗?)


var stream = make(chan any, 2)

r.GET("/sse", func(c *gin.Context) {
	c.SSEvent("message", "hello world")
	c.Writer.Flush()
	c.Stream(func(w io.Writer) bool {
		msg, ok := <-stream
		if ok {
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
		return ok
	})
})

r.POST("/H", func(context *gin.Context) {
	var msg = new(MsgH)
	err := context.BindJSON(&msg)
	if err != nil {
		return
	}
	stream <- msg
})

r.POST("/C", func(context *gin.Context) {
	var msg = new(MsgC)
	err := context.BindJSON(&msg)
	if err != nil {
		return
	}
	stream <- msg
})

正确分析数据流

当只有一个连接时,这个示例能够工作,发送的 C 消息和 H 消息都会出现那个唯一的 SSE 流中。
而当另一个连接建立后,你会发现你发送的 C 和 H 会随机出现在两个流中。没错,在上面的代码中,所有建立的 sse 流(handler 闭包)都引用了同一个 channel ,每个协程都在尝试从唯一的管道接受数据,而我们的管道是 mpsc,消息只会被其中一个协程读到,并不是广播的。对于上面的代码,如果我们希望客户端们发的信息广播到所有 sse 流,那就需要一个多生产者多消费者管道。Go 没有这个东西。一般的解决办法是为每个 sse 流建立一个句柄(actor 模型的讲法,如果把每个 sse 流视作 actor,那么给这个 actor 发送消息的管道一般叫 handle)。符合直觉的改法是在创建流时同时创建消息管道(创建 actor 同时创建 handle)。

r.GET("/sse", func(c *gin.Context) {
	c.SSEvent("message", "hello world")
	//不再从外部引入
	var handle = make(chan any,1)

	c.Writer.Flush()
	c.Stream(func(w io.Writer) bool {
		msg, ok := <-handle
		if ok {
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
		return ok
	})
})

重新建模

好现在每个 sse 流都有了自己句柄,我们可以为每个流发送消息了。什么,句柄还在 sse 的协程里,没法发。顺着这个线索,我们可以想象,怎么把 handle 转移出来。当然还是 channel

var handles = make(chan chan any, 8)

r.GET("/sse", func(c *gin.Context) {
	c.SSEvent("message", "hello world")
	c.Writer.Flush()

	var handle = make(chan any, 1)
	handles <- handle

	c.Stream(func(w io.Writer) bool {
		msg, ok := <-handle
		if ok {
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
		return ok
	})
})

让 Handle 携带元信息

现在有了多个 handle 的生产者,我们还需要一个消费者。那么谁来消费这个 handle 流?
可以想象,我们要广播消息,所以需要一个东西存这些 sse 的 handle。我们有多个 UID,可能每个 UID 会建立多个 sse 流。而每个 sse 流又拥有自己的生命周期,是独特的。
所以新增一个类型,让 handle 额外携带 sse 流的元信息。

type MID string  
type UID string  
type SID string //-> 每个sse流自己编号

// SSE句柄
type SSEHandle struct {
	UID UID
	SID SID
	handle chan any
}

分离协程作为 actor

想从 handles 读数据还要另一个协程。因为 main 将为 http server 阻塞,等待协程数据时也是阻塞的。所以从主线程分离一个协程去读 handles 发来的 handle 信息,这个协程内部保存了一个可变状态,就是所有 sse 流的信息。这个协程成为了 sse handle 管理者。这也是一个 actor。而 handles 则是它的 handle。所有的 sse 协程会向它发送自己的 handle

var handles = make(chan *SSEHandle, 8) //-> 每个SSE handle 都带了元信息
go func() {
	var handleContainer = make(map[UID]map[SID]chan any, 8)
	for handle := range handles {
 	if _, ok := handleContainer[handle.UID]; !ok {
 		handleContainer[handle.UID] = make(map[SID]chan any)
 	}
 	handleContainer[handle.UID][handle.SID] = handle.handle
 }
}

暂停一下

Go 虽然是静态类型语言,但是可以用 any 来传递多种类型数据。因此我们可以预见,这个 handles 还可以传递其他类型数据。那么还有哪些数据会传递?这些数据进入这个协程会被怎么处理。这些我们下一章再来实现。
截至目前的所有代码

package main

import (
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"io"
)
// 定义结构
type MsgH struct {
	MID MID `json:"id"`
}

type MsgC struct {
	MID MID `json:"id"`
}
// 选择了用 uuid 作为 id
type MID uuid.UUID 
type UID uuid.UUID
type SID uuid.UUID
type SSEHandle struct {
	UID    UID
	SID    SID
	handle chan any
}

func main() {
	r := gin.Default()

	var stream = make(chan any, 2)
	var handles = make(chan *SSEHandle, 8)

	go func() {
		var handleContainer = make(map[UID]map[SID]chan any)
		for handle := range handles {
			if _, ok := handleContainer[handle.UID]; !ok {
				handleContainer[handle.UID] = make(map[SID]chan any)
			}
			handleContainer[handle.UID][handle.SID] = handle.handle
		}
	}()

	r.GET("/sse", func(c *gin.Context) {
		var query = &struct {
			Id string `form:"id"`
		}{}

		err := c.ShouldBind(query)
		if err != nil {
			return
		}

		uid, err := uuid.Parse(query.Id)
		if err != nil {
			return
		}

		c.SSEvent("message", "hello world")
		c.Writer.Flush()

		// 现在让每个sse建立时传递UID,并为每个连接单独生成SID。
		// 在拥有追踪系统时,可以直接用TraceId作为SID。
		var handle = make(chan any, 1)
		handles <- &SSEHandle{
			UID:    UID(uid),
			SID:    SID(uuid.New()),
			handle: handle,
		}

		c.Stream(func(w io.Writer) bool {
			msg, ok := <-handle
			if ok {
				c.SSEvent("message", msg)
				c.Writer.Flush()
			}
			return ok
		})
	})

	r.POST("/H", func(context *gin.Context) {
		var msg = new(MsgH)
		err := context.BindJSON(&msg)
		if err != nil {
			return
		}
		stream <- msg
	})

	r.POST("/C", func(context *gin.Context) {
		var msg = new(MsgC)
		err := context.BindJSON(&msg)
		if err != nil {
			return
		}
		stream <- msg
	})

	_ = r.Run()
}