如何处理ZMQ套接字缺乏线程安全性的问题?

huangapple go评论97阅读模式
英文:

How to deal with ZMQ sockets lack of thread safety?

问题

我已经在一些Python应用程序中使用ZMQ一段时间了,但最近我决定用Go重新实现其中一个应用程序,然后我意识到ZMQ套接字不是线程安全的。

原始的Python实现使用了以下事件循环:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))
    
    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
        del replies[:]

问题是在第一次循环时可能还没有准备好回复,所以每当我有待处理的请求时,我必须使用非常短的超时时间进行轮询,否则客户端将等待更长的时间,应用程序最终会使用大量CPU进行轮询。

当我决定用Go重新实现它时,我以为只需要这样简单地使用无限超时进行轮询就可以避免这个问题:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                
        }
    }
}

但是,这种理想的实现只适用于只有一个客户端连接或负载较轻的情况。在负载较重的情况下,我会在libzmq内部得到随机的断言错误。我尝试了以下方法:

  1. 根据zmq4文档的建议,我尝试在所有套接字操作上添加sync.Mutex并进行锁定/解锁。但是失败了。我猜测这是因为ZMQ在刷新时使用了自己的线程。

  2. 创建一个用于轮询/接收的goroutine和一个用于发送的goroutine,并使用通道以与Python版本中的req/rep队列相同的方式使用。但是失败了,因为我仍然在共享套接字。

  3. 与2相同,但设置GOMAXPROCS=1。但是失败了,并且吞吐量非常有限,因为回复被阻塞,直到Poll()调用返回。

  4. 与2相同,但使用runtime.LockOSThread将所有套接字操作保持在与套接字相同的线程中。与上述相同的问题。它不会失败,但吞吐量非常有限。

  5. 与4相同,但使用Python版本的轮询超时策略。它可以工作,但与Python版本有相同的问题。

  6. 共享上下文而不是套接字,并在单独的goroutine中创建一个用于发送的套接字和一个用于接收的套接字,通过通道进行通信。它可以工作,但我必须重写客户端库以使用两个套接字而不是一个。

  7. 摒弃ZMQ,使用原始的TCP套接字,它们是线程安全的。它可以完美地工作,但我也必须重写客户端库。

所以,看起来6是ZMQ真正预期的使用方式,因为只有这种方式我才能无缝地与goroutine一起使用它,但我想知道是否还有其他我没有尝试过的方法。有什么想法吗?


更新

通过这里的答案,我意识到我可以向轮询器添加一个inproc的PULL套接字,并让一个goroutine连接并推送一个字节来打破无限等待。这不像这里建议的解决方案那样灵活,但它可以工作,我甚至可以将其回溯到Python版本中。

英文:

I've been using ZMQ in some Python applications for a while, but only very recently I decided to reimplement one of them in Go and I realized that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

<!-- language: lang-py -->

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))
    
    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b&#39;&#39;, zmq.SNDMORE)
        router.send(data)
        del replies[:]

The problem is that the reply might not be ready on the first pass, so whenever I have pending requests, I have to poll with a very short timeout or the clients will wait for more than they should, and the application ends up using a lot of CPU for polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by using infinite timeout on polling:

<!-- language: lang-go -->

for {
	sockets, _ := poller.Poll(-1) 
	for _, socket := range sockets {
		switch s := socket.Socket; s {
        case router:
		    msg, _ := s.RecvMessage(0)
		    client_id := msg[0]
		    data := msg[2]
            go handleRequest(router, client_id, data)                
        }
	}
}

But that ideal implementation only works when I have a single client connected, or a light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs I tried adding a sync.Mutex and lock/unlock on all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, and use channels in the same way I used the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Use the req/rep channels as in 2, but use runtime.LockOSThread to keep all socket operations in the same thread as the socket. Has the same problem as above. It doesn't fail, but throughput was very limited.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


Update

With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.

答案1

得分: 5

我在1.5年前提出了一个问题,要求将https://github.com/vaughan0/go-zmq/blob/master/channels.go移植到pebbe/zmq4。最终作者决定不这样做,但我们已经在生产环境中(在非常繁重的工作负载下)使用了很长时间。

这是一个要添加到pebbe/zmq4包中的文件的gist(因为它向Socket添加了方法)。这个文件可以重新编写,使得Socket接收器上的方法接受一个Socket作为参数,但由于我们无论如何都会使用我们的代码,这是一个简单的前进方式。

基本用法是像平常一样创建你的Socket(例如称为s),然后你可以:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

现在你有两个类型为[][]byte的通道,你可以在goroutine之间使用它们,但是在通道抽象中管理的单个goroutine负责管理Poller并与Socket通信。

英文:

I opened an issue a 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go to pebbe/zmq4. Ultimately the author decided against it, but we have used this in production (under VERY heavy workloads) for a long time now.

This is a gist of the file that had to be added to the pebbe/zmq4 package (since it adds methods to the Socket). This could be re-written in such a way that the methods on the Socket receiver instead took a Socket as an argument, but since we vendor our code anyway, this was an easy way forward.

The basic usage is to create your Socket like normal (call it s for example) then you can:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

Now you have two channels of type [][]byte that you can use between goroutines, but a single goroutine - managed within the channels abstraction, is responsible for managing the Poller and communicating with the socket.

答案2

得分: 1

使用pebbe/zmq4的祝福方式是使用Reactor。Reactor具有监听Go通道的能力,但是你不想这样做,因为它们通过定期使用轮询超时来轮询通道,这会重新引入你在Python版本中遇到的同样问题。相反,你可以使用zmq的inproc套接字,其中一个端由反应器持有,另一个端由一个goroutine持有,该goroutine从通道中传递数据。这很复杂、冗长和不愉快,但我已经成功使用过。

英文:

The blessed way to do this with pebbe/zmq4 is with a Reactor. Reactors have the ability to listen on Go channels, but you don't want to do that because they do so by polling the channel periodically using a poll timeout, which reintroduces the same exact problem you have in your Python version. Instead you can use zmq inproc sockets, with one end held by the reactor and the other end held by a goroutine that passes data in from a channel. It's complicated, verbose, and unpleasant, but I have used it successfully.

huangapple
  • 本文由 发表于 2016年4月6日 05:57:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/36437799.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定