在Go语言中,amqp.Dial每次调用时是否是线程安全的,是否需要创建连接?

huangapple go评论165阅读模式
英文:

Whether to create connection every time when amqp.Dial is threadsafe or not in go lang

问题

根据RabbitMQ文档中提到的,建立TCP连接是昂贵的。因此,引入了通道的概念。现在我遇到了这个示例。在main()函数中,每次发布消息时都会创建连接。

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

它不应该在全局声明一次,并且应该有故障转移机制,以防连接关闭,就像单例对象一样。如果amqp.Dial是线程安全的,我认为它应该是线程安全的。

编辑后的问题:

我以以下方式处理连接错误。在错误时,我监听一个通道并创建一个新的连接。但是当我关闭现有连接并尝试发布消息时,我会收到以下错误。

错误:

2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085->172.16.0.20:5672: use of closed network connection
exit status 1
7:25 PM

代码:

func main() {
    
    Conn, err := amqp.Dial("amqp://guest:guest@172.16.0.20:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    context := &appContext{queueName: "QUEUENAME", exchangeName: "ExchangeName", exchangeType: "direct", routingKey: "RoutingKey", conn: Conn}
    c := make(chan *amqp.Error)
    
    go func() {
        error := <-c
        if(error != nil){                
            Conn, err = amqp.Dial("amqp://guest:guest@172.16.0.20:5672/")            
            failOnError(err, "Failed to connect to RabbitMQ")            
            Conn.NotifyClose(c)                                           
        }            
    }()
    
    Conn.NotifyClose(c)
    r := web.New()
    // We pass an instance to our context pointer, and our handler.
    r.Get("/", appHandler{context, IndexHandler})
    graceful.ListenAndServe(":8086", r)	 
    
}
英文:

As it is mentioned in the RabbitMQ docs that tcp connections are expensive to make. So, for that concept of channel was introduced. Now i came across this example. In the main() it creates the connection everytime a message is publised.
conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;).
Shouldn't it be declared globally once and there should be failover mechanism in case connection get closed like singleton object. If amqp.Dial is thread-safe, which i suppose it should be

Edited question :

I am handling the connection error in the following manner. In which i listen on a channel and create a new connection on error. But when i kill the existing connection and try to publish message. I get the following error.

error :

2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085-&gt;172.16.0.20:5672: use of closed network connection
exit status 1
7:25 PM

Code :

 func main() {
    
        Conn, err := amqp.Dial(&quot;amqp://guest:guest@172.16.0.20:5672/&quot;)
        failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
         context := &amp;appContext{queueName: &quot;QUEUENAME&quot;,exchangeName: &quot;ExchangeName&quot;,exchangeType: &quot;direct&quot;,routingKey: &quot;RoutingKey&quot;,conn: Conn}
        c := make(chan *amqp.Error)
        
        go func() {
            error := &lt;-c
            if(error != nil){                
                Conn, err = amqp.Dial(&quot;amqp://guest:guest@172.16.0.20:5672/&quot;)            
                failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)            
                Conn.NotifyClose(c)                                           
            }            
        }()
    
        Conn.NotifyClose(c)
        r := web.New()
        // We pass an instance to our context pointer, and our handler.
        r.Get(&quot;/&quot;, appHandler{context, IndexHandler})
        graceful.ListenAndServe(&quot;:8086&quot;, r)	 
        
    }

答案1

得分: 11

当然,你不应该为每个请求创建一个连接。将其设置为全局变量,或者更好地将其作为应用程序上下文的一部分,在启动时初始化一次。

您可以通过使用Connection.NotifyClose注册一个通道来处理连接错误:

func initialize() {
  c := make(chan *amqp.Error)
  go func() {
    err := <-c
    log.Println("reconnect: " + err.Error())
    initialize()
  }()

  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  if err != nil {
    panic("cannot connect")
  }
  conn.NotifyClose(c)

  // create topology
}
英文:

Of course, you shouldn't create a connection for each request. Make it a global variable or better part of an application context which you initialize once at startup.

You can handle connection errors by registering a channel using Connection.NotifyClose:

func initialize() {
  c := make(chan *amqp.Error)
  go func() {
    err := &lt;-c
    log.Println(&quot;reconnect: &quot; + err.Error())
    initialize()
  }()

  conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;)
  if err != nil {
    panic(&quot;cannot connect&quot;)
  }
  conn.NotifyClose(c)

  // create topology
}

huangapple
  • 本文由 发表于 2016年3月23日 21:09:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/36179111.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定