How to detect a dead RabbitMQ connection?

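Question

I have a RabbitMQ consumer script written in Go. It is a simple script from the RabbitMQ tutorial that uses the streadway/amqp library.

The problem is that if the RabbitMQ server is stopped, the consumer script does not exit, and when the server is restarted, the consumer no longer receives messages.

Is there a way to detect that the consumer connection is dead and reconnect, or at least terminate the consumer script?

I know that the library sets a default 10-second heartbeat interval for the connection; can that be used somehow?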

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test_task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            d.Ack(false)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

Answer 1

Score: 27

amqp.Connection has a NotifyClose() method, which returns a channel that signals a transport or protocol error. So something like this could work:

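for { // reconnection loop
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup
    if err != nil {
        time.Sleep(5 * time.Second) // broker still down: wait, then retry
        continue
    }
    notify := conn.NotifyClose(make(chan *amqp.Error, 1)) // error channel
    ...
    ch, err := conn.Channel()
    msgs, err := ch.Consume(
    ...
receive:
    for { // receive loop
        select { // check connection
        case err := <-notify:
            // work with the error
            break receive // a bare break would only exit the select; reconnect
        case d := <-msgs:
            // work with the message
            ...
        }
    }
}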
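
The loop structure above can be tried without a broker. What follows is a minimal sketch, not the library's API: `session`, `consumeWithReconnect`, `fakeDial`, and the `dial` callback are hypothetical stand-ins, where `closed` plays the role of the channel returned by NotifyClose and `msgs` plays the role of the delivery channel. It mainly illustrates the labeled break that leaves the receive loop so the outer loop can dial again.

```go
package main

import (
	"errors"
	"fmt"
)

// session is a hypothetical stand-in for one AMQP connection plus consumer.
type session struct {
	closed chan error  // plays the role of the NotifyClose channel
	msgs   chan string // plays the role of the delivery channel
}

// consumeWithReconnect dials up to maxDials times. Each session is consumed
// until it reports an error on closed, then the outer loop dials again.
// It returns every message handled, in order.
func consumeWithReconnect(dial func() (*session, error), maxDials int) []string {
	var handled []string
	for i := 0; i < maxDials; i++ { // reconnection loop
		s, err := dial()
		if err != nil {
			continue // a real program would sleep before retrying
		}
	receive:
		for { // receive loop
			select {
			case err := <-s.closed:
				fmt.Println("connection lost:", err)
				break receive // leaves the for loop, not just the select
			case m := <-s.msgs:
				handled = append(handled, m)
			}
		}
	}
	return handled
}

// fakeDial returns a dial function whose sessions deliver one message each
// and then fail, which simulates a broker restart. Once the payloads are
// used up, dialing fails.
func fakeDial(payloads ...string) func() (*session, error) {
	next := 0
	return func() (*session, error) {
		if next >= len(payloads) {
			return nil, errors.New("broker unavailable")
		}
		s := &session{closed: make(chan error), msgs: make(chan string)}
		p := payloads[next]
		next++
		go func() {
			s.msgs <- p                                // delivered first
			s.closed <- errors.New("connection reset") // then the failure
		}()
		return s, nil
	}
}

func main() {
	got := consumeWithReconnect(fakeDial("first", "second"), 3)
	fmt.Println(got)
}
```

With the real library, the body of the outer loop would also reopen the channel, redeclare the queue, and call Consume again, since none of these survive a lost connection.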

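Answer 2

Score: 6

There are a couple of ways to do this: checking whether the delivery channel is closed, or using Channel.NotifyClose.

Checking the delivery channel

After starting the consumer, you receive from the delivery channel. As you know, the receive operation can take the special form x, ok := <-ch, where ok is false when x holds a zero value because the channel is closed (and empty):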

conn, _ := amqp.Dial(url)
ch, _ := conn.Channel()

delivery, _ := ch.Consume(
    queueName,
    consumerName,
    true,  // auto-ack
    false, // exclusive
    false, // no-local
    true,  // no-wait
    nil,   // args
)

for {
    payload, ok := <-delivery
    if !ok {
        // ... channel closed
        return
    }
    // ... process payload
}

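This works because the Go channel <-chan amqp.Delivery is closed when the AMQP channel is closed or an error occurs:

> [It] continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.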
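
The same check can be seen in isolation with an ordinary Go channel. This is a minimal sketch that assumes nothing about the AMQP library: `consumeUntilClosed` is a hypothetical helper, and a plain `chan string` stands in for the delivery channel. It also suggests why the script in the question hangs: its `for d := range msgs` loop ends quietly once the channel is closed, while main keeps blocking on `<-forever`.

```go
package main

import "fmt"

// consumeUntilClosed receives from deliveries until the channel is closed.
// It returns the payloads seen before the close.
func consumeUntilClosed(deliveries <-chan string) []string {
	var seen []string
	for {
		payload, ok := <-deliveries
		if !ok {
			// The channel is closed and drained: for an AMQP consumer
			// this is the cue to reconnect or exit.
			return seen
		}
		seen = append(seen, payload)
	}
}

func main() {
	deliveries := make(chan string, 2)
	deliveries <- "task 1"
	deliveries <- "task 2"
	close(deliveries) // simulates the library closing the delivery channel

	fmt.Println(consumeUntilClosed(deliveries))
}
```

In the consumer from the question, one option along these lines would be to send on `forever` (or call log.Fatal) after the range loop ends, so the process terminates instead of waiting indefinitely.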

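Using Channel.NotifyClose

This is straightforward, and the principle is the same:

> NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method.

The channel returned by NotifyClose is the same one you pass as the argument; the method only registers it internally, so you can simply write:

errC := ch.NotifyClose(make(chan *amqp.Error, n))

where n is a non-zero buffer size. Make sure to pass a buffered channel to NotifyClose; otherwise, depending on how your code is structured, the library may block on send.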
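
The difference between the two kinds of channel can be shown with the standard library alone. This is a minimal sketch: `trySend` is a hypothetical helper that uses a non-blocking send to report whether a send could complete right away. A plain blocking send, which is the situation the warning above describes, would wait in exactly the case where `trySend` returns false.

```go
package main

import (
	"errors"
	"fmt"
)

// trySend reports whether a send on c can complete immediately, that is,
// without a receiver already waiting on the other side.
func trySend(c chan error, err error) bool {
	select {
	case c <- err:
		return true
	default:
		return false
	}
}

func main() {
	errClosed := errors.New("channel closed by server")

	unbuffered := make(chan error)
	buffered := make(chan error, 1)

	// Nobody is receiving yet, as when the consumer goroutine is busy.
	fmt.Println("unbuffered send completes:", trySend(unbuffered, errClosed))
	fmt.Println("buffered send completes:", trySend(buffered, errClosed))

	// The buffered notification is still there when the receiver gets to it.
	fmt.Println("received later:", <-buffered)
}
```

A buffer of one is enough for a single close notification, because the sender can deposit the error and move on even if the receiving goroutine is still handling a message.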

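Then you can receive on the errC channel and act depending on the type of error you get. In short, the error can be:

  • a connection error, usually unrecoverable
  • a channel error, also called a soft exception, usually recoverable by opening a new channel
  • nil if the program calls conn.Close() on purpose

To find out whether the error is recoverable, inspect the amqp.Error's Code field and/or its Recover field, which is set to true for soft exceptions.

The following function shows how error codes can be distinguished; it is provided as additional insight. For the general case, just check Error.Recover: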

const (
	ConnectionError = 1
	ChannelError    = 2
)

func isConnectionError(err *amqp.Error) bool {
	return errorType(err.Code) == ConnectionError
}

func isChannelError(err *amqp.Error) bool {
	return errorType(err.Code) == ChannelError
}

func errorType(code int) int {
	switch code {
	case
		amqp.ContentTooLarge,    // 311
		amqp.NoConsumers,        // 313
		amqp.AccessRefused,      // 403
		amqp.NotFound,           // 404
		amqp.ResourceLocked,     // 405
		amqp.PreconditionFailed: // 406
		return ChannelError

	case
		amqp.ConnectionForced, // 320
		amqp.InvalidPath,      // 402
		amqp.FrameError,       // 501
		amqp.SyntaxError,      // 502
		amqp.CommandInvalid,   // 503
		amqp.ChannelError,     // 504
		amqp.UnexpectedFrame,  // 505
		amqp.ResourceError,    // 506
		amqp.NotAllowed,       // 530
		amqp.NotImplemented,   // 540
		amqp.InternalError:    // 541
		fallthrough

	default:
		return ConnectionError
	}
}
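
For the general case mentioned above, a check based on Recover could look roughly like this. It is a minimal sketch under stated assumptions: `closeError` is a local stand-in that mirrors the exported fields of amqp.Error (Code, Reason, Server, Recover) so the example compiles without the library, and `nextAction` with its three action names is hypothetical. The field values in main are illustrative only.

```go
package main

import "fmt"

// closeError mirrors the exported fields of amqp.Error so the sketch
// compiles without the library. It is a stand-in, not the real type.
type closeError struct {
	Code    int
	Reason  string
	Server  bool
	Recover bool
}

// nextAction maps a value received from a NotifyClose channel to a
// hypothetical recovery step.
func nextAction(err *closeError) string {
	switch {
	case err == nil:
		return "shutdown" // graceful close requested by the program
	case err.Recover:
		return "reopen channel" // soft exception
	default:
		return "reconnect" // connection-level failure: redial or exit
	}
}

func main() {
	fmt.Println(nextAction(nil))
	fmt.Println(nextAction(&closeError{Code: 404, Reason: "NOT_FOUND", Server: true, Recover: true}))
	fmt.Println(nextAction(&closeError{Code: 320, Reason: "CONNECTION_FORCED", Server: true}))
}
```

Checking for nil first matters because a receive from a closed notification channel yields a nil pointer, and reading its fields would panic.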

Answer 3

Score: 1

This may help someone

// Main package - "cmd/my-project-name/main.go"

package main

import (
	"my-proyect-name/rmq"
)

func main() {
	// RMQ
	rmq.ConnectToRMQ()
}

// RMQ package - "rmq"

package rmq

import (
	"errors"
	"log"
	"ms-gcp-cloud-storage/constants"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

const (
	rmqCredentials string = "amqp://user:pswd@localhost:5672"
	rmqQueue       string = "golang-queue:new"
	rmqExchange    string = constants.RMQ_DIRECT_EXCHANGE // "" empty string
	rmqContentType string = "application/json"
)

var conn *amqp.Connection
var chann *amqp.Channel

func hasError(err error, msg string) {
	if err != nil {
		log.Printf("%s: %s", msg, err)
	}
}

func ConnectToRMQ() (err error) {
	conn, err = amqp.Dial(rmqCredentials)
	if err != nil {
		return errors.New("connection error: " + err.Error())
	}

	chann, err = conn.Channel()
	if err != nil {
		return errors.New("error opening channel: " + err.Error())
	}

	q, err := chann.QueueDeclare(
		rmqQueue, // name
		true,     // durable
		false,    // delete when unused
		false,    // exclusive
		false,    // no-wait
		nil,      // arguments
	)

	if err != nil {
		// q is empty when the declaration fails, so log the configured name and the error.
		log.Fatalf("error declaring queue %v: %v\n", rmqQueue, err)
	}

	log.Printf("Connected to queue: %v\n", q.Name)

	observeConnection()

	return nil
}

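func observeConnection() {
	go func() {
		// Blocks until the connection is closed; the value is nil on a graceful close.
		log.Printf("Connection lost: %v\n", <-conn.NotifyClose(make(chan *amqp.Error)))
		log.Printf("Trying to reconnect to RMQ\n")

		closeActiveConnections()

		// Retry every 5 seconds until the connection is re-established.
		for err := ConnectToRMQ(); err != nil; err = ConnectToRMQ() {
			log.Println(err)
			time.Sleep(5 * time.Second)
		}
	}()
}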

// Can also be used in graceful shutdowns
func closeActiveConnections() {
	if chann != nil && !chann.IsClosed() {
		if err := chann.Close(); err != nil {
			log.Println(err.Error())
		}
	}

	if conn != nil && !conn.IsClosed() {
		if err := conn.Close(); err != nil {
			log.Println(err.Error())
		}
	}
}

// SendMessage - message without response
func SendMessage(body string) {
	err := chann.Publish(
		rmqExchange, // exchange
		rmqQueue,    // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType:  rmqContentType,
			DeliveryMode: constants.RMQ_PERSISTENT_MSG,
			Body:         []byte(body),
		})

	if err != nil {
		log.Printf("%s\n %s\n", "error publishing message", err)
		log.Println(body)
	}
}

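Answer 4

Score: -1

I have not found that the go-amqp library implements disconnection and reconnection handling for a connection pool.
There is open source code on GitHub that wraps amqp in a second layer.
It supports reconnecting after a disconnect and after abnormal failures. The code is also relatively simple to use, and each service has one connection and one channel.

Source code here

Example code: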

package main
 
import (
	"go-rabbit/rabbit"
)
 
/*
	Supports disconnection detection and reconnection,
	and resending on failure
	@author: Bill
*/
func main() {
	var(
		addr = "amqp://guest:guest@localhost:5672/"
		queue = "testQueue"
		exchange = "test_exchange"
		routerKey = "/test"
		msg = "test1!"
 
		// delay
		delayQueue = "delay_queue"
		delayExchange = "delay_exchange"
		delayRouterKey = "delay_exchange"
		prefix = "v1_prefix"
		sep = "_"
		eType = "F"
		_ttl = 60 * 1000
	)
 
	var rabbitProduct1 = rabbit.NewRabbitProduct(addr,_ttl,prefix,sep,delayExchange,delayQueue,delayRouterKey)
	// register recycle
	go rabbitProduct1.InitDefdelay(false)
	go rabbitProduct1.InitDefdelay(true)
	go rabbitProduct1.RegisterDelayWithPreFix("delay_queue","delay_exchange","delay_exchange")
 
	// _ttl is the dead recycle time; if _ttl > 0 the message is recycled
	rabbitProduct1.PubMessage(true,eType,queue,exchange,routerKey,msg,rabbitProduct1.GetBool(1),rabbitProduct1.GetBool(0),_ttl)
 
}

I hope this helps or gives you some ideas.

huangapple
  • Posted on February 2, 2017, 07:43:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/41991926.html