
How to deal with ZMQ sockets' lack of thread safety?




I've been using ZMQ in some Python applications for a while, but only recently decided to reimplement one of them in Go, and I realized that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

<!-- language: lang-py -->

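while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        # ROUTER frames: client identity, empty delimiter, payload
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))
    for req in list(requests):  # iterate over a copy so handled requests can be removed
        rep = handle_request(req)
        if rep:  # the reply may not be ready on this pass
            replies.append(rep)
            requests.remove(req)
    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
    del replies[:]  # clear the queue once everything has been sent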

The problem is that the reply might not be ready on the first pass, so whenever I have pending requests I have to poll with a very short timeout, or the clients will wait longer than they should. As a result, the application ends up using a lot of CPU just for polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by polling with an infinite timeout:

<!-- language: lang-go -->

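for {
	// A negative timeout makes Poll block until a socket is readable.
	sockets, _ := poller.Poll(-1)
	for _, socket := range sockets {
		switch s := socket.Socket; s {
		case router:
			msg, _ := s.RecvMessage(0)
			// ROUTER frames: client identity, empty delimiter, payload.
			client_id := msg[0]
			data := msg[2]
			// router is handed to every handler goroutine, so the socket is shared.
			go handleRequest(router, client_id, data)
		}
	}
}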

But that ideal implementation only works when I have a single client connected, or under light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs, I tried adding a sync.Mutex and locking/unlocking around all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, and using channels in the same way I used the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Use the req/rep channels as in 2, but use runtime.LockOSThread to keep all socket operations in the same thread as the socket. It doesn't fail, but it has the same problem as 3: throughput was very limited.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.


Score: 5







I opened an issue 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go to pebbe/zmq4. Ultimately the author decided against it, but we have used this in production (under VERY heavy workloads) for a long time now.

This is a gist of the file that had to be added to the pebbe/zmq4 package (since it adds methods to the Socket). It could be rewritten so that the functions take a Socket as an argument instead of being methods on the Socket receiver, but since we vendor our code anyway, this was an easy way forward.

The basic usage is to create your Socket as usual (call it s, for example); then you can do:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

Now you have two channels of type [][]byte that you can use between goroutines, while a single goroutine, managed within the channels abstraction, is responsible for managing the Poller and communicating with the socket.


Score: 1



The blessed way to do this with pebbe/zmq4 is with a Reactor. Reactors have the ability to listen on Go channels, but you don't want to do that, because they do so by polling the channel periodically using a poll timeout, which reintroduces the exact same problem you have in your Python version. Instead you can use zmq inproc sockets, with one end held by the reactor and the other end held by a goroutine that passes data in from a channel. It's complicated, verbose, and unpleasant, but I have used it successfully.

  • Posted on April 6, 2016, 05:57:35
  • When reposting, please keep the link to this article: https://go.coder-hub.com/36437799.html



