英文:
How to create NATS push consumer using jscontext.newConsumer() in Golang
问题
在创建js上下文AddConsumer(topic.ID(), &nats.ConsumerConfig{})时,创建了一个拉取消费者,是否有办法使用addconsumer创建一个推送消费者?
英文:
While creating js context AddConsumer(topic.ID(), &nats.ConsumerConfig{}) creating pull consumer, Is there any way to create a push consumer using addconsumer
答案1
得分: 1
文档对于这部分非常糟糕,错误信息也不够具体。我花了几个小时才弄清楚。以下是我找到的内容...
要使用AddConsumer
创建一个推送消费者,你必须提供DeliverySubject
参数,它可以是任何值,为了简单起见,可以与消费者名称相同。关于这部分的文档非常糟糕,没有解释应该如何使用它。
以下是一个完整的工作示例,它创建一个消费者并订阅来自该消费者的特定主题。
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatalf("无法连接到nats:%s", err)
}
js, err := nc.JetStream()
if err != nil {
log.Fatal("无法获取jetstream:", err)
}
streamName := "TEST_STREAM"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"test.>"},
Retention: nats.InterestPolicy,
Replicas: 1,
})
if err != nil {
fmt.Printf("创建流 %q 时出错:%s\n", streamName, err)
return
}
groupName := "group-1"
consumerName := fmt.Sprintf("%s-consumer", groupName)
_, err = js.AddConsumer(
streamName,
&nats.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
DeliverSubject: consumerName,
DeliverGroup: groupName,
},
)
if err != nil {
fmt.Printf("创建消费者 %q 时出错:%s\n", consumerName, err)
return
}
subject := "test.resource.created"
_, err = js.QueueSubscribe(
subject,
groupName,
func(msg *nats.Msg) {
fmt.Printf("Jetstream - 收到 %q 的消息:%q\n", msg.Subject, msg.Data)
},
nats.Bind(streamName, consumerName),
)
if err != nil {
fmt.Printf("订阅 %q 时出错:%s\n", subject, err)
}
提供传递组是为了让多个应用实例订阅同一个消费者,事件只会被传递给其中一个实例(随机选择)。也就是说,如果你想要水平扩展应用程序。
如果你想要限制消费者接收的事件,那么你应该在消费者配置中提供SubjectFilter
。每个消费者将持久化与其过滤器匹配的所有事件,如果没有提供过滤器,则持久化来自流的所有事件。
创建消费者后,你可以使用任何JetStream订阅方法(js.SubscribeSync
,js.QueueSubscribeSync
,js.Subscribe
,js.QueueSubscribe
,...)订阅它,但你必须提供NATS选项将订阅与消费者绑定,使用nats.Bind(streamName, consumerName)
来完成。
当订阅时,如果你想订阅消费者的所有事件,可以将主题名称留空。
英文:
The documentation is very bad about this part, and the errors are not very descriptive. I spent hours figuring it out. Here is what I found...
To create a push consumer using AddConsumer
you must provide the DeliverySubject
parameter, it can be anything, and for simplicity it can be the same as the consumer name. The documentation for this part is very bad and does not explain how it should be used.
The following is a full working example that creates a consumer and subscribes to certain subject from that consumer.
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatalf("could not connect to nats: %s", err)
}
js, err := nc.JetStream()
if err != nil {
log.Fatal("Could not get jetstream:", err)
}
streamName := "TEST_STREAM"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"test.>"},
Retention: nats.InterestPolicy,
Replicas: 1,
})
if err != nil {
fmt.Printf("error creating stream %q: %s\n", streamName, err)
return
}
groupName := "group-1"
consumerName := fmt.Sprintf("%s-consumer", groupName)
_, err = js.AddConsumer(
streamName,
&nats.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
DeliverSubject: consumerName,
DeliverGroup: groupName,
},
)
if err != nil {
fmt.Printf("error creating consumer %q: %s\n", consumerName, err)
return
}
subject := "test.resource.created"
_, err = js.QueueSubscribe(
subject,
groupName,
func(msg *nats.Msg) {
fmt.Printf("Jetstream - received %q: %q\n", msg.Subject, msg.Data)
},
nats.Bind(streamName, consumerName),
)
if err != nil {
fmt.Printf("error subscribing to %q: %s\n", subject, err)
}
delivery group is provided so that several app instances can subscribe to the same consumer and the event will only be delivered to one of them (at random). That is, if you want to scale the app horizontally.
If you want to restrict what events the consumer receives. Then you should provide SubjectFilter
to consumer config. Each consumer will persist all events that their filter matches, or all the events from the stream if no filter is provided.
After creating the consumer, You can then subscribe to it using any JetStream subscribe method (js.SubscribeSync
, js.QueueSubscribeSync
, js.Subscribe
, js.QueueSubscribe
, ...) but you MUST provide nats option to bind the subscription to a consumer, this is done using nats.Bind(streamName, consumerName)
You can leave the subject name an empty string when subscribing to subscribe to all events from the consumer.
答案2
得分: -1
创建一个消费者。
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
})
AddConsumer将一个消费者添加到流中。
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
英文:
Create a Consumer.
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
})
AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论