Golang每个用户的服务器发送事件

huangapple go评论85阅读模式
英文:

Golang Server Sent Events Per User

问题

我已经使用Go一段时间了,但从未使用过SSE。我遇到了一个问题,有人能提供一个工作示例吗?这个示例只会发送给特定的用户(连接)。

我正在使用gorilla-sessions进行身份验证,我想使用UserID来区分连接。

或者我应该使用每5秒轮询的Ajax方式吗?

非常感谢

以下是我找到并尝试过的内容:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c 它不会发送给单个用户,并且如果连接关闭,go函数不会停止。

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go 这与我所需的有点相似,但一旦连接被移除,它就无法跟踪。所以,一旦你关闭并在私密窗口中打开浏览器,它就完全不起作用了。而且,像上面一样,go例程会一直运行。

英文:

I've been working with Go for some time but never done SSE before. I'm having an issue, can someone PLEASE provide with a working example of server sent events that will only send to a specific user(connection).

I'm using a gorilla - sessions to authenticate and I would like to use UserID to separate connections.

Or should I use 5 second polling via Ajax?

Many thanks

Here is what i found and tried:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c it doenst send to an individual user and the go func wont stop if the connection is closed

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go this is kind of what i need but it doesnt track once the connection is removed. So now, once you close and open the browser in private window it's not working at all. Also, as above, the go routine keeps going.

答案1

得分: 0

创建一个“broker”来将消息分发给连接的用户:

type Broker struct {
    // users是一个映射,其中键是用户ID,值是连接的通道切片
    users map[string][]chan []byte

    // actions是一个函数通道,用于在broker的goroutine中调用
    // broker在单个goroutine中执行所有操作,以避免数据竞争
    actions chan func()
}

// run在一个goroutine中执行。它简单地获取并调用函数
func (b *Broker) run() {
    for a := range b.actions {
        a()
    }
}

func newBroker() *Broker {
    b := &Broker{
        users:   make(map[string][]chan []byte),
        actions: make(chan func()),
    }
    go b.run()
    return b
}

// addUserChan为具有给定ID的用户添加一个通道
func (b *Broker) addUserChan(id string, ch chan []byte) {
    b.actions <- func() {
        b.users[id] = append(b.users[id], ch)
    }
}

// removeUserchan从具有给定ID的用户中删除一个通道
func (b *Broker) removeUserChan(id string, ch chan []byte) {
    // broker可能正在尝试发送到ch,但没有接收者。将ch传送到防止broker被卡住
    go func() { for range ch {} }()

    b.actions <- func() {
        chs := b.users[id]
        i := 0
        for _, c := range chs {
            if c != ch {
                chs[i] = c
                i = i + 1
            }
        }
        if i == 0 {
            delete(b.users, id)
        } else {
            b.users[id] = chs[:i]
        }
        // 关闭通道以打破removeUserChan开头的循环
        // 必须在broker的goroutine中执行此操作
        // 以确保broker不会发送到已关闭的goroutine
        close(ch)
    }
}

// sendToUser向给定用户ID的所有通道发送消息
func (b *Broker) sendToUser(id string, data []byte) {
    b.actions <- func() {
        for _, ch := range b.users[id] {
            ch <- data
        }
    }
}

在包级别声明一个具有broker的变量:

var broker = newBroker()

使用broker编写SSE端点:

func sseEndpoint(w http.ResponseWriter, r *http.Request) {
    // 假设用户ID在查询字符串中,这里是示例
    // 您应该使用您的身份验证代码来获取ID
    id := r.FormValue("id")

    // 进行常规的SSE设置
    flusher := w.(http.Flusher)
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // 创建用于接收此连接的消息的通道。
    // 将该通道注册到broker中。
    // 在函数返回时,从broker中删除该通道。
    ch := make(chan []byte)
    broker.addUserChan(id, ch)
    defer broker.removeUserChan(id, ch)
    for {
        select {
        case <-r.Context().Done():
            // 用户关闭了连接,我们结束
            return
        case m := <-ch:
            // 我们收到了一条消息,进行常规的SSE操作
            fmt.Fprintf(w, "data: %s\n\n", m)
            flusher.Flush()
        }
    }
}

在您的应用程序中添加代码来调用Broker.sendToUser。

英文:

Create a "broker" to distribute messages to connected users:

type Broker struct {
// users is a map where the key is the user id
// and the value is a slice of channels to connections
// for that user id
users map[string][]chan []byte
// actions is a channel of functions to call
// in the broker&#39;s goroutine. The broker executes
// everything in that single goroutine to avoid
// data races.
actions chan func()
}
// run executes in a goroutine. It simply gets and 
// calls functions.
func (b *Broker) run() {
for a := range b.actions {
a()
}
}
func newBroker() *Broker {
b := &amp;Broker{
users:   make(map[string][]chan []byte),
actions: make(chan func()),
}
go b.run()
return b
}
// addUserChan adds a channel for user with given id.
func (b *Broker) addUserChan(id string, ch chan []byte) {
b.actions &lt;- func() {
b.users[id] = append(b.users[id], ch)
}
}
// removeUserchan removes a channel for a user with the given id.
func (b *Broker) removeUserChan(id string, ch chan []byte) {
// The broker may be trying to send to 
// ch, but nothing is receiving. Pump ch
// to prevent broker from getting stuck.
go func() { for range ch {} }()
b.actions &lt;- func() {
chs := b.users[id]
i := 0
for _, c := range chs {
if c != ch {
chs[i] = c
i = i + 1
}
}
if i == 0 {
delete(b.users, id)
} else {
b.users[id] = chs[:i]
}
// Close channel to break loop at beginning
// of removeUserChan.
// This must be done in broker goroutine
// to ensure that broker does not send to
// closed goroutine.
close(ch)
}
}
// sendToUser sends a message to all channels for the given user id.
func (b *Broker) sendToUser(id string, data []byte) {
b.actions &lt;- func() {
for _, ch := range b.users[id] {
ch &lt;- data
}
}
}

Declare a variable with the broker at package-level:

 var broker = newBroker()

Write the SSE endpoint using the broker:

func sseEndpoint(w http.ResponseWriter, r *http.Request) {
// I assume that user id is in query string for this example,
// You should use your authentication code to get the id.
id := r.FormValue(&quot;id&quot;)
// Do the usual SSE setup.
flusher := w.(http.Flusher)
w.Header().Set(&quot;Content-Type&quot;, &quot;text/event-stream&quot;)
w.Header().Set(&quot;Cache-Control&quot;, &quot;no-cache&quot;)
w.Header().Set(&quot;Connection&quot;, &quot;keep-alive&quot;)
// Create channel to receive messages for this connection.  
// Register that channel with the broker.
// On return from the function, remove the channel
// from the broker.
ch := make(chan []byte)
broker.addUserChan(id, ch)
defer broker.removeUserChan(id, ch)
for {
select {
case &lt;-r.Context().Done():
// User closed the connection. We are out of here.
return
case m := &lt;-ch:
// We got a message. Do the usual SSE stuff.
fmt.Fprintf(w, &quot;data: %s\n\n&quot;, m)
flusher.Flush()
}
}
}

Add code to your application to call Broker.sendToUser.

huangapple
  • 本文由 发表于 2022年11月20日 11:31:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/74505447.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定