Go中的RabbitMQ消费者

huangapple go评论78阅读模式
英文:

RabbitMQ consumer in Go

问题

我正在尝试用Go语言编写一个RabbitMQ的消费者。它应该每次从队列中取出5个对象并进行处理。此外,如果成功处理,则应确认消息;否则,将其发送到死信队列5次,然后丢弃。它应该无限运行,并处理消费者的取消事件。

我有几个问题:

  1. RabbitMQ-go中是否有BasicConsumerEventingBasicConsumer的概念?
  2. RabbitMQ中的Model是什么,在RabbitMQ-go中是否存在?
  3. 当对象处理失败时,如何将其发送到死信队列,并在ttl之后重新排队?
  4. 在下面的代码中,ch.Consume函数中的consumerTag参数的意义是什么?
  5. 在这种情况下,我们应该使用channel.Get()还是channel.Consume()

请告诉我如何修改下面的代码以满足上述要求。我之所以问这个问题,是因为我找不到RabbitMQ-Go的合适文档。

func main() {
    consumer()        
}

func consumer() {
    objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil} 		
    initializeConn(&objConsumerConn.conn)

    ch, err := objConsumerConn.conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    msgs, err := ch.Consume(
        objConsumerConn.queueName, // queue
        "demo1",     // consumerTag
        false,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {        	       
            k := new(EventCaptureData)
            b := bytes.Buffer{}
            b.Write(d.Body)
            dec := gob.NewDecoder(&b)  
            err := dec.Decode(&k)
            d.Ack(true)  

            if err != nil { 
                fmt.Println("failed to fetch the data from consumer", err) 
            }
            fmt.Println(k)                        
        }
    }()      

    log.Printf("Waiting for Messages to process. To exit press CTRL+C ")
    <-forever
}

修改后的问题:

我已经按照链接link1 link2中的建议延迟了消息的处理。但问题是,即使在ttl之后,消息仍然从死信队列返回到原始队列。我正在使用RabbitMQ 3.0.0。有人能指出问题在哪里吗?

英文:

I am trying to write a RabbitMQ Consumer in Go. Which is suppose to take the 5 objects at a time from the queue and process them. Moreover, it is suppose to acknowledge if successfully processed else send to the dead-letter queue for 5 times and then discard, it should be running infinitely and handling the cancellation event of the consumer.
I have few questions :

  1. Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?
  2. What is Model in RabbitMQ and is it there in RabbitMq-go?
  3. How to send the objects when failed to dead-letter queue and again re-queue them after ttl
  4. What is the significance of consumerTag argument in the ch.Consume function in the below code
  5. Should we use the channel.Get() or channel.Consume() for this scenario?

What are the changes i need to make in the below code to meet above requirement. I am asking this because i couldn't find decent documentation of RabbitMq-Go.

   func main() {
    
    	consumer()        
    }
    
    func consumer() {
    	
        objConsumerConn := &amp;rabbitMQConn{queueName: &quot;EventCaptureData&quot;, conn: nil} 		
        initializeConn(&amp;objConsumerConn.conn)
    
    
        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, &quot;Failed to open a channel&quot;)
        defer ch.Close()
    
        msgs, err := ch.Consume(
                objConsumerConn.queueName, // queue
                &quot;demo1&quot;,     // consumerTag
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, &quot;Failed to register a consumer&quot;)
    
        forever := make(chan bool)
    
        go func() {
            for d := range msgs {        	       
    		    k := new(EventCaptureData)
    		    b := bytes.Buffer{}
    		    b.Write(d.Body)
    	 		dec := gob.NewDecoder(&amp;b)  
    			err := dec.Decode(&amp;k)
    	        d.Ack(true)  
    
    			if err != nil { fmt.Println(&quot;failed to fetch the data from consumer&quot;, err); }
    				fmt.Println(k)                        
            }
        }()      
    
        log.Printf(&quot; Waiting for Messages to process. To exit press CTRL+C &quot;)
        &lt;-forever
    
    }

Edited question:

I have delayed the processing of the messages as suggested in the links link1 link2. But the problem is messages are getting back to their original queue from dead-lettered queue even after ttl. I am using RabbitMQ 3.0.0. Can anyone point out what is the problem?

答案1

得分: 4

在RabbitMq-go参考文档中,没有明确的BasicConsumer和EventingBasicConsumer的概念。但是,Channel.GetChannel.Consume方法提供了类似的功能。使用Channel.Get可以进行非阻塞调用,如果有可用的消息,则获取第一条消息,否则返回ok=false。而使用Channel.Consume方法可以将排队的消息传递到一个通道中。

