英文:
GoLang pubsub server stops listening to new channel subscriptions
问题
我在我的Go WebSocket应用程序中有一个Redis PubSub连接,所以每当一个客户端连接并订阅一个频道时,它会监听并发送消息。
然而,假设客户端1订阅了频道X
,PubSub开始监听并接收来自该频道的消息。
现在,客户端1还订阅了频道Y
,所以服务器也应该监听来自该频道的消息,但它停止监听X
,只监听Y
。
以下是一个示例日志输出:
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
有什么想法是怎么回事?
根据评论进行了编辑:
type PubSub struct {
Clients []Client
Subscriptions []Subscription
}
type Client struct {
Id string
Connection *websocket.Conn
}
type Message struct {
Action string `json:"action"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
Token string `json:"token"`
}
type Subscription struct {
Topic string
Client *Client
UserId string
}
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
var subscriptionList []Subscription
for _, subscription := range ps.Subscriptions {
if client != nil {
if subscription.Client.Id == client.Id && subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
} else {
if subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
}
}
return subscriptionList
}
这是我的WebSocket处理程序:
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// 将此客户端添加到列表中
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
m := Message{}
err := json.Unmarshal(payload, &m)
if err != nil {
fmt.Println("This is not correct message payload")
return ps
}
switch m.Action {
case PUBLISH:
ps.Publish(m.Topic, m.Message, nil)
case SUBSCRIBE:
ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
case UNSUBSCRIBE:
fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
default:
break
}
return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
clientSubs := ps.GetSubscriptions(topic, client)
if len(clientSubs) > 0 {
return ps
}
userId := utils.GetUser(token)
newSubscription := Subscription{
Topic: topic,
Client: client,
UserId: userId,
}
ps.Subscriptions = append(ps.Subscriptions, newSubscription)
if err := gPubSubConn.Subscribe(topic); err != nil {
log.Panic(err)
}
return ps
}
希望对你有所帮助!如果还有其他问题,请随时提问。
英文:
I have a redis pubsub connection in my go websocket app, so whenever a client connects and subscribes to a channel, it listens and sends message.
However, say Client 1 is subscribed to channel X
, the pubsub starts listening and receiving messages from it.
Now, Client 1 also subscribes to channel Y
, so the server should also listen to messages from that channel, however it stops listening to X
and only to Y
.
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
log.Printf("Received message from %s", v.Channel)
subscriptions := ps.GetSubscriptions(v.Channel, nil)
for _, sub := range subscriptions {
if v.Channel == types.TaskResults {
go sendTaskResultMessage(v.Data, sub)
} else if v.Channel == types.TaskCount {
go sendTaskCountMessage(v.Data, sub)
}
}
case redis.Subscription:
log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Println("Error pub/sub, delivery stopped")
return
}
Here's an example log output
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
Any ideas what's going on?
Edited as per comment:
type PubSub struct {
Clients []Client
Subscriptions []Subscription
}
type Client struct {
Id string
Connection *websocket.Conn
}
type Message struct {
Action string `json:"action"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
Token string `json:"token"`
}
type Subscription struct {
Topic string
Client *Client
UserId string
}
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
var subscriptionList []Subscription
for _, subscription := range ps.Subscriptions {
if client != nil {
if subscription.Client.Id == client.Id && subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
} else {
if subscription.Topic == topic {
subscriptionList = append(subscriptionList, subscription)
}
}
}
return subscriptionList
}
Here's my websocket handler
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// add this client into the list
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
m := Message{}
err := json.Unmarshal(payload, &m)
if err != nil {
fmt.Println("This is not correct message payload")
return ps
}
switch m.Action {
case PUBLISH:
ps.Publish(m.Topic, m.Message, nil)
case SUBSCRIBE:
ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
case UNSUBSCRIBE:
fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
default:
break
}
return ps
}
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
clientSubs := ps.GetSubscriptions(topic, client)
if len(clientSubs) > 0 {
return ps
}
userId := utils.GetUser(token)
newSubscription := Subscription{
Topic: topic,
Client: client,
UserId: userId,
}
ps.Subscriptions = append(ps.Subscriptions, newSubscription)
if err := gPubSubConn.Subscribe(topic); err != nil {
log.Panic(err)
}
return ps
}
答案1
得分: 4
立即的问题是由websocketHandler
中的这行代码引起的:
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
这行代码用新的连接替换了当前的发布订阅连接。新的连接没有任何订阅。之前的连接被泄漏。
在应用程序启动时创建发布订阅连接。
该应用程序至少存在一个数据竞争。使用竞争检测器运行应用程序,并修复报告的问题。
英文:
The immediate issue is caused by this line in websocketHandler
:
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
This line replaces the current pubsub connection with a new connection. The new connection does not have any subscriptions. The previous connection is leaked.
Create the pubsub connection once at application startup.
The application has at least one data race. Run the application with the race detector and fix the reported problems.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论