RabbitMQ队列长度始终为0。

huangapple go评论78阅读模式
英文:

RabbitMQ Queue Length is always 0

问题

我正在编写一个应用程序,遇到了一个问题,反复检查代码,似乎没有问题,使用下面的基本代码片段进行测试,问题可以复现... RabbitMQ 始终说队列是空的,但实际上并不是空的。

下面的 Golang 代码片段展示了一个生产者比消费者更频繁地发送消息的情况。消费者始终处于活动状态,但休眠时间更长,以使队列中有积压的消息。结果是,每次尝试时,消费者都会获取消息,但 API 始终说没有消息 -> 消息计数为 0。

你可以使用以下 RabbitMQ 服务器进行测试:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

输出:

Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....

队列长度从未显示为非 0。在我的应用程序中,当我放入一堆消息时,可能会捕获到它显示为 X,但很快它就变成了 0,我以为应用程序中有一个隐藏的消费者,但实际上没有,API 给出了一些不准确的结果,或者我应该在其他地方查找长度。

更新

只有在有消费者的情况下才会出现上述问题,如果队列没有消费者,它会按预期工作,只需注释掉 .Consume 代码:

/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/

现在它“改善”了,但首先,这不是我要寻找的,其次,输出仍然很奇怪 =S:

Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11
英文:

I was writing an application and I had this issue, looking the code over and over, nothing seems to be wrong, tested with the below basic snippet and the issue is reproducible .... RabbitMQ is saying the queue is always empty when it is not.

The below Golang snippet shows a producer sending messages more often than the consumer consuming them. The consumer is always active but sleeping longer to make the queue have messages in its backlog. Result? The consumer fetches messages each time it tries however the API is always saying there are no messages -> message count is 0.

package main

import (
	&quot;encoding/json&quot;
	&quot;fmt&quot;
	&quot;github.com/streadway/amqp&quot;
	&quot;io/ioutil&quot;
	&quot;net/http&quot;
	&quot;testing&quot;
	&quot;time&quot;
)
func main() {

    username := &quot;guest&quot;
	password := &quot;guest&quot;
	scheme := &quot;amqp&quot;
	rabbitMqHost := &quot;localhost&quot;
	port := &quot;5672&quot;

	connectionString := fmt.Sprintf(&quot;%s://%s:%s@%s:%s/&quot;, scheme, username, password, rabbitMqHost, port)

	conn, err := amqp.Dial(connectionString)

	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	exchangeName := &quot;my-exchange&quot;
	// Declare exchange
	err = ch.ExchangeDeclare(
		exchangeName, // name
		&quot;fanout&quot;,     // type
		true,         // durable
		true,         // auto-deleted
		false,        // internal
		false,        // no-wait
		nil,          // arguments
	)

	if err != nil {
		panic(err)
	}

	// Create first Queue
	queueName := &quot;my-queue&quot;
	q, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		true,      // delete when unsused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)

	if err != nil {
		panic(err)
	}

	// Bind Exchange to Queue
	err = ch.QueueBind(
		q.Name,       // queue name
		&quot;&quot;,           // routing key
		exchangeName, // exchange
		false,
		nil,
	)

	// Listen
	eventQueue, err := ch.Consume(
		q.Name, // queue
		&quot;&quot;,     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)

	if err != nil {
		panic(err)
	}

	go func() {
		for a := range eventQueue {
			fmt.Printf(&quot;Received Event %s\n&quot;, string(a.Body))
			time.Sleep(time.Second * 4)
		}
	}()

	go func() {
		count := 0
		for {
			err = ch.Publish(exchangeName, &quot;&quot;, false, false, amqp.Publishing{
				ContentType: &quot;application/json&quot;,
				Body:        []byte(fmt.Sprintf(&quot;Message %d&quot;, count)),
			})

			fmt.Printf(&quot;Sent Message %d\n&quot;, count)
			count++
			if err != nil {
				panic(err)
			}
			time.Sleep(time.Second * 2)
		}
	}()

    for {
		httpRes, err := http.Get(&quot;http://guest:guest@localhost:15672/api/queues/%2f/my-queue&quot;)
		if err != nil {
			panic(err)
		}

		var resJson map[string]interface{}
		content, err := ioutil.ReadAll(httpRes.Body)
		if err != nil {
			panic(err)
		}
		httpRes.Body.Close()
		err = json.Unmarshal(content, &amp;resJson)

		if err != nil {
			panic(err)
		}

		q2, err := ch.QueueDeclarePassive(
			queueName, // name
			true,      // durable
			true,      // delete when unsused
			false,     // exclusive
			false,     // no-wait
			nil,
		)
		fmt.Printf(&quot;Queue Len: %f - %d\n&quot;, resJson[&quot;messages&quot;], q2.Messages)
		time.Sleep(time.Second)
	}

}

You can test with the following RabbitMQ Server:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Output:

Sent Message 0
Received Event Message 0
Queue Len: %!f(&lt;nil&gt;) - 0
Queue Len: %!f(&lt;nil&gt;) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....

Not a single time the Que Len has said it is not 0. In my application when I put a bunch of messages I may catch it saying X, but quickly it becomes 0, I thought I had a hidden consumer in the app, but no, the API is giving some results that either are not accurate, or I should be looking somewhere else to get the length.

Update

The above only happens when there is a consumer, if the queue does not have a consumer, it works as expected, just comment the .Consume code:

/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/

And now it "improves", but first, it is not what I am looking for, second, it is still strange output =S:

Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11

答案1

得分: 3

q2.Messages字段是不可靠的,它表示的是未等待确认的消息数量,即已经收到确认的消息。

你的消费者声明中设置了autoAck = true,也就是说不需要确认,这意味着没有期望的确认消息,也就是说没有消息被确认。

当你注释掉消费者时,已确认的消息数量可能取决于发布者的缓冲区。

使用AMQP 0.9.1在给定队列上以编程方式获取精确的消息数量基本上是不可能的。你可以使用管理API中的message_stats字段代替:

http://localhost:15672/api/queues/vhost/queue_name
英文:

The field q2.Messages is unreliable, it is the count of messages not awaiting acknowledgment, i.e. messages already ACK'ed.

Your consumer is declared with autoAck = true — i.e. noAck —, which means that no acknowledgements are expected, which means that there are zero messages already ACK'ed.

When you comment out the consumer, the number of acknowledged messages likely depends on the publisher buffer.

Getting a precise number of messages programmatically on a given queue with AMQP 0.9.1 is basically not possible. You may use the message_stats field in the management API instead:

http://localhost:15672/api/queues/vhost/queue_name

答案2

得分: 1

接受的解决方案将是blackgreen的。证明如下替换,只需在问题部分替换消费者和发布者代码:

// 监听
eventQueue, err := ch.Consume(
    q.Name, // 队列
    "",     // 消费者
    false,  // 自动确认 < -- 差异
    false,  // 独占
    false,  // 不使用本地
    false,  // 不等待
    nil,    // 参数
)

if err != nil {
    panic(err)
}

go func() {

    for a := range eventQueue {
        err = ch.Ack(a.DeliveryTag, false) // < -- 差异
        if err != nil {
            panic(err)
        }
        fmt.Printf("接收到事件 %s\n", string(a.Body))
        time.Sleep(time.Second * 4)
    }
}()

go func() {
    count := 0
    for {
        err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
            ContentType: "application/json",
            Body:        []byte(fmt.Sprintf("消息 %d", count)),
        })

        fmt.Printf("发送消息 %d\n", count)
        count++
        if err != nil {
            panic(err)
        }
        if count >= 20 { // < -- 差异
            break
        }
        time.Sleep(time.Second * 2)
    }
}()

