Golang amqp 重新连接

huangapple go评论108阅读模式
英文:

Golang amqp reconnect

问题

我想测试与rabbitmq服务器的重新连接。我写了一个小脚本来测试。
http://play.golang.org/p/l3ZWzG0Qqb
但是它不起作用。

在第10步中,我关闭了通道和连接。然后再次打开它们。并重新创建了chan amqp.Confirmation (:75)。然后继续循环。
但是在此之后,从chan confirms中没有返回任何内容。

更新:代码如下。

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"os"
	"time"
)

const SERVER = "amqp://user:pass@localhost:5672/"
const EXCHANGE_NAME = "publisher.test.1"
const EXCHANGE_TYPE = "direct"
const ROUTING_KEY = "publisher.test"

var Connection *amqp.Connection
var Channel *amqp.Channel

func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		return nil, nil, err
	}

	return conn, ch, nil
}

func main() {
	url := SERVER

	Connection, Channel, err := setup(url)
	if err != nil {
		fmt.Println("err publisher setup:", err)
		return
	}

	confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
	if err := Channel.Confirm(false); err != nil {
		log.Fatalf("confirm.select destination: %s", err)
	}

	for i := 1; i <= 3000000; i++ {
		log.Println(i)

		if err != nil {
			fmt.Println("err consume:", err)
			return
		}

		if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("%d", i)),
		}); err != nil {
			fmt.Println("err publish:", err)
			log.Printf("%+v", err)
			os.Exit(1)
			return
		}

		// only ack the source delivery when the destination acks the publishing
		confirmed := <-confirms
		if confirmed.Ack {
			log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
		} else {
			log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
			// TODO. Reconnect will be here
		}

		if i == 10 {
			Channel.Close()
			Connection.Close()
			while := true
			for while {
				log.Println("while")
				time.Sleep(time.Second * 1)
				Connection, Channel, err = setup(url)
				if err == nil {
					while = false
					confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
					log.Printf("%+v", confirms)
				}
			}
		}
		time.Sleep(time.Millisecond * 300)
	}

	os.Exit(1)
}
英文:

I want to test the restart connection to the rabbitmq server.
On wrote small script to test.
http://play.golang.org/p/l3ZWzG0Qqb
But it's not working.

In step 10, I close the channel and connection. And open them again. And re-create chan amqp.Confirmation ( :75) . And continue the cycle.
But after that, from the chan confirms nothing return.

UPD: code here.

package main
import (
&quot;fmt&quot;
&quot;github.com/streadway/amqp&quot;
&quot;log&quot;
&quot;os&quot;
&quot;time&quot;
)
const SERVER = &quot;amqp://user:pass@localhost:5672/&quot;
const EXCHANGE_NAME = &quot;publisher.test.1&quot;
const EXCHANGE_TYPE = &quot;direct&quot;
const ROUTING_KEY = &quot;publisher.test&quot;
var Connection *amqp.Connection
var Channel *amqp.Channel
func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
url := SERVER
Connection, Channel, err := setup(url)
if err != nil {
fmt.Println(&quot;err publisher setup:&quot;, err)
return
}
confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
if err := Channel.Confirm(false); err != nil {
log.Fatalf(&quot;confirm.select destination: %s&quot;, err)
}
for i := 1; i &lt;= 3000000; i++ {
log.Println(i)
if err != nil {
fmt.Println(&quot;err consume:&quot;, err)
return
}
if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf(&quot;%d&quot;, i)),
}); err != nil {
fmt.Println(&quot;err publish:&quot;, err)
log.Printf(&quot;%+v&quot;, err)
os.Exit(1)
return
}
// only ack the source delivery when the destination acks the publishing
confirmed := &lt;-confirms
if confirmed.Ack {
log.Printf(&quot;confirmed delivery with delivery tag: %d&quot;, confirmed.DeliveryTag)
} else {
log.Printf(&quot;failed delivery of delivery tag: %d&quot;, confirmed.DeliveryTag)
// TODO. Reconnect will be here
}
if i == 10 {
Channel.Close()
Connection.Close()
while := true
for while {
log.Println(&quot;while&quot;)
time.Sleep(time.Second * 1)
Connection, Channel, err = setup(url)
if err == nil {
while = false
confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
log.Printf(&quot;%+v&quot;, confirms)
}
}
}
time.Sleep(time.Millisecond * 300)
}
os.Exit(1)
}

答案1

得分: 0

你应该调用channel.Confirm()方法将通道设置为确认模式。即使在关闭连接后,甚至在同一连接上获取新通道之后,你也应该再次调用Confirm()方法,因为新通道与旧通道不同,并且所有新通道的默认设置都是不发送确认的。

英文:

You should put channel in confirm mode. by calling the channel.Confirm() method.
After closing the connection and even after getting new channel on the same connection, you should call Confirm() method again, since the channel is different from the old channel, and the default for all new channel is not to send confirm.

huangapple
  • 本文由 发表于 2016年4月20日 17:52:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/36740066.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定