在Go API中,一个连接是否可以支持多个通道?

huangapple go评论82阅读模式
英文:

Could one connection support multiple channels in go api for rabbitmq?

问题

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

// 每个连接都应该声明它们期望的拓扑结构
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
	// 建立连接
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, nil, err
	}
	// 在连接中建立通道
	ch, err := conn.Channel()
	if err != nil {
		return nil, nil, err
	}
	// 声明队列
	if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
		return nil, nil, err
	}

	return conn, ch, nil
}

func main() {
	// AMQP URL
	url := "amqp://guest:guest@127.0.0.1:5672"
	for i := 1; i <= 2; i++ {
		fmt.Println("连接", i)
		// 两个 goroutine
		go func() {
			// 队列名称
			queue := fmt.Sprintf("example.reconnect.%d", i)
			// 在 TCP 连接中建立通道
			_, pub, err := setup(url, queue)
			if err != nil {
				fmt.Println("错误:发布者设置", err)
				return
			}
			// 从发布者端清空队列以建立初始状态
			if _, err := pub.QueuePurge(queue, false); err != nil {
				fmt.Println("错误:清空队列", err)
				return
			}
			// 发布消息
			if err := pub.Publish("", queue, false, false, amqp.Publishing{
				Body: []byte(fmt.Sprintf("%d", i)),
			}); err != nil {
				fmt.Println("错误:发布消息", err)
				return
			}
			// 保持运行
			for {
				time.Sleep(time.Second * 20)
			}
		}()
	}
	// 保持运行
	for {
		time.Sleep(time.Second * 20)
	}
}

我以为程序和 MQ 服务器之间只有一个连接,但实际上有两个连接,一个连接只能支持一个通道,为什么?

为什么两个 goroutine 不能共享同一个 TCP 连接?

在理论上,套接字描述符可以在进程的所有线程之间共享。

为什么两个 goroutine 不共享一个套接字,而是拥有各自的通道?

手动绘制的模型:

在Go API中,一个连接是否可以支持多个通道?

RabbitMQ 中的实际模型:
在Go API中,一个连接是否可以支持多个通道?

英文:
package main
import (
&quot;fmt&quot;
&quot;github.com/streadway/amqp&quot;
&quot;time&quot;
)
// Every connection should declare the topology they expect
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
//setup connection
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
//build channel in the connection
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
//queue declare
if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
//amqp url
url := &quot;amqp://guest:guest@127.0.0.1:5672&quot;;
for i := 1; i &lt;= 2; i++ {
fmt.Println(&quot;connect &quot;, i)
//two goroutine   
go func() {
//queue name
queue := fmt.Sprintf(&quot;example.reconnect.%d&quot;, i)
//setup channel in the tcp connection
_, pub, err := setup(url, queue)
if err != nil {
fmt.Println(&quot;err publisher setup:&quot;, err)
return
}
// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
fmt.Println(&quot;err purge:&quot;, err)
return
}
//publish msg
if err := pub.Publish(&quot;&quot;, queue, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf(&quot;%d&quot;, i)),
}); err != nil {
fmt.Println(&quot;err publish:&quot;, err)
return
}
//keep running
for{
time.Sleep(time.Second * 20)
}
}()
}
//keep running
for {
time.Sleep(time.Second * 20)
}
}

I thought there is only one connection between the program and mq-server,

but there are two connection,one connection can only support one channel,why?

can't the two goroutine share the same tcp connection?

Socket descriptor can share in all threads of a process in the theory.

Why the two goroutine don't share one socket but have their own channel?

The model by hand:

在Go API中,一个连接是否可以支持多个通道?

The real model in rabbitmq:
在Go API中,一个连接是否可以支持多个通道?

答案1

得分: 5

库的源代码来看,似乎可以多次调用conn.Channel(),它会在同一个连接上创建一个新的通信流。

好的,我试过了,这是一个可行的例子...一个goroutine,一个连接,两个通道。
我设置了接收器,然后发送一条消息,然后从接收器通道中读取。

如果你想在一个goroutine中绑定多个队列,你可以调用rec.Consume两次,然后在队列之间进行选择。

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"os"
)

func main() {
	conn, err := amqp.Dial("amqp://localhost")
	e(err)
	defer conn.Close()
	fmt.Println("Connected")
	rec, err := conn.Channel()
	e(err)

	fmt.Println("Setup receiver")
	rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil)
	e(err)
	msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil)
	e(err)

	fmt.Println("Setup sender")
	send, err := conn.Channel()
	e(err)
	sq, err := send.QueueDeclare("go-test", false, false, false, false, nil)
	e(err)

	fmt.Println("Send message")
	err = send.Publish("", sq.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte("This is a test"),
	})
	e(err)

	msg := <-msgs
	fmt.Println("Received from:", rq, "msg:", string(msg.Body))
}

func e(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

在我的环境中的输出结果:

$ go run rmq.go 
Connected
Setup receiver
Setup sender
Send message
Received from: {go-test 0 0} msg: This is a test
英文:

Looking at the source for the library it appears as though you can call conn.Channel() as many times as you like and it creates a new stream of communication over the same connection.

Ok, I tried it, here's a working example... One goroutine, one connection, two channels
I setup the receiver, then send a message, then read from the receiver channel

if you wanted multiple queue's bound in one goroutine, you would call rec.Consume twice and then select across the queues.

package main
import (
&quot;fmt&quot;
&quot;github.com/streadway/amqp&quot;
&quot;os&quot;
)
func main() {
conn, err := amqp.Dial(&quot;amqp://localhost&quot;)
e(err)
defer conn.Close()
fmt.Println(&quot;Connected&quot;)
rec, err := conn.Channel()
e(err)
fmt.Println(&quot;Setup receiver&quot;)
rq, err := rec.QueueDeclare(&quot;go-test&quot;, false, false, false, false, nil)
e(err)
msgs, err := rec.Consume(rq.Name, &quot;&quot;, true, false, false, false, nil)
e(err)
fmt.Println(&quot;Setup sender&quot;)
send, err := conn.Channel()
e(err)
sq, err := send.QueueDeclare(&quot;go-test&quot;, false, false, false, false, nil)
e(err)
fmt.Println(&quot;Send message&quot;)
err = send.Publish(&quot;&quot;, sq.Name, false, false, amqp.Publishing{
ContentType: &quot;text/plain&quot;,
Body:        []byte(&quot;This is a test&quot;),
})
e(err)
msg := &lt;-msgs
fmt.Println(&quot;Received from:&quot;, rq, &quot;msg:&quot;, string(msg.Body))
}
func e(err error) {
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

Output on my box:

$ go run rmq.go 
Connected
Setup receiver
Setup sender
Send message
Received from: {go-test 0 0} msg: This is a test

huangapple
  • 本文由 发表于 2014年10月11日 19:19:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/26314061.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定