RabbitMQ Closes connection after publish
Question
I am using RabbitMQ from Go and have run into a problem I have never seen before: the connection keeps closing after a single Publish. This is the snippet:
func (k *K) DoWork() {
    [...]
    go func() {
        for {
            time.Sleep(time.Second * 5)
            k.eventLock.Lock()
            if !k.connected {
                fmt.Printf("Not connected, skip\n")
                k.eventLock.Unlock()
                continue
            }
            k.eventLock.Unlock()
            err = k.outChannel.Publish(k.outExchangeName, "", true, true, amqp.Publishing{
                ContentType: "application/json",
                Body:        content,
            })
            if err != nil {
                fmt.Printf("Error sending status - %v\n", err)
                for {
                    err = k.initOutConnection()
                    if err != nil {
                        fmt.Printf("An error occurred initOut - %v\n", err)
                    } else {
                        fmt.Printf("Restablished connection\n")
                        break
                    }
                }
                continue
            } else {
                fmt.Printf("Sent keep alive\n")
            }
        }
    }()
}

func (k *K) initOutConnection() error {
    var err error
    k.outConnection, err = k.getRabbitConnection()
    if err != nil {
        return err
    }
    k.outChannel, err = k.outConnection.Channel()
    if err != nil {
        return err
    }
    k.outExchangeName = os.Getenv("RABBIT_MQ_OUT_EXCHANGE_NAME")
    err = k.outChannel.ExchangeDeclare(
        k.outExchangeName,
        "fanout",
        true,
        true,
        false,
        true, nil)
    if err != nil {
        return err
    }
    return nil
}
This is the output:
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
It is a perfect cycle: one successful send, then the connection closes, I re-establish it, and then again one success and one failure. But that is only how it looks from the producer application; the consumer did not receive any message.
And if I close the connection on purpose after each publish, it does work:
err = k.outChannel.Publish()
if err == nil {
    k.outChannel.Close()
    k.initOutConnection()
    fmt.Printf("Sent keep alive\n")
}
It produces a consistent output:
Sent keep alive
Sent keep alive
Sent keep alive
But I want to use a single connection for all my publishes. Does anyone know what I am doing wrong?
Answer 1
Score: 2
Which AMQP library do you use? Is it "github.com/streadway/amqp"?
I don't think the connection is closed after you send a message, unless your code closes it explicitly or the implementation is wrong somewhere. What could be happening is that the outgoing channel is not able to send a message because it was put in confirmation mode.
Here's an excerpt from the documentation:
> Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.
Doc:
https://pkg.go.dev/github.com/streadway/amqp#Channel.Publish
So usually the publish looks like:
channel := client.NotifyPublish(...)
client.Publish(...)
// wait for the channel
Example from the repository: https://github.com/streadway/amqp/blob/master/_examples/simple-producer/producer.go
Answer 2
Score: 1
After 1.5 days on this, it finally occurred to me to look at the RabbitMQ logs by running it in Docker without detaching from the container:
docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
I then ran the above snippet and, sure enough, an error appeared:
Error on AMQP connection <0.1254.0> (172.17.0.1:41690 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 1:
operation basic.publish caused a connection exception not_implemented: "immediate=true"
The issue was the fourth argument to .Publish(), which is immediate: I should have set it to false instead of true.
The connection really was up and working. As soon as I called Publish with immediate=true, the broker closed the connection, and although Publish itself returned no error, the message never reached its destination exchange.
That simple: a single boolean cost me countless hours.