How to ensure redis subscriber receive message in Go (Golang)?

huangapple go评论94阅读模式
英文:

How to ensure redis subscriber receive message in Go (Golang)?

问题

我正在使用gin框架构建一个API服务器。总体上,我正在构建两个项目。项目'API'和项目'SOCKET'。项目'API'是主要的REST API,将在Android中使用,使用gin框架(golang)开发。而项目'SOCKET'是用于客户端的套接字服务器,将使用套接字连接,使用node.js(Socket.IO)。

流程如下:
用户A:作为请求者;A连接到"API"
用户B:作为响应者;B连接到"SOCKET"

用户A从Android调用APIrequestData,请求将由"API"项目处理。项目"API"将记录请求,并通过pubsub将其发布到redis中的new_request

以下是示例代码:

client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "", // 无密码
    DB:       0,  // 使用默认数据库
})

pong, err := client.Ping().Result()

fmt.Println(pong, err)

if err != nil {
    fmt.Println("err", err)
}

pubsub, err := client.Subscribe("responseclient")
if err != nil {
    panic(err)
}
defer pubsub.Close()

err = client.Publish("new_request", "Example New Request").Err()

if err != nil {
    panic(err)
}
msg, err := pubsub.ReceiveMessage()
if err != nil {
    panic(err)
}

fmt.Println(msg.Channel, msg.Payload)

在项目"SOCKET"中,有一个订阅者将监听每个发生的发布,并将新消息发布到responseclient频道。

以下是示例代码:

ioApp.on('connection', function(socket) {
    redisSub.on('new_request', function(channel, message) {
        console.log(channel + ':' + message);
        redisPub.publish("responseclient", JSON.stringify(res));
    });
})

如果用户B已连接到Socket.IO,这将正常工作。但是,如果用户B离线或未连接到socket.io,这将等待很长时间,直到我们手动终止或直到用户B上线。

我想问的是:

  1. 我们是否可以在redis pub/sub上创建类似于callback的东西?如果订阅者由于离线或其他原因而无法接受消息,我们关闭连接。这种可能吗?
  2. 在Node.js中,我知道可以使用超时函数,在一定时间内如果没有收到消息,将关闭订阅或触发任何事件,如何在golang中实现这一点?我需要通知用户A,如果用户B活动或离线,这样他就可以等待另一个时间来创建请求。
  3. 如果没有其他办法,你对我有什么建议?

希望我的问题清楚明了,并且能得到很好的回答。
*某些代码可能缺少变量。
** 我正在使用这个golang redis库:go-redis

英文:

I'm using gin framework to build an API server. In General, I'm build 2 projects. Project 'API' and Project 'SOCKET'. Project 'API' is the main REST API that will used in Android, developed using gin framework (golang). And Project 'SOCKET' is the socket server for client that will use socket connection , using node.js (Socket.IO)

The process begin like this : <br>
User A : as the requester ; A connect to &quot;API&quot; <br>
User B : as the responder ; B connect to &quot;SOCKET&quot;

User A call API requestData from android, the request will handled by &quot;API&quot;'s project. And Project &quot;API&quot; will record the request, and publish on redis
as new_request using pubsub

this is the code for example :

client := redis.NewClient(&amp;redis.Options{
    Addr:     &quot;localhost:6379&quot;,
    Password: &quot;&quot;, // no password set
    DB:       0,  // use default DB
})

pong, err := client.Ping().Result()

fmt.Println(pong, err)

 if err !=nil {
 	fmt.Println(&quot;err&quot;,err);
 }


pubsub, err := client.Subscribe(&quot;responseclient&quot;)
if err !=nil {
	panic(err)
}
defer pubsub.Close()

err = client.Publish(&quot;new_request&quot;, &quot;Example New Request&quot;).Err()

if err !=nil {
	panic(err)
}
msg, err :=pubsub.ReceiveMessage()
if err != nil {
    panic(err)
}

fmt.Println(msg.Channel, msg.Payload)

}

In Project &quot;SOCKET&quot; there is a subscriber that will listen every publish that occured, and publish new message to channel responseclient
this is for the example code :

ioApp.on (&#39;connection&#39; , function(socket) {
redisSub.on(&#39;new_request&#39;, function (channel, message) {
    console.log(channel + &#39;:&#39; + message);
		  	
  	redisPub.publish(&quot;responseclient&quot;, JSON.stringify(res));	

});	

})

This work smoothly, if User B is Connected to Socket.IO. But if User B was offline, or not connected to socket.io, this will waiting for long, until we kill manually or until User B is online

