RabbitMQ Closes connection after publish

Question

I am using RabbitMQ from Go and have run into a problem I have never seen before: the connection closes after a single Publish. This is the snippet:


func (k *K) DoWork() {
	[...]
	go func() {
		for {
			time.Sleep(time.Second * 5)
			k.eventLock.Lock()
			if !k.connected {
				fmt.Printf("Not connected, skip\n")
				k.eventLock.Unlock()
				continue
			}
			k.eventLock.Unlock()
			err = k.outChannel.Publish(k.outExchangeName, "", true, true, amqp.Publishing{
				ContentType: "application/json",
				Body:        content,
			})

			if err != nil {
				fmt.Printf("Error sending status - %v\n", err)
				for {
					err = k.initOutConnection()
					if err != nil {
						fmt.Printf("An error occurred initOut - %v\n", err)
					} else {
						fmt.Printf("Restablished connection\n")
						break
					}
				}
				continue
			} else {
				fmt.Printf("Sent keep alive\n")
			}
		}
	}()
}

func (k *K) initOutConnection() error {
	var err error
	k.outConnection, err = k.getRabbitConnection()

	if err != nil {
		return err
	}

	k.outChannel, err = k.outConnection.Channel()
	if err != nil {
		return err
	}

	k.outExchangeName = os.Getenv("RABBIT_MQ_OUT_EXCHANGE_NAME")

	err = k.outChannel.ExchangeDeclare(
		k.outExchangeName,
		"fanout",
		true,
		true,
		false,
		true, nil)

	if err != nil {
		return err
	}

	return nil
}

This is the output:

Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection

It is a perfect cycle: one successful send, then the connection closes, I re-establish it, and again one success followed by one failure. But that is only how it looks from the producer application; the consumer did not receive any message.

And if I deliberately close the connection after each publish, it does work:

err = k.outChannel.Publish()
if err == nil {
	k.outChannel.Close()
	k.initOutConnection()
	fmt.Printf("Sent keep alive\n")
}

It produces a consistent output:

Sent keep alive
Sent keep alive
Sent keep alive

But I want to use a single connection for all my publishes. Does anyone know what I am doing wrong?

Answer 1

Score: 2

Which AMQP library are you using? Is it "github.com/streadway/amqp"?

I don't think the connection is closed after you send a message, unless your code closes it or something is implemented incorrectly. What could be happening is that the outgoing channel cannot send a message because it was put in confirmation mode.

Here's an excerpt from the documentation:

> Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.

Doc:

https://pkg.go.dev/github.com/streadway/amqp#Channel.Publish

So a publish usually looks like this:

channel := client.NotifyPublish(...)
client.Publish(...)
// wait for the channel

Example from the repository: https://github.com/streadway/amqp/blob/master/_examples/simple-producer/producer.go

Answer 2

Score: 1

After 1.5 days on this, it finally occurred to me to look at the broker's debug log output by running RabbitMQ in Docker without detaching from the container:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

I then ran the snippet above and, sure enough, an error appeared:

Error on AMQP connection <0.1254.0> (172.17.0.1:41690 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 1:
operation basic.publish caused a connection exception not_implemented: "immediate=true"

The issue was the fourth argument to .Publish(), which is immediate: I should have set it to false instead of true.
The connection really was up and working, but as soon as I called Publish with immediate=true, the broker closed it. Although I did not get an error from Publish, the message never reached its destination exchange.

That simple: a single boolean cost me countless hours.
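
The alternating success/failure pattern in the question follows from the publish being asynchronous: the call that carries immediate=true returns without error, and the closed connection only shows up on the next call. Below is a minimal sketch of that behaviour using a toy in-memory model. It does not use the real client library; `fakeChannel`, `run`, and `errNotOpen` are hypothetical names invented for illustration, and the model simplifies the broker by marking the channel closed immediately after the offending publish.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotOpen is a stand-in that copies the error text shown in the question.
// It is NOT the error value of the real AMQP client library.
var errNotOpen = errors.New(`Exception (504) Reason: "channel/connection is not open"`)

// fakeChannel is a hypothetical in-memory stand-in for an AMQP channel.
type fakeChannel struct {
	open      bool // false once the (simulated) broker has closed the connection
	delivered int  // messages that actually reached the exchange
}

func newFakeChannel() *fakeChannel { return &fakeChannel{open: true} }

// Publish models an asynchronous publish: writing the frame succeeds, and a
// rejection by the broker only becomes visible on a later call.
func (c *fakeChannel) Publish(immediate bool) error {
	if !c.open {
		return errNotOpen
	}
	if immediate {
		// The caller sees no error, but the broker rejects the frame
		// (not_implemented) and closes the connection. Nothing is delivered.
		c.open = false
		return nil
	}
	c.delivered++
	return nil
}

// run publishes n times and reconnects after every failure, mirroring the
// loop in the question. It returns the producer's view and the number of
// messages that were really delivered.
func run(n int, immediate bool) (log []string, delivered int) {
	ch := newFakeChannel()
	for i := 0; i < n; i++ {
		if err := ch.Publish(immediate); err != nil {
			log = append(log, "error")
			delivered += ch.delivered
			ch = newFakeChannel() // "Restablished connection"
			continue
		}
		log = append(log, "sent")
	}
	delivered += ch.delivered
	return log, delivered
}

func main() {
	log, delivered := run(4, true)
	fmt.Println(log, delivered) // [sent error sent error] 0

	log, delivered = run(4, false)
	fmt.Println(log, delivered) // [sent sent sent sent] 4
}
```

With immediate=true the producer reports "sent" every other iteration while nothing is delivered, which matches the output in the question. With immediate=false every publish goes through on the same channel.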

huangapple
  • Published on 2021-11-26 06:31:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70117912.html