如何在Golang中使用jscontext.newConsumer()创建NATS推送消费者

huangapple go评论67阅读模式
英文:

How to create NATS push consumer using jscontext.newConsumer() in Golang

问题

在创建js上下文AddConsumer(topic.ID(), &nats.ConsumerConfig{})时,创建了一个拉取消费者,是否有办法使用addconsumer创建一个推送消费者?

英文:

While creating js context AddConsumer(topic.ID(), &nats.ConsumerConfig{}) creating pull consumer, Is there any way to create a push consumer using addconsumer

答案1

得分: 1

文档对于这部分非常糟糕,错误信息也不够具体。我花了几个小时才弄清楚。以下是我找到的内容...

要使用AddConsumer创建一个推送消费者,你必须提供DeliverySubject参数,它可以是任何值,为了简单起见,可以与消费者名称相同。关于这部分的文档非常糟糕,没有解释应该如何使用它。

以下是一个完整的工作示例,它创建一个消费者并订阅来自该消费者的特定主题。

nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
	log.Fatalf("无法连接到nats:%s", err)
}

js, err := nc.JetStream()
if err != nil {
	log.Fatal("无法获取jetstream:", err)
}

streamName := "TEST_STREAM"
_, err = js.AddStream(&nats.StreamConfig{
	Name:      streamName,
	Subjects:  []string{"test.>"},
	Retention: nats.InterestPolicy,
	Replicas:  1,
})

if err != nil {
	fmt.Printf("创建流 %q 时出错:%s\n", streamName, err)
	return
}

groupName := "group-1"
consumerName := fmt.Sprintf("%s-consumer", groupName)
_, err = js.AddConsumer(
	streamName,
	&nats.ConsumerConfig{
		Name:           consumerName,
		Durable:        consumerName,
		DeliverSubject: consumerName,
		DeliverGroup:   groupName,
	},
)

if err != nil {
	fmt.Printf("创建消费者 %q 时出错:%s\n", consumerName, err)
	return
}

subject := "test.resource.created"
_, err = js.QueueSubscribe(
	subject,
	groupName,
	func(msg *nats.Msg) { 
		fmt.Printf("Jetstream - 收到 %q 的消息:%q\n", msg.Subject, msg.Data) 
	},
	nats.Bind(streamName, consumerName),
)

if err != nil {
	fmt.Printf("订阅 %q 时出错:%s\n", subject, err)
}

提供传递组是为了让多个应用实例订阅同一个消费者,事件只会被传递给其中一个实例(随机选择)。也就是说,如果你想要水平扩展应用程序。

如果你想要限制消费者接收的事件,那么你应该在消费者配置中提供SubjectFilter。每个消费者将持久化与其过滤器匹配的所有事件,如果没有提供过滤器,则持久化来自流的所有事件。

创建消费者后,你可以使用任何JetStream订阅方法(js.SubscribeSyncjs.QueueSubscribeSyncjs.Subscribejs.QueueSubscribe,...)订阅它,但你必须提供NATS选项将订阅与消费者绑定,使用nats.Bind(streamName, consumerName)来完成。

当订阅时,如果你想订阅消费者的所有事件,可以将主题名称留空。

英文:

The documentation is very bad about this part, and the errors are not very descriptive. I spent hours figuring it out. Here is what I found...

To create a push consumer using AddConsumer you must provide the DeliverySubject parameter, it can be anything, and for simplicity it can be the same as the consumer name. The documentation for this part is very bad and does not explain how it should be used.

The following is a full working example that creates a consumer and subscribes to certain subject from that consumer.

nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
	log.Fatalf("could not connect to nats: %s", err)
}

js, err := nc.JetStream()
if err != nil {
	log.Fatal("Could not get jetstream:", err)
}

streamName := "TEST_STREAM"
_, err = js.AddStream(&nats.StreamConfig{
	Name:      streamName,
	Subjects:  []string{"test.>"},
	Retention: nats.InterestPolicy,
	Replicas:  1,
})

if err != nil {
	fmt.Printf("error creating stream %q: %s\n", streamName, err)
	return
}

groupName := "group-1"
consumerName := fmt.Sprintf("%s-consumer", groupName)
_, err = js.AddConsumer(
	streamName,
	&nats.ConsumerConfig{
		Name:           consumerName,
		Durable:        consumerName,
		DeliverSubject: consumerName,
		DeliverGroup:   groupName,
	},
)

if err != nil {
	fmt.Printf("error creating consumer %q: %s\n", consumerName, err)
	return
}

subject := "test.resource.created"
_, err = js.QueueSubscribe(
	subject,
	groupName,
	func(msg *nats.Msg) { 
		fmt.Printf("Jetstream - received %q: %q\n", msg.Subject, msg.Data) 
	},
	nats.Bind(streamName, consumerName),
)

if err != nil {
	fmt.Printf("error subscribing to %q: %s\n", subject, err)
}

delivery group is provided so that several app instances can subscribe to the same consumer and the event will only be delivered to one of them (at random). That is, if you want to scale the app horizontally.

If you want to restrict what events the consumer receives. Then you should provide SubjectFilter to consumer config. Each consumer will persist all events that their filter matches, or all the events from the stream if no filter is provided.

After creating the consumer, You can then subscribe to it using any JetStream subscribe method (js.SubscribeSync, js.QueueSubscribeSync, js.Subscribe, js.QueueSubscribe, ...) but you MUST provide nats option to bind the subscription to a consumer, this is done using nats.Bind(streamName, consumerName)

You can leave the subject name an empty string when subscribing to subscribe to all events from the consumer.

答案2

得分: -1

创建一个消费者。

js.AddConsumer("ORDERS", &nats.ConsumerConfig{
    Durable: "MONITOR",
})

AddConsumer将一个消费者添加到流中。

AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
英文:

Create a Consumer.

js.AddConsumer("ORDERS", &nats.ConsumerConfig{
	Durable: "MONITOR",
})

AddConsumer adds a consumer to a stream.

AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

huangapple
  • 本文由 发表于 2022年1月13日 19:48:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/70696087.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定