What i am asking for , are :

  1. Can we create something like a callback on redis pub/sub ? If the subscriber doesn't accept the message, due to off line, or something else , we close the connection. Is this possible ?
  2. In Node.Js i know i can use timeout function, that will close the subscribe or emit any event if on certain time, there was no message received, how to do this on golang ? I need to inform the User A if User B is active or offline, so he can wait for a another time to create request.
  3. If nothing can, what is your suggestion for me to do this ?

I hope my question , understandable, and can answered well.
<br>*Some code maybe, missing variable. <br>
** I'm using this library for golang redis : go-redis

答案1

得分: 1

1)Redis中没有回调函数。

2)在Go中实现超时的常见方法是使用通道和select语句,其中一个通道用于阻塞操作,另一个通道在超时时接收消息。可以在这里这里的文档找到相关示例。

至于第三点,你有几种方法可选。第一种是使用列表,从一侧推送(发布),从另一侧弹出(订阅)。对于接收者,你可以使用BRPOPBLPOP - 分别表示从右侧或左侧进行阻塞弹出。你可以将这两种方法结合起来实现持久化消息。

此外,PUBSUB的一部分还取决于你要发布到的目标。如果你发布到一个通道,只有在有用户连接并接收消息时才会有订阅者(因此该通道只有一个订阅者),你可以检查发布命令的响应。它会告诉你发布到了多少个客户端。如果该通道只被在线接收者订阅,你会得到一个'1',如果用户离线,则会得到一个'0'。

第三个示例是将消息存储在有序集合中,以时间戳作为分数。这样,接收者可以连接并获取自上次连接以来的消息 - 但这假设某个地方有一些持久性 - 通常是客户端。你还需要对有序集合进行一些清理操作。

在这种情况下,还有一些其他要考虑的事项,比如是否最终使用复制,如果是的话,你必须明确考虑故障转移 - 尽管在你描述的场景中,你可能希望考虑断开连接和重新连接。在我的可靠PUBSUB帖子中有具体的示例。

英文:
  1. There are no callbacks in Redis.

  2. The usual way to implement a timeout in Go is to use channels and select - where one is a channel where you do the blocking and another channel receives a message on timeout. Examples of that can be found here and here for the docs

Now for (3), you have some options on methods. The first is to use a list, pushing from one side (publishing) and popping from another (subscribing). For the receiver you wild use BRPOP of BLPOP - blocking pop from right or left respectively. You can combine the two to have persistent messaging.

Now part of PUBSUB also depends on what you are publishing to. If you are publishing to a channel that would have a subscriber if and only if there is a user connected to receive it (and thus one and only one subscriber to that channel), you can check the response from your publish command. It will tell you how many clients it was published to. If the channel is only subscribed to by an online receiver you would get a '1' back, and a '0' if the user was offline.

A third example is to store the messages in a sorted set, with the timestamp as the score. This would allow the receiver to connect and get messages from the last time it was connected - but that assumes some persistence of that somewhere - usually the client. You would also need some cleanup activity on the sorted sets.

Some other things to consider in this scenario is whether you eventually use replication, in which case you have to explicitly account for failovers - though really in the scenario you describe you'd want to account for disconnects and reconnects. There are specific examples of this at my post on reliable PUBSUB.

答案2

得分: 1

包 main

import (
"context"
"fmt"
"time"

"github.com/go-redis/redis/v8"

)

var ctx = context.Background()

func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 没有设置密码
DB: 0, // 使用默认的数据库
})

subscribe := rdb.Subscribe(ctx, "hello")
subscriptions := subscribe.ChannelWithSubscriptions(ctx, 1)
go func() {
    var sentCount = 0
    for {
        rdb.Publish(ctx, "hello", time.Now().UnixNano())
        sentCount++
        if sentCount > 300 {
            break
        }
    }
}()
for {
    select {
    case sub := <-subscriptions:
        fmt.Println(sub)
    }
}

}

英文:
package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;time&quot;

	&quot;github.com/go-redis/redis/v8&quot;
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&amp;redis.Options{
		Addr:     &quot;localhost:6379&quot;,
		Password: &quot;&quot;, // no password set
		DB:       0,  // use default DB
	})

	subscribe := rdb.Subscribe(ctx, &quot;hello&quot;)
	subscriptions := subscribe.ChannelWithSubscriptions(ctx, 1)
	go func() {
		var sentCount = 0
		for  {
			rdb.Publish(ctx,&quot;hello&quot;,time.Now().UnixNano())
			sentCount++
			if sentCount &gt;300{
				break
			}
		}
	}()
	for  {
		select {
		case sub:=&lt;-subscriptions:
			fmt.Println(sub)
		}
	}



}

huangapple
  • 本文由 发表于 2016年12月17日 17:30:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/41197200.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定