GoLang pubsub server stops listening to new channel subscriptions

Question

I have a Redis pub/sub connection in my Go WebSocket app. Whenever a client connects and subscribes to a channel, the server listens on that channel and forwards its messages.

Say Client 1 subscribes to channel X: the pub/sub connection starts receiving messages from X.

Now Client 1 also subscribes to channel Y, so the server should receive messages from both channels. Instead, it stops receiving from X and only receives from Y.

    for {
        switch v := gPubSubConn.Receive().(type) {
        case redis.Message:
            log.Printf("Received message from %s", v.Channel)
            subscriptions := ps.GetSubscriptions(v.Channel, nil)
            for _, sub := range subscriptions {
                if v.Channel == types.TaskResults {
                    go sendTaskResultMessage(v.Data, sub)
                } else if v.Channel == types.TaskCount {
                    go sendTaskCountMessage(v.Data, sub)
                }
            }
        case redis.Subscription:
            log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            log.Println("Error pub/sub, delivery stopped")
            return
        }
    }

Here's an example log output:

go-1  | New Client is connected, total:  1
go-1  | 2022/02/16 17:36:03 signature is invalid
go-1  | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | 2022/02/16 17:36:06 Received message from task_count
go-1  | New Client is connected, total:  2
go-1  | 2022/02/16 17:36:14 signature is invalid
go-1  | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1  | 2022/02/16 17:36:16 Received message from task_count
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:16 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:21 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:26 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results
go-1  | 2022/02/16 17:36:31 Received message from task_results

Any ideas what's going on?

Edited as per comment:

type PubSub struct {
	Clients       []Client
	Subscriptions []Subscription
}

type Client struct {
	Id         string
	Connection *websocket.Conn
}

type Message struct {
	Action  string          `json:"action"`
	Topic   string          `json:"topic"`
	Message json.RawMessage `json:"message"`
	Token   string          `json:"token"`
}

type Subscription struct {
	Topic  string
	Client *Client
	UserId string
}

func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
	var subscriptionList []Subscription

	for _, subscription := range ps.Subscriptions {
		if client != nil {
			if subscription.Client.Id == client.Id && subscription.Topic == topic {
				subscriptionList = append(subscriptionList, subscription)
			}
		} else {
			if subscription.Topic == topic {
				subscriptionList = append(subscriptionList, subscription)
			}
		}
	}
	return subscriptionList
}

Here's my WebSocket handler:

func websocketHandler(w http.ResponseWriter, r *http.Request) {
	gRedisConn, err := gRedisConn()
	if err != nil {
		log.Panic(err)
	}
	gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
	upgrader.CheckOrigin = func(r *http.Request) bool {
		return true
	}

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := pubsub.Client{
		Id:         autoId(),
		Connection: conn,
	}

	// add this client into the list
	ps.AddClient(client)

	fmt.Println("New Client is connected, total: ", len(ps.Clients))

	for {
		messageType, p, err := conn.ReadMessage()
		if err != nil {
			log.Println("Something went wrong", err)
			ps.RemoveClient(client)
			log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
			return
		}
		go listenToMessages()
		ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
	}
}

func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
	m := Message{}
	err := json.Unmarshal(payload, &m)
	if err != nil {
		fmt.Println("This is not correct message payload")
		return ps
	}
	switch m.Action {
	case PUBLISH:
		ps.Publish(m.Topic, m.Message, nil)
	case SUBSCRIBE:
		ps.Subscribe(&client, m.Topic, gPubSubConn, m.Token)
	case UNSUBSCRIBE:
		fmt.Println("Client want to unsubscribe the topic", m.Topic, client.Id)
	default:
		break
	}

	return ps
}

func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
	clientSubs := ps.GetSubscriptions(topic, client)
	if len(clientSubs) > 0 {
		return ps
	}
	userId := utils.GetUser(token)
	newSubscription := Subscription{
		Topic:  topic,
		Client: client,
		UserId: userId,
	}
	ps.Subscriptions = append(ps.Subscriptions, newSubscription)
	if err := gPubSubConn.Subscribe(topic); err != nil {
		log.Panic(err)
	}
	return ps
}

Answer 1

Score: 4

The immediate issue is caused by this line in websocketHandler:

gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}

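This line runs every time a WebSocket client connects, and each time it replaces the current pub/sub connection with a new one. The new connection does not have any subscriptions, and the previous connection, which still holds the earlier subscriptions, is leaked.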

Create the pubsub connection once at application startup.
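
The difference can be sketched without a Redis server. This is only an illustration under stated assumptions: fakePubSubConn is a hypothetical stand-in for redis.PubSubConn that merely remembers its channels, and handlerReplacing, handlerShared and subscribedAfter are made-up names. Only the pattern (assign the shared connection once, outside the handler) comes from the answer.

```go
package main

import "fmt"

// fakePubSubConn is a hypothetical stand-in for redis.PubSubConn: it only
// remembers which channels were subscribed on this particular connection.
type fakePubSubConn struct {
	channels map[string]bool
}

func newFakePubSubConn() *fakePubSubConn {
	return &fakePubSubConn{channels: make(map[string]bool)}
}

func (c *fakePubSubConn) Subscribe(channel string) { c.channels[channel] = true }

// shared plays the role of the package-level gPubSubConn.
var shared *fakePubSubConn

// handlerReplacing mimics the question's handler: every call installs a
// fresh connection, so earlier subscriptions stay behind on the old one.
func handlerReplacing(channel string) {
	shared = newFakePubSubConn()
	shared.Subscribe(channel)
}

// handlerShared reuses the connection that was created at startup.
func handlerShared(channel string) {
	shared.Subscribe(channel)
}

// subscribedAfter creates the connection once ("application startup"),
// runs handler once per channel, and reports how many channels the
// current shared connection is subscribed to afterwards.
func subscribedAfter(handler func(string), channels ...string) int {
	shared = newFakePubSubConn()
	for _, ch := range channels {
		handler(ch)
	}
	return len(shared.channels)
}

func main() {
	fmt.Println(subscribedAfter(handlerReplacing, "task_count", "task_results")) // prints 1
	fmt.Println(subscribedAfter(handlerShared, "task_count", "task_results"))    // prints 2
}
```

With the replacing handler only the most recent channel survives, which matches the log above: task_count messages stop shortly after task_results is subscribed.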

The application also has at least one data race. Run the application with the race detector (for example, go run -race .) and fix the reported problems.
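
The answer does not say which accesses race. One plausible candidate, and this is an assumption based on the code in the question, is the Subscriptions slice: handler goroutines append to it while the receive loop ranges over it. A minimal sketch of guarding such a slice with sync.RWMutex follows; registry, add, byTopic and addConcurrently are hypothetical names, not part of the original program.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a trimmed-down version of the question's Subscription.
type subscription struct {
	Topic    string
	ClientID string
}

// registry owns the subscription slice and the lock that protects it.
type registry struct {
	mu   sync.RWMutex
	subs []subscription
}

// add appends a subscription under the write lock.
func (r *registry) add(s subscription) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs = append(r.subs, s)
}

// byTopic returns a copy of the matching subscriptions under the read lock,
// so callers can iterate the result without holding the lock.
func (r *registry) byTopic(topic string) []subscription {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var out []subscription
	for _, s := range r.subs {
		if s.Topic == topic {
			out = append(out, s)
		}
	}
	return out
}

// addConcurrently adds n subscriptions from n goroutines, each of which
// also reads the registry, and returns the final count for topic.
func addConcurrently(r *registry, topic string, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			r.add(subscription{Topic: topic, ClientID: fmt.Sprintf("client-%d", id)})
			_ = r.byTopic(topic)
		}(i)
	}
	wg.Wait()
	return len(r.byTopic(topic))
}

func main() {
	r := &registry{}
	fmt.Println(addConcurrently(r, "task_count", 100)) // prints 100
}
```

Without the mutex, the same concurrent appends and reads are the kind of access the race detector is designed to flag.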

huangapple
  • Posted on 2022-02-17 01:37:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71146678.html