英文:
How to communicate between ZeroMQ contexts in different Goroutines?
问题
我正在使用这个作为样板,除了在同一个程序中,我还有一些goroutine作为工作线程并连接到后端端点tcp://127.0.0.1:5560。
我想要做的是通过更高效的方式连接,比如ipc://,inproc://,甚至是unix sockets。我尝试过这些,但没有成功。使用ZeroMQ,通道是不可行的,对吗?
那么,如何在ZeroMQ上连接不同的goroutine,而不是使用tcp?有更好的替代方案吗?
更新:
代码:
// 简单的消息队列代理
// 与请求-回复代理相同,但使用了QUEUE设备
//
// 作者:Brendan Mc.
// 需要:http://github.com/alecthomas/gozmq
package main
import (
zmq "github.com/alecthomas/gozmq"
)
func startWorker() {
context, _ := zmq.NewContext()
defer context.Close()
worker, _ := context.NewSocket(zmq.REP)
//err := worker.Connect("ipc:///backend") // 尝试过,但没有成功
//err := worker.Connect("inproc:///backend") // 尝试过,但没有成功
err := worker.Connect("tcp://127.0.0.1:5560") // 这个可以工作
if err != nil {
fmt.Println(err)
}
for {
data, err := worker.Recv(0)
fmt.Println(string(data))
worker.Send([]byte("I got your data"), 0)
}
}
func main() {
context, _ := zmq.NewContext()
defer context.Close()
// 面向客户端的套接字
frontend, _ := context.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5559")
// 面向服务的套接字
backend, _ := context.NewSocket(zmq.DEALER)
defer backend.Close()
//backend.Bind("ipc:///backend") // 尝试过,但没有成功
//backend.Bind("inproc:///backend") // 尝试过,但没有成功
backend.Bind("tcp://*:5560") // 这个可以工作
for i := 0; i < 4; i++ {
go startWorker() // 在单独的goroutine中启动工作线程
}
// 启动内置设备
zmq.Device(zmq.QUEUE, frontend, backend)
// 我们永远不会到达这里...
}
英文:
I'm using this as boilerplate, except that in the same program I also have some goroutines that are the workers and connect to the backend endpoint, tcp://127.0.0.1:5560.
What I would like to do is have it connect through a more efficient way, such as ipc://, inproc://, or even unix sockets. I've tried those, and it didn't work. Channels is a no-go with ZeroMQ right ?
So how do I connect different goroutines with ZeroMQ contexts, without tcp ? Is there a better alternative ?
update:
The code:
// Simple message queuing broker
// Same as request-reply broker but using QUEUE device
//
// Author: Brendan Mc.
// Requires: http://github.com/alecthomas/gozmq
package main
import (
zmq "github.com/alecthomas/gozmq"
)
func startWorker() {
context, _ := zmq.NewContext()
defer context.Close()
worker, _ := context.NewSocket(zmq.REP)
//err := worker.Connect("ipc:///backend") // Tried it, but nothing
//err := worker.Connect("inproc:///backend") // Tried it, but nothing
err := worker.Connect("tcp://127.0.0.1:5560") // this works
if err != nil {
fmt.Println(err)
}
for {
data, err := worker.Recv(0)
fmt.Println(string(data))
worker.Send([]byte("I got your data"), 0)
}
}
func main() {
context, _ := zmq.NewContext()
defer context.Close()
// Socket facing clients
frontend, _ := context.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5559")
// Socket facing services
backend, _ := context.NewSocket(zmq.DEALER)
defer backend.Close()
//backend.Bind("ipc:///backend") // Tried it, but nothing
//backend.Bind("inproc:///backend") // Tried it, but nothing
backend.Bind("tcp://*:5560") // this works
for i := 0; i < 4; i++ {
go startWorker() // Start workers in a separate goroutine
}
// Start built-in device
zmq.Device(zmq.QUEUE, frontend, backend)
// We never get here…
}
答案1
得分: 5
为了使用inproc://
传输方式,所有的套接字都需要共享相同的上下文(该上下文是线程安全的)。
另外,如果你使用相同的上下文,就不需要为ZMQ使用任何后端I/O线程。
你没有提到你正在使用哪个操作系统,但是ipc://
传输方式只在大多数*nix系统下可用。在Windows下,你只能使用以下传输方式:tcp://、inproc://、pgm://。请查看zmq_connect文档以获取更多信息。
英文:
In order to use the inproc://
transport, all of the sockets need to be sharing the same Context(which is thread-safe).
Also if you're using the same Context, you do not need any backend I/O threads for ZMQ
You do not mention which OS you're running under, but the ipc://
transport is only available under most *nix. Under windows you're only able to have the following transports: tcp://, inproc://, pgm://. Check out the zmq_connect documentation for more information.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论