如何从NATS Jetstream服务器接收心跳消息

huangapple go评论87阅读模式
英文:

How to receive heart beat messages from NATS Jetstream server

问题

我有一个正在运行的本地NATS服务器,其中有一个名为EVENTS的流和几个主题。

在Golang中,我正在尝试使用idleHeartBeat选项订阅推送消费者,如这里所述。

我在网上搜索了很多,但找不到接收这些心跳的方法。最初,我以为我会像普通消息一样接收它们(从这里了解到),但是当运行下面的代码时,我只接收到我发布的消息,没有接收到心跳。即使在接收到我发布的所有消息并等待几秒钟后,也没有要发送的消息。

我是否需要进行其他配置?我应该从另一个订阅或特定主题中监听吗?

我非常感谢任何关于如何在Golang中接收这些心跳消息并以某种方式处理它们的指导。

这是我的代码:

func main() {

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	js, _ := nc.JetStream()

	streamName := "EVENTS"

	js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"events.>"},
	})

	js.Subscribe("events.*", func(msg *nats.Msg) {
		fmt.Printf("monitor service subscribes from subject:%s\n", msg.Subject)
		fmt.Printf("received %q from my subscription\n", msg.Subject)
		msg.Ack()
	}, nats.IdleHeartbeat(1*time.Second), nats.DeliverLast(), nats.ManualAck())

	for {

	}
}

提前感谢!

英文:

I have a running local NATS server with a stream named EVENTS and several subjects.

In Golang, I am trying to subscribe with a push consumer using the idleHeartBeat option as described here.

I searched a lot online and could not find a way to receive these heart beats. Initially, I thought I was going to receive them as regular messages (as I understood from here), but when running the code below I only receive the messages I publish and no heart beats. This happens even though after receiving all messages I published and waiting a few seconds there are no messages to be sent anymore.

Is there any other configuration I should do? Should I listen from another subscription or specific subject?

I would really appreciate any guidance to how to receive these heart beats messages in Golang and process them in some way.

This is my code:

	func main() {

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	js, _ := nc.JetStream()

	streamName := "EVENTS"

	js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"events.>"},
	})

	js.Subscribe("events.*", func(msg *nats.Msg) {
		fmt.Printf("monitor service subscribes from subject:%s\n", msg.Subject)
		fmt.Printf("received %q from my subscription\n", msg.Subject)
		msg.Ack()
	}, nats.IdleHeartbeat(1*time.Second), nats.DeliverLast(), nats.ManualAck())

	for {

	}
}

Thanks in advance!

答案1

得分: 1

在你提供的文章中,通过CLI创建消费者并订阅消费者是通过CLI完成的。在CLI中,客户端只接收所有消息,包括心跳消息。

NATS Go客户端的工作方式有所不同。当你调用Subscribe函数时,它会创建一个消费者(因为之前没有定义),并订阅该消费者。发布到流(和消费者)的消息将传递给你的消息处理程序,但心跳消息会自动处理(请查看源代码)。这也在你提供的文档中有描述:

> 注意,这个心跳机制由支持的客户端透明地处理,不需要应用程序处理。

总结一下,CLI客户端显示订阅主题中的所有消息,而NATS Go客户端会自动处理心跳消息,你在消息处理程序中看不到它们。

英文:

In the article you linked, the creation of the consumer and the subscription to the consumer is made through the CLI. In the CLI, the client just receives all messages, even the heartbeats.

The NATS Go client works differently. When you call the Subscribe function, it creates a consumer (since you didn't define it previously) and subscribes to it. The messages published to the stream (and the consumer) are delivered to your message handlers, but the heartbeats are handled automatically (check the source code). This is also described in the documentation you linked:

> Note that this heartbeat mechanism is all handled transparently by supported clients and does not need to be handled by the application.

To sum up, the CLI client shows all the messages coming through the subscribed topic, whereas the NATS Go client handles the heartbeats automatically and you don't see them in your message handlers.

huangapple
  • 本文由 发表于 2022年10月19日 16:58:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/74122560.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定