在RabbitMQ中,如果你指的是C# RabbitMQ中的IModelConnection.CreateModel,那是C#库中的内容,而不是RabbitMQ本身的内容。这只是为了尝试将其抽象出来,远离RabbitMQ的"Channel"术语,但它从未流行起来。

要将对象发送到死信队列并在ttl后重新排队,可以使用delivery.Nack方法,设置requeue=false

在下面的代码中,consumerTag参数在ch.Consume函数中的意义是消费者标识符。它可以用于使用channel.Cancel取消通道,并标识负责传递的消费者。所有通过channel.Consume传递的消息都将设置ConsumerTag字段。

对于这种情况,我认为几乎从不优先使用channel.Get()而是使用channel.Consume()。使用channel.Get将会轮询队列,并浪费CPU资源,这在Go语言中是没有意义的。

要满足上述要求,你需要对以下代码进行以下更改:

  1. 由于你是批量处理5个消息,你可以创建一个goroutine从消费者通道接收消息,并在接收到5个消息后调用另一个函数进行处理。

  2. 要确认消息或将其发送到死信队列,你可以使用delivery.Ackdelivery.Nack函数。你可以设置multiple=true,并一次性对整个批次进行确认。一旦消息进入死信队列,你需要检查delivery.Headers["x-death"]头部字段,以确定消息被死信转发的次数,并在已经重试了5次后调用delivery.Reject

  3. 使用channel.NotifyCancel来处理取消事件。

请参考以下代码:

// 1. 批量处理5个消息的goroutine
func processBatch(deliveries <-chan amqp.Delivery) {
    batch := make([]amqp.Delivery, 0, 5)
    
    for delivery := range deliveries {
        batch = append(batch, delivery)
        
        if len(batch) == 5 {
            // 调用另一个函数处理批次
            processBatchMessages(batch)
            
            // 清空批次
            batch = batch[:0]
        }
    }
}

// 2. 处理批次消息的函数
func processBatchMessages(batch []amqp.Delivery) {
    for _, delivery := range batch {
        // 处理消息
        
        // 确认消息或发送到死信队列
        delivery.Ack(false)
    }
}

// 3. 使用channel.NotifyCancel处理取消事件
func consumeMessages(ch *amqp.Channel) {
    deliveries, err := ch.Consume(
        "queue_name", // 队列名称
        "consumer_tag", // 消费者标识符
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil, // args
    )
    
    if err != nil {
        // 错误处理
    }
    
    go processBatch(deliveries)
    
    // 监听取消事件
    cancel := ch.NotifyCancel(make(chan string))
    go handleCancel(cancel)
}

func handleCancel(cancel <-chan string) {
    // 处理取消事件
}

希望对你有所帮助!

英文:

> Is there any concept of BasicConsumer vs EventingBasicConsumer in
> RabbitMq-go Reference?

Not exactly, but the Channel.Get and Channel.Consume calls serve a similar concept. With Channel.Get you have a non-blocking call that gets the first message if there's any available, or returns ok=false. With Channel.Consume the queued messages are delivered to a channel.

> What is Model in RabbitMQ and is it there in RabbitMq-go?

If you're referring to the IModel and Connection.CreateModel in C# RabbitMQ, that's something from the C# lib, not from RabbitMQ itself. It was just an attempt to abstract away from the RabbitMQ "Channel" terminology, but it never caught on.

> How to send the objects when failed to dead-letter queue and again
> re-queue them after ttl

Use the delivery.Nack method with requeue=false.

> What is the significance of consumerTag argument in the ch.Consume
> function in the below code

The ConsumerTag is just a consumer identifier. It can be used to cancel the channel with channel.Cancel, and to identify the consumer responsible for the delivery. All messages delivered with the channel.Consume will have the ConsumerTag field set.

> Should we use the channel.Get() or channel.Consume() for this scenario?

I think channel.Get() is almost never preferable over channel.Consume(). With channel.Get you'll be polling the queue and wasting CPU doing nothing, which doesn't make sense in Go.

> What are the changes i need to make in the below code to meet above
> requirement.

  1. Since you're batch processing 5 at a time, you can have a goroutine that receives from the consumer channel and once it gets the 5 deliveries you call another function to process them.

  2. To acknowledge or send to the dead-letter queue you'll use the delivery.Ack or delivery.Nack functions. You can use multiple=true and call it once for the batch. Once the message goes to the dead letter queue, you have to check the delivery.Headers[&quot;x-death&quot;] header for how many times its been dead-lettered and call delivery.Reject when its been retried for 5 times already.

  3. Use channel.NotifyCancel to handle the cancellation event.

huangapple
  • 本文由 发表于 2016年4月5日 15:38:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/36419994.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定