英文:
Message still in nats limit queue after ack and term sent in Go
问题
我尝试为一个NATS限制队列编写了一个订阅者:
sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if errors.Is(err, nats.ErrSlowConsumer) {
log.Printf("慢消费者错误。等待重置...")
time.Sleep(50 * time.Millisecond)
continue
} else {
return err
}
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := handler(&message)
if err == nil {
msg.Ack()
msg.Term()
} else {
msg.Nak()
return err
}
callback(ctx)
这段代码的目标是消费一些主题上的任何消息,并调用与该主题相关联的回调函数。这段代码可以工作,但我遇到的问题是,如果handler
函数没有返回错误,我希望在调用handler
之后删除消息。我原以为msg.Term
就是做这个的,但我仍然在队列中看到所有的消息。
我最初是围绕一个工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它。有没有办法让这个工作?
英文:
I tried writing a subscriber for a NATS limit queue:
sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if errors.Is(err, nats.ErrSlowConsumer) {
log.Printf("Slow consumer error returned. Waiting for reset...")
time.Sleep(50 * time.Millisecond)
continue
} else {
return err
}
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := handler(&message)
if err == nil {
msg.Ack()
msg.Term()
} else {
msg.Nak()
return err
}
callback(ctx)
The goal of this code is consume any message on a number of subjects and call a callback function associated with the subject. This code works but the issue I'm running into is that I'd like the message to be deleted after the call to handler
if that function doesn't return an error. I thought that's what msg.Term
was doing but I still see all the messages in the queue.
I had originally designed this around a work queue but I wanted it to work with multiple subscribers so I had to redesign it. Is there any way to make this work?
答案1
得分: 1
根据提供的代码,我假设在使用JetStream库创建订阅时,你没有提供流和消费者信息。
在SubscribeSync
方法的文档中,它说当没有提供流和消费者信息时,库会创建一个临时消费者,并且消费者的名称由服务器选择。它还会尝试确定订阅所属的流。
以下是我认为在你的代码中发生的情况:
- 当你调用
SubscribeSync
方法时,会创建一个临时消费者,使用你提供的主题。 - 当调用
msg.Ack
和msg.Term
时,你确实对当前消费者确认了消息,但只对当前消费者有效。 - 下次调用
SubscribeSync
方法时,会创建一个新的临时消费者,其中包含你已经在另一个消费者上删除的消息。这就是JetStream中流、消费者和订阅的设计原理。
根据你想要实现的目标,这里有一些建议:
-
使用纯粹的NATS Core库来处理发布/订阅或队列。不要使用JetStream。NATS Core库直接使用主题,而JetStream库在内部创建其他内容(流和消费者),如果没有提供信息的话。
-
使用JetStream,但是自己通过代码或直接在NATS服务器上创建流和持久化消费者。这样,通过已定义的流和消费者,你应该能够按预期工作。
英文:
Based on the code provided, I assume that you are not providing stream and consumer info when creating a subscription with the JetStream library.
In the documentation for the SubscribeSync
method, it says that when stream and consumer information is not provided, the library will create an ephemeral consumer and the name of the consumer is picked by the server. It also attempts to figure out which stream the subscription is for.
Here is what I believe happens in your code:
- When you call the
SubscribeSync
method, an ephemeral consumer is created, with your provided topic. - When
msg.Ack
andmsg.Term
are called, you do acknowledge the message, but only for that current consumer. - The next time you call the
SubscribeSync
method, a new ephemeral consumer is created, containing the message that you already deleted on another consumer. Which is how the Jetstream concepts of streams, consumers, and subscriptions work by design.
Based on what you want to accomplish, here are some suggestions:
-
Use the plain NATS Core library to work with either a pub/sub or a queue. Don't use JetStream. The NATS Core library works with topics directly, whereas the Jetstream library creates additional things (streams and consumers) under the hood if the information is not provided.
-
Use JetStream but create a stream and a durable consumer yourself, either through code or directly on the NATS server. This way, with a stream and a consumer already defined, you should be able to make it work as intended.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论