No consumers, yet when I publish something an ack is immediately returned why? (golang/rabbitmq)

huangapple go评论80阅读模式
英文:

No consumers, yet when I publish something an ack is immediately returned why? (golang/rabbitmq)

问题

以下是我使用的发布者代码。它在消息从队列中取出之前需要确认。它应该打印出它从消费者那里收到了 Ack 或 Nack(在代码底部)。如果你只运行下面的发布者代码(不同时运行消费者代码),它应该会一直挂起,等待 Ack 或 Nack,但实际上它并不会,它会打印出一个 Ack,就好像一个消费者已经发送了它。所以我很困惑,不知道代码的哪一部分出错了。

对于基础代码,我使用了 rabbitmq 官方教程中的代码:https://www.rabbitmq.com/tutorials/tutorial-one-go.html

对于 Ack/Nack 部分的代码,我参考了这个链接:https://agocs.org/blog/2014/08/19/rabbitmq-best-practices-in-go/

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {

	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	ch.Confirm(false)

	ack, nack := ch.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "hello"
	err = ch.Publish(
		"",      // exchange
		q.Name, // routing key
		true,   // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	select {
	  case tag := <-ack:
	      log.Println("Acked ", tag)
	  case tag := <-nack:
	      log.Println("Nack alert! ", tag)
	}
}

希望这可以帮助你解决问题。

英文:

Below is the publisher code that I use. It requires acks before the message is taken off the queue. It's suppose to print out that it receives an Ack or nack (at the bottom of the code) from a consumer. If you just run the publisher code below by itself (without running a consumer code at same time), it suppose to just hang, waiting for ack or nack but it doesn't, it prints out an ack as if a consumer had sent it. So I'm confused if I have any part of the code wrong.

For the base code I used code from rabbitmq's official tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

For ack/nack part of the code I followed this: https://agocs.org/blog/2014/08/19/rabbitmq-best-practices-in-go/

package main
import (
&quot;log&quot;
&quot;github.com/streadway/amqp&quot;
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf(&quot;%s: %s&quot;, msg, err)
}
}
func main() {
conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;)
failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, &quot;Failed to open a channel&quot;)
defer ch.Close()
ch.Confirm(false)
ack, nack := ch.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
q, err := ch.QueueDeclare(
&quot;hello&quot;, // name
false,   // durable
false,   // delete when unused
false,   // exclusive
false,   // no-wait
nil,     // arguments
)
failOnError(err, &quot;Failed to declare a queue&quot;)
body := &quot;hello&quot;
err = ch.Publish(
&quot;&quot;,     // exchange
q.Name, // routing key
true,  // mandatory
false,  // immediate
amqp.Publishing{
ContentType: &quot;text/plain&quot;,
Body:        []byte(body),
})
log.Printf(&quot; [x] Sent %s&quot;, body)
failOnError(err, &quot;Failed to publish a message&quot;)
select {
case tag := &lt;-ack:
log.Println(&quot;Acked &quot;, tag)
case tag := &lt;-nack:
log.Println(&quot;Nack alert! &quot;, tag)
}
}

答案1

得分: 4

你将发布者确认 ack 和 nack 与消费者端的 ack 和 nack 混淆了。

文档中指出:

> 对于无法路由的消息,当交换机验证消息不会路由到任何队列时(返回一个空的队列列表),代理将发出确认。如果消息也被发布为强制性的,那么在 basic.ack 之前,会先向客户端发送 basic.return。对于否定的确认(basic.nack)也是如此。
>
> 对于可路由的消息,在所有队列都接受了消息时,会发送 basic.ack。对于路由到持久队列的持久消息,这意味着持久化到磁盘。对于镜像队列,这意味着所有镜像都接受了该消息。

所以你看到的行为是正确的。RabbitMQ 确认消息已到达队列。

英文:

You are confusing publisher confirm acks and nacks with consumer side acks and nacks.

The documentation states:

> For unroutable messages, the broker will issue a confirm once the
> exchange verifies a message won't route to any queue (returns an empty
> list of queues). If the message is also published as mandatory, the
> basic.return is sent to the client before basic.ack. The same is true
> for negative acknowledgements (basic.nack).
>
> For routable messages, the basic.ack is sent when a message has been
> accepted by all the queues. For persistent messages routed to durable
> queues, this means persisting to disk. For mirrored queues, this means
> that all mirrors have accepted the message.

So you are seeing the correct behaviour. RabbitMQ is confirming that the messages arrived at the queue.

huangapple
  • 本文由 发表于 2017年4月18日 18:49:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/43470113.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定