英文:
RabbitMQ consumer in Go
问题
我正在尝试用Go语言编写一个RabbitMQ的消费者。它应该每次从队列中取出5个对象并进行处理。此外,如果成功处理,则应确认消息;否则,将其发送到死信队列5次,然后丢弃。它应该无限运行,并处理消费者的取消事件。
我有几个问题:
- RabbitMQ-go中是否有
BasicConsumer
和EventingBasicConsumer
的概念? - RabbitMQ中的
Model
是什么,在RabbitMQ-go中是否存在? - 当对象处理失败时,如何将其发送到死信队列,并在
ttl
之后重新排队? - 在下面的代码中,
ch.Consume
函数中的consumerTag
参数的意义是什么? - 在这种情况下,我们应该使用
channel.Get()
还是channel.Consume()
?
请告诉我如何修改下面的代码以满足上述要求。我之所以问这个问题,是因为我找不到RabbitMQ-Go的合适文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil {
fmt.Println("failed to fetch the data from consumer", err)
}
fmt.Println(k)
}
}()
log.Printf("Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
修改后的问题:
我已经按照链接link1 link2中的建议延迟了消息的处理。但问题是,即使在ttl
之后,消息仍然从死信队列返回到原始队列。我正在使用RabbitMQ 3.0.0
。有人能指出问题在哪里吗?
英文:
I am trying to write a RabbitMQ Consumer in Go. Which is suppose to take the 5 objects at a time from the queue and process them. Moreover, it is suppose to acknowledge if successfully processed else send to the dead-letter queue for 5 times and then discard, it should be running infinitely and handling the cancellation event of the consumer.
I have few questions :
- Is there any concept of
BasicConsumer
vsEventingBasicConsumer
in RabbitMq-go Reference? - What is
Model
in RabbitMQ and is it there in RabbitMq-go? - How to send the objects when failed to dead-letter queue and again re-queue them after
ttl
- What is the significance of
consumerTag
argument in thech.Consume
function in the below code - Should we use the
channel.Get()
orchannel.Consume()
for this scenario?
What are the changes i need to make in the below code to meet above requirement. I am asking this because i couldn't find decent documentation of RabbitMq-Go.
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
Edited question:
I have delayed the processing of the messages as suggested in the links link1 link2. But the problem is messages are getting back to their original queue from dead-lettered queue even after ttl. I am using RabbitMQ 3.0.0
. Can anyone point out what is the problem?
答案1
得分: 4
在RabbitMq-go参考文档中,没有明确的BasicConsumer和EventingBasicConsumer的概念。但是,Channel.Get
和Channel.Consume
方法提供了类似的功能。使用Channel.Get
可以进行非阻塞调用,如果有可用的消息,则获取第一条消息,否则返回ok=false
。而使用Channel.Consume
方法可以将排队的消息传递到一个通道中。
在RabbitMQ中,如果你指的是C# RabbitMQ中的IModel
和Connection.CreateModel
,那是C#库中的内容,而不是RabbitMQ本身的内容。这只是为了尝试将其抽象出来,远离RabbitMQ的"Channel"术语,但它从未流行起来。
要将对象发送到死信队列并在ttl后重新排队,可以使用delivery.Nack
方法,设置requeue=false
。
在下面的代码中,consumerTag
参数在ch.Consume
函数中的意义是消费者标识符。它可以用于使用channel.Cancel
取消通道,并标识负责传递的消费者。所有通过channel.Consume
传递的消息都将设置ConsumerTag
字段。
对于这种情况,我认为几乎从不优先使用channel.Get()
而是使用channel.Consume()
。使用channel.Get
将会轮询队列,并浪费CPU资源,这在Go语言中是没有意义的。
要满足上述要求,你需要对以下代码进行以下更改:
-
由于你是批量处理5个消息,你可以创建一个goroutine从消费者通道接收消息,并在接收到5个消息后调用另一个函数进行处理。
-
要确认消息或将其发送到死信队列,你可以使用
delivery.Ack
或delivery.Nack
函数。你可以设置multiple=true
,并一次性对整个批次进行确认。一旦消息进入死信队列,你需要检查delivery.Headers["x-death"]
头部字段,以确定消息被死信转发的次数,并在已经重试了5次后调用delivery.Reject
。 -
使用
channel.NotifyCancel
来处理取消事件。
请参考以下代码:
// 1. 批量处理5个消息的goroutine
func processBatch(deliveries <-chan amqp.Delivery) {
batch := make([]amqp.Delivery, 0, 5)
for delivery := range deliveries {
batch = append(batch, delivery)
if len(batch) == 5 {
// 调用另一个函数处理批次
processBatchMessages(batch)
// 清空批次
batch = batch[:0]
}
}
}
// 2. 处理批次消息的函数
func processBatchMessages(batch []amqp.Delivery) {
for _, delivery := range batch {
// 处理消息
// 确认消息或发送到死信队列
delivery.Ack(false)
}
}
// 3. 使用channel.NotifyCancel处理取消事件
func consumeMessages(ch *amqp.Channel) {
deliveries, err := ch.Consume(
"queue_name", // 队列名称
"consumer_tag", // 消费者标识符
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
// 错误处理
}
go processBatch(deliveries)
// 监听取消事件
cancel := ch.NotifyCancel(make(chan string))
go handleCancel(cancel)
}
func handleCancel(cancel <-chan string) {
// 处理取消事件
}
希望对你有所帮助!
英文:
> Is there any concept of BasicConsumer vs EventingBasicConsumer in
> RabbitMq-go Reference?
Not exactly, but the Channel.Get
and Channel.Consume
calls serve a similar concept. With Channel.Get
you have a non-blocking call that gets the first message if there's any available, or returns ok=false
. With Channel.Consume
the queued messages are delivered to a channel.
> What is Model in RabbitMQ and is it there in RabbitMq-go?
If you're referring to the IModel
and Connection.CreateModel
in C# RabbitMQ, that's something from the C# lib, not from RabbitMQ itself. It was just an attempt to abstract away from the RabbitMQ "Channel" terminology, but it never caught on.
> How to send the objects when failed to dead-letter queue and again
> re-queue them after ttl
Use the delivery.Nack method with requeue=false
.
> What is the significance of consumerTag argument in the ch.Consume
> function in the below code
The ConsumerTag
is just a consumer identifier. It can be used to cancel the channel with channel.Cancel, and to identify the consumer responsible for the delivery. All messages delivered with the channel.Consume
will have the ConsumerTag
field set.
> Should we use the channel.Get()
or channel.Consume()
for this scenario?
I think channel.Get()
is almost never preferable over channel.Consume()
. With channel.Get
you'll be polling the queue and wasting CPU doing nothing, which doesn't make sense in Go.
> What are the changes i need to make in the below code to meet above
> requirement.
-
Since you're batch processing 5 at a time, you can have a goroutine that receives from the consumer channel and once it gets the 5 deliveries you call another function to process them.
-
To acknowledge or send to the dead-letter queue you'll use the delivery.Ack or delivery.Nack functions. You can use
multiple=true
and call it once for the batch. Once the message goes to the dead letter queue, you have to check thedelivery.Headers["x-death"]
header for how many times its been dead-lettered and call delivery.Reject when its been retried for 5 times already. -
Use channel.NotifyCancel to handle the cancellation event.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论