GO语言使用NATS进行多队列订阅的队列排队

huangapple go评论82阅读模式
英文:

GO lang NATS Queueing with multiple Queue Subscribe

问题

我正在创建一个NATS Go语言队列订阅者客户端,代码如下:

nc.QueueSubscribe("foo", "my_queue", func(msg *nats.Msg) {
    log.Printf("Message: %s", string(msg.Data))
})

所以每当我发布任何消息到"foo"主题时,有时会接收到消息,有时不会。

例如,假设我向上述"foo"主题发送了10条消息,它可能会接收到2或3条。

我的要求如下:

  • 应该有队列订阅。
  • 所有输入事件都应该被处理。
  • 如何以并发模式实现队列订阅。

任何帮助都将不胜感激。

英文:

I am creating NATS go lang Queue Subscriber client as follows,

nc.QueueSubscribe("foo", "my_queue", func(msg *nats.Msg) {
		log.Printf("Message :%s", string(msg.Data))
})

So whenever i publish any message to "foo" subject then some time it is receiving and some time not.

e.g let say i sent 10 messages to above "foo" subject then it will receive 2 or 3 max.

My requirement is as follows,

  • There should be Queue Subscription.
  • All input events should be processed.
  • How to implement Queue Subscribe in concurrent mode.

Any help appreciated.

答案1

得分: 5

如果您使用相同名称(例如my_queue)启动多个队列订阅者,那么发布到“foo”主题的消息只会发送给其中一个队列订阅者。

根据您的陈述,我不确定您是否意味着队列订阅者有时会丢失消息。请记住一件事:NATS 中没有持久性(NATS Streaming 中有)。因此,如果您在订阅者创建之前发布消息,并且该主题上没有其他订阅者,那么这些消息将会丢失。

如果您正在进行实验,并且从一个连接启动队列订阅者,然后在同一个应用程序中从另一个连接发送消息,那么可能是服务器在开始接收消息之前没有注册队列订阅。如果是这种情况,您需要在创建订阅并开始发送之前刷新连接:nc.Flush()

最后,使用并发模式使用队列订阅者没有什么特别的。这就是它们的用途:为同一组的订阅者负载均衡处理相同主题上的消息。如果您在同一个应用程序中创建多个队列订阅者,唯一需要注意的是要么不共享消息处理程序,要么如果共享,则需要使用锁定,因为如果消息到达得足够快,消息处理程序将会并发调用。

英文:

If you start multiple queue subscribers with the same name (in your example my_queue), then a message published on "foo" goes to only one of those queue subscribers.

I am not sure from your statement if you imply that the queue subscriber sometimes misses messages or not. Keep in mind one thing: there is no persistence in NATS (there is in NATS Streaming). So if you publish messages before the subscriber is created, and if there is no other subscriber on that subject, the messages will be lost.

If you were experimenting and starting the queue subscriber from one connection and then in the same application sending messages from another connection, it is possible that the server did not register the queue subscription before it started to receive messages (again, if you were using 2 connections). If that is the case, you would need to flush the connection after creating the subscription and before starting sending: nc.Flush().

Finally, there is nothing special to use queue subscribers in concurrent mode. This is what they are for: load balancing processing of messages on the same subject for subscribers belonging to the same group. The only thing you have to be careful of if you are creating multiple queue subscribers in the same application is either to not share the message handler or if you do, you need to use locking since the message handler would be concurrently invoked if messages arrive fast enough.

huangapple
  • 本文由 发表于 2017年3月20日 19:33:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/42902515.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定