Go 源码学习( go-sse )

alexandrevicenzi-go-sse

上次学习了gossed 项目 https://github.com/benas/gossed

再来学习一下它用到的核心开源库:https://github.com/alexandrevicenzi/go-sse

这个项目也是一个Go语言实现SSE的库,按照README里面的说明:支持多通道隔离,广播,自定义headers,支持Last-Event-ID,遵循SSE规范

这个库主要由三个核心源文件 sse.goclient.gochannel.go 组成,分别完成相应功能,具体看一下代码。

client.go 中Client结构,收到http请求后创建:

type Client struct {
    lastEventId,
    channel string
    send chan *Message
}

client.go 中Channel结构,Channel由Client连接时创建:

type Channel struct {
    lastEventId,
    name string
    clients map[*Client]bool
}

message.go 中Message结构,这基本上就是SSE规范里面要传送的数据结构:

type Message struct {
    id,
    data,
    event string
    retry int
}

sse.go中server结构,实现ServeHTTP方法,这样就实现net/http包中的Handler接口:

type Server struct {
    options *Options
    channels map[string]*Channel
    addClient chan *Client
    removeClient chan *Client
    shutdown chan bool
    closeChannel chan string
}

实现ServeHTTP方法,注意,作者单建一个goroutine读取closeNotify chan阻塞住,收到http断开信号则写删除客户端的chan,并且如果有消息则发送消息:

func (s *Server) ServeHTTP(response http.ResponseWriter, request *http.Request) {
    ...
    if request.Method == "GET" {
        ...
        lastEventId := request.Header.Get("Last-Event-ID")
        c := NewClient(lastEventId, channelName)
        s.addClient <- c
        closeNotify := response.(http.CloseNotifier).CloseNotify()

        go func() {
            <-closeNotify
            s.removeClient <- c
        }()

        for msg := range c.send {
            ...
        }
    } else if request.Method != "OPTIONS" {
        response.WriteHeader(http.StatusMethodNotAllowed)
    }
}

server创建后,启动一个goroutine执行任务分发:

// NewServer creates a new SSE server.
func NewServer(options *Options) *Server {
    ...
    s := &Server{...}
    go s.dispatch()
    return s
}

dispatch(),利用读取三个chan来监控客户端连接、断开、服务停止:

func (s *Server) dispatch() {
    for {
        select {

        // New client connected.
        case c := <- s.addClient:
            ch, exists := s.channels[c.channel]

            if !exists {
                ch = NewChannel(c.channel)
                s.channels[ch.name] = ch
                log.Printf("go-sse: channel '%s' created.", ch.name)
            }

            ch.addClient(c)

        // Client disconnected.
        case c := <- s.removeClient:
            if ch, exists := s.channels[c.channel]; exists {
                ch.removeClient(c)

                log.Printf("go-sse: checking if channel '%s' has clients.", ch.name)

                if ch.ClientCount() == 0 {
                    delete(s.channels, ch.name)
                    ch.Close()
                    log.Printf("go-sse: channel '%s' has no clients.", ch.name)
                }
            }

        // Close channel and all clients in it.
        case channel := <- s.closeChannel:
            if ch, exists := s.channels[channel]; exists {
                delete(s.channels, channel)
                ch.Close()
            } else {
                log.Printf("go-sse: requested to close channel '%s', but it doesn't exists.", channel)
            }

        // Event Source shutdown.
        case <- s.shutdown:
            s.close()
            close(s.addClient)
            close(s.removeClient)
            close(s.closeChannel)
            close(s.shutdown)
            log.Printf("go-sse: server stoped.")
            return
        }
    }
}

最后,贴一下这个开源库的使用示例,后台Go程序:

func main() {
    s := sse.NewServer(nil)
    defer s.Shutdown()

    http.Handle("/", http.FileServer(http.Dir("./static")))
    http.Handle("/events/", s)

    go func () {
        for {
            s.SendMessage("/events/channel-1", sse.SimpleMessage(time.Now().String()))
            time.Sleep(5 * time.Second)
        }
    }()

    log.Println("Listening at :3000")
    http.ListenAndServe(":3000", nil)
}

前端HTML:

<!DOCTYPE html>
<html>
<head>
    <title>SSE Examples</title>
</head>
<body>
    <strong>Messages</strong>
    <br>
    <div id="messages"></div>

    <script type="text/javascript">
        e1 = new EventSource('/events/channel-1');
        e1.onmessage = function(event) {
            document.getElementById('messages').innerHTML += event.data + '<br>';
        };
    
</script>
</body>
</html>

最后,有关Last-Event-ID的相关细节还没太明白,另外示例中有一些导出方法没有给出使用例子,需要使用者自行阅读代码,例如从Server实例获取Channel和Client,然后对其相应处理等等。列出一些重要的方法,等我用到的时候再仔细研究吧。

func (s *Server) Channels() []string
func (s *Server) GetChannel(name string) (*Channel, bool)
func (s *Server) HasChannel(name string) bool
func (s *Server) CloseChannel(name string)
...
func (c *Channel) SendMessage(message *Message)
func (c *Channel) Close()
func (c *Channel) ClientCount() int
func (c *Channel) LastEventId() string
...
func (c *Client) SendMessage(message *Message)
func (c *Client) Channel() string
func (c *Client) LastEventId() string