英文:
Could one connection support multiple channels in go api for rabbitmq?
问题
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// 每个连接都应该声明它们期望的拓扑结构
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
// 建立连接
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
// 在连接中建立通道
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
// 声明队列
if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
// AMQP URL
url := "amqp://guest:guest@127.0.0.1:5672"
for i := 1; i <= 2; i++ {
fmt.Println("连接", i)
// 两个 goroutine
go func() {
// 队列名称
queue := fmt.Sprintf("example.reconnect.%d", i)
// 在 TCP 连接中建立通道
_, pub, err := setup(url, queue)
if err != nil {
fmt.Println("错误:发布者设置", err)
return
}
// 从发布者端清空队列以建立初始状态
if _, err := pub.QueuePurge(queue, false); err != nil {
fmt.Println("错误:清空队列", err)
return
}
// 发布消息
if err := pub.Publish("", queue, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
}); err != nil {
fmt.Println("错误:发布消息", err)
return
}
// 保持运行
for {
time.Sleep(time.Second * 20)
}
}()
}
// 保持运行
for {
time.Sleep(time.Second * 20)
}
}
我以为程序和 MQ 服务器之间只有一个连接,但实际上有两个连接,一个连接只能支持一个通道,为什么?
为什么两个 goroutine 不能共享同一个 TCP 连接?
在理论上,套接字描述符可以在进程的所有线程之间共享。
为什么两个 goroutine 不共享一个套接字,而是拥有各自的通道?
手动绘制的模型:
RabbitMQ 中的实际模型:
英文:
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// Every connection should declare the topology they expect
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
//setup connection
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
//build channel in the connection
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
//queue declare
if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
//amqp url
url := "amqp://guest:guest@127.0.0.1:5672";
for i := 1; i <= 2; i++ {
fmt.Println("connect ", i)
//two goroutine
go func() {
//queue name
queue := fmt.Sprintf("example.reconnect.%d", i)
//setup channel in the tcp connection
_, pub, err := setup(url, queue)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
fmt.Println("err purge:", err)
return
}
//publish msg
if err := pub.Publish("", queue, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
}); err != nil {
fmt.Println("err publish:", err)
return
}
//keep running
for{
time.Sleep(time.Second * 20)
}
}()
}
//keep running
for {
time.Sleep(time.Second * 20)
}
}
I thought there is only one connection between the program and mq-server,
but there are two connection,one connection can only support one channel,why?
can't the two goroutine share the same tcp connection?
Socket descriptor can share in all threads of a process in the theory.
Why the two goroutine don't share one socket but have their own channel?
The model by hand:
The real model in rabbitmq:
答案1
得分: 5
从库的源代码来看,似乎可以多次调用conn.Channel(),它会在同一个连接上创建一个新的通信流。
好的,我试过了,这是一个可行的例子...一个goroutine,一个连接,两个通道。
我设置了接收器,然后发送一条消息,然后从接收器通道中读取。
如果你想在一个goroutine中绑定多个队列,你可以调用rec.Consume两次,然后在队列之间进行选择。
package main
import (
"fmt"
"github.com/streadway/amqp"
"os"
)
func main() {
conn, err := amqp.Dial("amqp://localhost")
e(err)
defer conn.Close()
fmt.Println("Connected")
rec, err := conn.Channel()
e(err)
fmt.Println("Setup receiver")
rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil)
e(err)
msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil)
e(err)
fmt.Println("Setup sender")
send, err := conn.Channel()
e(err)
sq, err := send.QueueDeclare("go-test", false, false, false, false, nil)
e(err)
fmt.Println("Send message")
err = send.Publish("", sq.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("This is a test"),
})
e(err)
msg := <-msgs
fmt.Println("Received from:", rq, "msg:", string(msg.Body))
}
func e(err error) {
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
在我的环境中的输出结果:
$ go run rmq.go
Connected
Setup receiver
Setup sender
Send message
Received from: {go-test 0 0} msg: This is a test
英文:
Looking at the source for the library it appears as though you can call conn.Channel() as many times as you like and it creates a new stream of communication over the same connection.
Ok, I tried it, here's a working example... One goroutine, one connection, two channels
I setup the receiver, then send a message, then read from the receiver channel
if you wanted multiple queue's bound in one goroutine, you would call rec.Consume twice and then select across the queues.
package main
import (
"fmt"
"github.com/streadway/amqp"
"os"
)
func main() {
conn, err := amqp.Dial("amqp://localhost")
e(err)
defer conn.Close()
fmt.Println("Connected")
rec, err := conn.Channel()
e(err)
fmt.Println("Setup receiver")
rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil)
e(err)
msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil)
e(err)
fmt.Println("Setup sender")
send, err := conn.Channel()
e(err)
sq, err := send.QueueDeclare("go-test", false, false, false, false, nil)
e(err)
fmt.Println("Send message")
err = send.Publish("", sq.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("This is a test"),
})
e(err)
msg := <-msgs
fmt.Println("Received from:", rq, "msg:", string(msg.Body))
}
func e(err error) {
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Output on my box:
$ go run rmq.go
Connected
Setup receiver
Setup sender
Send message
Received from: {go-test 0 0} msg: This is a test
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论