在Go语言中,amqp.Dial每次调用时是否是线程安全的,是否需要创建连接?

huangapple go评论195阅读模式
英文:

Whether to create connection every time when amqp.Dial is threadsafe or not in go lang

问题

根据RabbitMQ文档中提到的,建立TCP连接是昂贵的。因此,引入了通道的概念。现在我遇到了这个示例。在main()函数中,每次发布消息时都会创建连接。

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

它不应该在全局声明一次,并且应该有故障转移机制,以防连接关闭,就像单例对象一样。如果amqp.Dial是线程安全的,我认为它应该是线程安全的。

编辑后的问题:

我以以下方式处理连接错误。在错误时,我监听一个通道并创建一个新的连接。但是当我关闭现有连接并尝试发布消息时,我会收到以下错误。

错误:

  1. 2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085->172.16.0.20:5672: use of closed network connection
  2. exit status 1
  3. 7:25 PM

代码:

  1. func main() {
  2. Conn, err := amqp.Dial("amqp://guest:guest@172.16.0.20:5672/")
  3. failOnError(err, "Failed to connect to RabbitMQ")
  4. context := &appContext{queueName: "QUEUENAME", exchangeName: "ExchangeName", exchangeType: "direct", routingKey: "RoutingKey", conn: Conn}
  5. c := make(chan *amqp.Error)
  6. go func() {
  7. error := <-c
  8. if(error != nil){
  9. Conn, err = amqp.Dial("amqp://guest:guest@172.16.0.20:5672/")
  10. failOnError(err, "Failed to connect to RabbitMQ")
  11. Conn.NotifyClose(c)
  12. }
  13. }()
  14. Conn.NotifyClose(c)
  15. r := web.New()
  16. // We pass an instance to our context pointer, and our handler.
  17. r.Get("/", appHandler{context, IndexHandler})
  18. graceful.ListenAndServe(":8086", r)
  19. }
英文:

As it is mentioned in the RabbitMQ docs that tcp connections are expensive to make. So, for that concept of channel was introduced. Now i came across this example. In the main() it creates the connection everytime a message is publised.
conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;).
Shouldn't it be declared globally once and there should be failover mechanism in case connection get closed like singleton object. If amqp.Dial is thread-safe, which i suppose it should be

Edited question :

I am handling the connection error in the following manner. In which i listen on a channel and create a new connection on error. But when i kill the existing connection and try to publish message. I get the following error.

error :

  1. 2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085-&gt;172.16.0.20:5672: use of closed network connection
  2. exit status 1
  3. 7:25 PM

Code :

  1. func main() {
  2. Conn, err := amqp.Dial(&quot;amqp://guest:guest@172.16.0.20:5672/&quot;)
  3. failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
  4. context := &amp;appContext{queueName: &quot;QUEUENAME&quot;,exchangeName: &quot;ExchangeName&quot;,exchangeType: &quot;direct&quot;,routingKey: &quot;RoutingKey&quot;,conn: Conn}
  5. c := make(chan *amqp.Error)
  6. go func() {
  7. error := &lt;-c
  8. if(error != nil){
  9. Conn, err = amqp.Dial(&quot;amqp://guest:guest@172.16.0.20:5672/&quot;)
  10. failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
  11. Conn.NotifyClose(c)
  12. }
  13. }()
  14. Conn.NotifyClose(c)
  15. r := web.New()
  16. // We pass an instance to our context pointer, and our handler.
  17. r.Get(&quot;/&quot;, appHandler{context, IndexHandler})
  18. graceful.ListenAndServe(&quot;:8086&quot;, r)
  19. }

答案1

得分: 11

当然,你不应该为每个请求创建一个连接。将其设置为全局变量,或者更好地将其作为应用程序上下文的一部分,在启动时初始化一次。

您可以通过使用Connection.NotifyClose注册一个通道来处理连接错误:

  1. func initialize() {
  2. c := make(chan *amqp.Error)
  3. go func() {
  4. err := <-c
  5. log.Println("reconnect: " + err.Error())
  6. initialize()
  7. }()
  8. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  9. if err != nil {
  10. panic("cannot connect")
  11. }
  12. conn.NotifyClose(c)
  13. // create topology
  14. }
英文:

Of course, you shouldn't create a connection for each request. Make it a global variable or better part of an application context which you initialize once at startup.

You can handle connection errors by registering a channel using Connection.NotifyClose:

  1. func initialize() {
  2. c := make(chan *amqp.Error)
  3. go func() {
  4. err := &lt;-c
  5. log.Println(&quot;reconnect: &quot; + err.Error())
  6. initialize()
  7. }()
  8. conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;)
  9. if err != nil {
  10. panic(&quot;cannot connect&quot;)
  11. }
  12. conn.NotifyClose(c)
  13. // create topology
  14. }

huangapple
  • 本文由 发表于 2016年3月23日 21:09:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/36179111.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定