英文:
How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?
问题
由于每次打开连接以进行发布都是昂贵的,我正在尝试实现一种方法来保持连接活动并在我的应用程序中共享它以发布消息。
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
函数KeepAlive
是一个无限循环,每5秒发送一条虚拟消息,该消息的生存时间也为5秒,因此会被销毁。
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
在这里,我在一个goroutine中创建连接并调用KeepAlive
,以便它可以在后台工作,始终保持连接活动。
我的问题:
- 我觉得这种方式只是一个变通方法,尽管我尝试搜索如何保持连接活动的示例,但似乎所有这些示例都对消费者端感兴趣。有没有更简洁的方法来保持连接活动?
- 在我的应用程序运行时保持连接活动是否有问题?从性能角度来看(网络、内存使用)?注意:我计划使用Prometheus监视性能,但对我可能遇到的情况的任何说明都将很有帮助。
- 附注:这些发送的虚拟消息将发送到一个虚拟队列,因为如果我将其发送到另一个服务消费的队列中,它将被卡在没有生存时间的实际消息后面,而这些虚拟消息将会变得非常大。
英文:
As connection opening each time for publishing is costly I'm trying to implement some way to keep the connection alive and share it in my app to publish messages.
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
The function KeepAlive
is an infinite loop that keeps sending a dummy message every 5 secs and that message have a TTL of 5 secs too so it gets destroyed.
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
Here I'm creating the connection and calling KeepAlive
in a goroutine so it can work in the background, keeping my connection alive all the time.
My questions:
-
I feel that this way is just a work around and although I've tried to search for examples how to keep it alive, it seems all of these examples are interested in the consumer side. Is there a cleaner way to keep my connection alive?
-
Is keeping my connection alive as long as my application is running bad? performance wise (network, memory usage)? note: I'm planning to monitor this with Prometheus to watch the performance but any note about what I might face would be helpful
Side note: these tics that are sent will be sent to a dummy queue since if I send it to my queue that I consume messages from by another service it will get stuck behind actual messages that doesn't have TTL and these tics will grow very large.
答案1
得分: 3
使用streadway/amqp
库,您无需自己实现保持连接的功能。该库已经提供了这个机制。
amqp.Dial
方法会构建一个默认心跳为10秒的Connection
。您可以在这里查看代码:
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
这通过在打开的连接上发送心跳帧来实现,这比为此目的创建一个仅用于发送虚假消息的队列更高效和可维护。
由上述内容可知,您可以使用amqp.DialConfig
更改连接的心跳:
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
您可能希望自己实现错误时的重新连接逻辑。关于这一点,您可以在这里找到一些有用的信息:https://stackoverflow.com/questions/17108426
英文:
With streadway/amqp
you don't need to implement the keepalive yourself. The library already provides this mechanism.
The method amqp.Dial
constructs a Connection
with a default heartbeat of 10 seconds. You can see the code here:
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
This works by sending heartbeat frames on the open connection, which is going to be more efficient and maintainable than sending fake messages to a queue created only for that reason.
From the above follows that you can change the connection heartbeat with amqp.DialConfig
:
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
What you might want to implement yourself is the reconnect-on-error logic. For that you can find some useful information here: https://stackoverflow.com/questions/17108426
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论