Golang:如何确保Redis订阅者接收到Redis Pub/Sub中的所有消息?

huangapple go评论69阅读模式
英文:

Golang: how to ensure redis subscribers receive all messages from redis pubsub?

问题

我正在尝试在Redis中发布消息到动态生成的频道,并同时订阅现有频道中的所有消息。
以下代码似乎可以工作,但根据客户端(浏览器)请求的时间不同,可能无法接收到一些消息。

我尝试在select语句中使用了两个Go通道的"fan-in",但效果不好。

package main

import (
    ...
	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{}
var rd = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})
var ctx = context.Background()

func echo(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("websocket connection err:", err)
		return
	}
	defer conn.Close()

	room := make(chan string)
	start := make(chan string)

	go func() {
	loop:
		for {
			sub := rd.Subscribe(ctx)
			defer sub.Close()
            
   			channels := []string{}

			for {
				select {
                //有时在执行此case时似乎无法接收到消息
				case channel := <-room:
					log.Println("channel", channel)
					channels = append(channels, channel)
					sub = rd.Subscribe(ctx, channels...)
					start <- "ok"
				case msg := <-sub.Channel():
					log.Println("msg", msg)
					err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
					if err != nil {
						log.Println("websocket write err:", err)
						break loop
					}
				}
			}

		}
	}()

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.Println("websocket read err:", err)
			break
		}
		log.Println(string(msg))

		chPrefix := strings.Split(string(msg), ":")[0]
		ch := chPrefix + "-channel"
		if string(msg) == "test" || string(msg) == "yeah" {
			room <- ch
			log.Println(ch)
			log.Println(<-start)
		}

		if err := rd.Publish(ctx, ch, msg).Err(); err != nil {
			log.Println("redis publish err:", err)
			break
		}
	}

}

func main() {
	http.Handle("/", http.FileServer(http.Dir("./js")))
	http.HandleFunc("/ws", echo)

	log.Println("server starting...", "http://localhost:5000")
	log.Fatal(http.ListenAndServe("localhost:5000", nil))
}
英文:

I am trying to publish messages to dynamically generated channels in redis while subscribing all messages from the existing channels.
The following seems to work, but it fails to receive some messages depending on the timing of requests from the client (browser).

I tried "fan-in" for the two go channels in the select statement, but it did not work well.





package main

import (
    ...
	&quot;github.com/go-redis/redis/v8&quot;
	&quot;github.com/gorilla/websocket&quot;
)

var upgrader = websocket.Upgrader{}
var rd = redis.NewClient(&amp;redis.Options{
	Addr: &quot;localhost:6379&quot;,
})
var ctx = context.Background()

func echo(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(&quot;websocket connection err:&quot;, err)
		return
	}
	defer conn.Close()

	room := make(chan string)
	start := make(chan string)

	go func() {
	loop:
		for {
			sub := rd.Subscribe(ctx)
			defer sub.Close()
            
   			channels := []string{}

			for {
				select {
                //it seems that messages are not received when executing this case sometimes
				case channel := &lt;-room:
					log.Println(&quot;channel&quot;, channel)
					channels = append(channels, channel)
					sub = rd.Subscribe(ctx, channels...)
					start &lt;- &quot;ok&quot;
				case msg := &lt;-sub.Channel():
					log.Println(&quot;msg&quot;, msg)
					err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
					if err != nil {
						log.Println(&quot;websocket write err:&quot;, err)
						break loop
					}
				}
			}

		}
	}()

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.Println(&quot;websocket read err:&quot;, err)
			break
		}
		log.Println(string(msg))

		chPrefix := strings.Split(string(msg), &quot;:&quot;)[0]
		ch := chPrefix + &quot;-channel&quot;
		if string(msg) == &quot;test&quot; || string(msg) == &quot;yeah&quot; {
			room &lt;- ch
			log.Println(ch)
			log.Println(&lt;-start)
		}

		if err := rd.Publish(ctx, ch, msg).Err(); err != nil {
			log.Println(&quot;redis publish err:&quot;, err)
			break
		}
	}

}

func main() {
	http.Handle(&quot;/&quot;, http.FileServer(http.Dir(&quot;./js&quot;)))
	http.HandleFunc(&quot;/ws&quot;, echo)

	log.Println(&quot;server starting...&quot;, &quot;http://localhost:5000&quot;)
	log.Fatal(http.ListenAndServe(&quot;localhost:5000&quot;, nil))
}

答案1

得分: 1

如果你说的是不想丢失任何消息,我建议你使用Redis Streams而不是发布/订阅。这将确保你不会错过任何消息,并且在必要时可以回溯流的历史记录。

这是一个使用Go、Streams和Websockets的示例,应该可以帮助你入门。

这是一个示例链接

英文:

If by all messages, you mean you do not wish to lose any messages, I would recommend using Redis Streams instead of pub/sub. This will ensure you are not missing messages and can go back on the stream history if necessary.

This is an example of using Go, Streams and websockets that should get you started in that direction

huangapple
  • 本文由 发表于 2022年2月1日 15:33:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/70936929.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定