输出:

.... 队列长度增加
发送消息 13
队列长度:8.000000 - 0
队列长度:8.000000 - 0
接收到事件 消息 4
发送消息 14
队列长度:8.000000 - 0
队列长度:9.000000 - 0
发送消息 15
队列长度:9.000000 - 0
队列长度:9.000000 - 0
接收到事件 消息 5
发送消息 16
队列长度:9.000000 - 0
队列长度:9.000000 - 0
发送消息 17
队列长度:11.000000 - 0
队列长度:11.000000 - 0
接收到事件 消息 6
发送消息 18
队列长度:11.000000 - 0
队列长度:11.000000 - 0
发送消息 19
队列长度:11.000000 - 0
队列长度:12.000000 - 0
接收到事件 消息 7
队列长度:12.000000 - 0
队列长度:12.000000 - 0
队列长度:12.000000 - 0
队列长度:12.000000 - 0
接收到事件 消息 8
队列长度:12.000000 - 0
队列长度:12.000000 - 0
队列长度:12.000000 - 0
队列长度:12.000000 - 0
接收到事件 消息 9
队列长度:12.000000 - 0
队列长度:11.000000 - 0
队列长度:11.000000 - 0
队列长度:11.000000 - 0
接收到事件 消息 10
队列长度:11.000000 - 0
队列长度:11.000000 - 0
队列长度:10.000000 - 0
队列长度:10.000000 - 0
接收到事件 消息 11
队列长度:10.000000 - 0
队列长度:10.000000 - 0
队列长度:10.000000 - 0
队列长度:9.000000 - 0
接收到事件 消息 12
....
当发布者退出时,队列长度减少,消费者追赶上来,消息长度减少:
接收到事件 消息 16
队列长度:5.000000 - 0
队列长度:5.000000 - 0
队列长度:5.000000 - 0
队列长度:4.000000 - 0
接收到事件 消息 17
队列长度:4.000000 - 0
队列长度:4.000000 - 0
队列长度:4.000000 - 0
队列长度:4.000000 - 0
接收到事件 消息 18
队列长度:2.000000 - 0
队列长度:2.000000 - 0
队列长度:2.000000 - 0
队列长度:2.000000 - 0
接收到事件 消息 19
队列长度:2.000000 - 0
队列长度:1.000000 - 0
英文:

The accepted solution will be blackgreen's. The proof is the below replacement, just replace the consumer and publisher code in the question section by:

// Listen
	eventQueue, err := ch.Consume(
		q.Name, // queue
		&quot;&quot;,     // consumer
		false,  // auto-ack &lt;-- Difference
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)

	if err != nil {
		panic(err)
	}

	go func() {

		for a := range eventQueue {
			err = ch.Ack(a.DeliveryTag, false) // &lt;-- Difference
			if err != nil {
				panic(err)
			}
			fmt.Printf(&quot;Received Event %s\n&quot;, string(a.Body))
			time.Sleep(time.Second * 4)
		}
	}()

	go func() {
		count := 0
		for {
			err = ch.Publish(exchangeName, &quot;&quot;, false, false, amqp.Publishing{
				ContentType: &quot;application/json&quot;,
				Body:        []byte(fmt.Sprintf(&quot;Message %d&quot;, count)),
			})

			fmt.Printf(&quot;Sent Message %d\n&quot;, count)
			count++
			if err != nil {
				panic(err)
			}
			if count &gt;= 20 { // &lt;-- Difference
				break
			}
			time.Sleep(time.Second * 2)
		}
	}()

Output:

.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0

huangapple
  • 本文由 发表于 2021年11月27日 20:50:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/70134943.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定