在Go中发送确认和终止后,消息仍然在NATS限制队列中。

huangapple go评论79阅读模式
英文:

Message still in nats limit queue after ack and term sent in Go

问题

我尝试为一个NATS限制队列编写了一个订阅者:

sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
	return err
}

msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
	if errors.Is(err, nats.ErrSlowConsumer) {
		log.Printf("慢消费者错误。等待重置...")
		time.Sleep(50 * time.Millisecond)
		continue
	} else {
		return err
	}
}

msg.InProgress()

var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
	msg.Term()
	return err
}

actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
	msg.Nak()
	continue
}

callback, err := handler(&message)
if err == nil {
	msg.Ack()
	msg.Term()
} else {
	msg.Nak()
	return err
}

callback(ctx)

这段代码的目标是消费一些主题上的任何消息,并调用与该主题相关联的回调函数。这段代码可以工作,但我遇到的问题是,如果handler函数没有返回错误,我希望在调用handler之后删除消息。我原以为msg.Term就是做这个的,但我仍然在队列中看到所有的消息。

我最初是围绕一个工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它。有没有办法让这个工作?

英文:

I tried writing a subscriber for a NATS limit queue:

sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
	return err
}

msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
	if errors.Is(err, nats.ErrSlowConsumer) {
		log.Printf("Slow consumer error returned. Waiting for reset...")
		time.Sleep(50 * time.Millisecond)
		continue
	} else {
		return err
	}
}

msg.InProgress()

var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
	msg.Term()
	return err
}

actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
	msg.Nak()
	continue
}

callback, err := handler(&message)
if err == nil {
	msg.Ack()
	msg.Term()
} else {
	msg.Nak()
	return err
}

callback(ctx)

The goal of this code is consume any message on a number of subjects and call a callback function associated with the subject. This code works but the issue I'm running into is that I'd like the message to be deleted after the call to handler if that function doesn't return an error. I thought that's what msg.Term was doing but I still see all the messages in the queue.

I had originally designed this around a work queue but I wanted it to work with multiple subscribers so I had to redesign it. Is there any way to make this work?

答案1

得分: 1

根据提供的代码,我假设在使用JetStream库创建订阅时,你没有提供流和消费者信息。

SubscribeSync方法的文档中,它说当没有提供流和消费者信息时,库会创建一个临时消费者,并且消费者的名称由服务器选择。它还会尝试确定订阅所属的流。

以下是我认为在你的代码中发生的情况:

  • 当你调用SubscribeSync方法时,会创建一个临时消费者,使用你提供的主题。
  • 当调用msg.Ackmsg.Term时,你确实对当前消费者确认了消息,但只对当前消费者有效。
  • 下次调用SubscribeSync方法时,会创建一个新的临时消费者,其中包含你已经在另一个消费者上删除的消息。这就是JetStream中流、消费者和订阅的设计原理。

根据你想要实现的目标,这里有一些建议:

  • 使用纯粹的NATS Core库来处理发布/订阅或队列。不要使用JetStream。NATS Core库直接使用主题,而JetStream库在内部创建其他内容(流和消费者),如果没有提供信息的话。

  • 使用JetStream,但是自己通过代码或直接在NATS服务器上创建流和持久化消费者。这样,通过已定义的流和消费者,你应该能够按预期工作。

英文:

Based on the code provided, I assume that you are not providing stream and consumer info when creating a subscription with the JetStream library.

In the documentation for the SubscribeSync method, it says that when stream and consumer information is not provided, the library will create an ephemeral consumer and the name of the consumer is picked by the server. It also attempts to figure out which stream the subscription is for.

Here is what I believe happens in your code:

  • When you call the SubscribeSync method, an ephemeral consumer is created, with your provided topic.
  • When msg.Ack and msg.Term are called, you do acknowledge the message, but only for that current consumer.
  • The next time you call the SubscribeSync method, a new ephemeral consumer is created, containing the message that you already deleted on another consumer. Which is how the Jetstream concepts of streams, consumers, and subscriptions work by design.

Based on what you want to accomplish, here are some suggestions:

  • Use the plain NATS Core library to work with either a pub/sub or a queue. Don't use JetStream. The NATS Core library works with topics directly, whereas the Jetstream library creates additional things (streams and consumers) under the hood if the information is not provided.

  • Use JetStream but create a stream and a durable consumer yourself, either through code or directly on the NATS server. This way, with a stream and a consumer already defined, you should be able to make it work as intended.

huangapple
  • 本文由 发表于 2022年9月13日 15:57:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/73699403.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定