英文:
Monitor queue usage on ZeroMQ PULL socket
问题
我的应用程序需要从一个PUSH套接字中获取数据(我无法控制服务器端)。数据被处理并写入文件,这是一个单线程的I/O绑定操作,性能不稳定。
我希望应用程序能够尽量少地工作来将消息拉入进程内存,以便实现以下目标:a)不阻塞服务器并且不会丢失任何消息(这已经是使用普通的PULL套接字的情况);b)我可以监控由于处理缓慢而导致的消息积压。
我可以使用一个包装套接字来实现这一点,其中包含三个内部套接字:
- 一个用于与服务器的实际连接
- 一对通过
inproc://
连接的套接字,我可以在发送和接收消息时进行计数。因此,数据流如下:
线程1
rx.recv()
inproc_tx.send()
queued_messages++
线程2
inproc_rx.recv()
queued_messages--
do_processing()
这似乎是一种合理的缓冲消息的方式,因为它已经使用了zmq::message_t
从recv()
中获取消息,并且我相信它的实现比我自己编写的队列逻辑更高效。这种方法可以工作,但是它似乎增加了很多开销和额外的代码,而底层代码已经知道有多少消息在队列中,可以决定何时丢弃新消息。
是否有一种API可以查询ZeroMQ的PULL套接字的队列使用情况?或者有更好的实现上述功能的方法吗?
英文:
My application needs to pull data from a PUSH socket (I do not have control of the server side). The data is processed and written to file, which is single-threaded and I/O bound with variable performance.
I would like the application to do the minimum work to pull the messages into process memory so that a) the server is not blocked and won't drop any messages (which is already the case with just a normal PULL socket) and b) I can monitor the backlog of messages caused by slow processing.
I can achieve this using a wrapper socket with three sockets internally:
- One for the actual connection to the server
- A pair of sockets with an
inproc://
connection where I can keep count of messages as they are sent and received. So the data flows is:
Thread 1
rx.recv()
inproc_tx.send()
queued_messages++
Thread 2
inproc_rx.recv()
queued_messages--
do_processing()
This seemed like a sensible way to buffer the messages, as it already has zmq::message_t
s from recv()
and I am sure it will more efficiently implemented than rolling my own queuing logic. It works, but it seems like a lot of overhead and extra code when the underlying code already knows how many messages are queued to decide when to drop new messages.
Is there an API to query the queue usage on a ZeroMQ PULL socket? Or a better way to implement the above?
答案1
得分: 1
你可能低估了ZeroMQ在底层为你提供的免费功能!PUSH/PULL模式用于将消息从PUSHER发送到可用的PULLER。如果你有多个PULLER,结果就是如果其中一个PULLER在需求方面稍微落后,ZeroMQ会自动向其发送较少的消息。
我还认为在缓冲接收到的消息方面,你可能会犯一个错误。如果你在线程1中处理消息时落后了,那么在线程2中处理它时你仍然会落后;你只是通过一些额外的延迟掩盖了性能不佳。如果你只是将线程1和线程2都作为PULLER(而不是将它们串联在一起),异步处理消息将无法充分利用PUSH/PULL为你提供的功能。请参考指南中这部分的图5。
关于积压,你可以将PUSH套接字的高水位标记设置为某个可接受的值,并在zmq_send()上设置超时。如果你检测到zmq_send()超时,那么情况就不妙了;PULLER已经落后了。实际上,这就是你需要知道的全部。我不知道有什么方法可以发现有多少消息在排队等待,但归根结底,要么太多(超时),要么根本没有(PULLER跟得上)。如果PULLER开始落后,并且你想要尽早检测到这一点,将PUSH的高水位标记设置为较低的值。
补充:
考虑到你的PULLER受到I/O的限制,我建议最简单的做法是:
- 完全摒弃线程2,以及inproc_tx和inproc_rx,因为它们不再需要了。
- 为你的
rx
PULL套接字设置ZMQ_RCVHWM选项为0(即无限制)。 - 让ZMQ为你做所有的缓冲。无论如何,结果都是一样的。
- 让线程1将数据写入磁盘,没有必要将其作为异步操作或其他操作。
这样,ZMQ将缓冲尽可能多的消息(取决于RAM的限制,而不是默认的1000)。如果你的一侧开始耗尽内存并变慢,它甚至会在发送端(取决于发送HWM设置的值,1000是默认值)也进行缓冲。在线程1中,你可以定期询问操作系统还剩多少未分配的RAM,现在你真正关心的是剩余的可用RAM是否不足(而不是有多少消息在缓冲)。如果你仍然有可用的RAM,你就不会阻塞PUSHER发送给你所有这些东西。
这也利用了每个套接字后面的管理线程来完成所有工作;由于这是库代码,你可以让它为你做缓冲,而不是让你的代码变得复杂。你可以将线程1视为与套接字管理线程异步工作(实际上是代表你处理网络连接的线程)。
英文:
It's possible that you're underestimating what ZeroMQ will do for you under the hood for free! The PUSH/PULL pattern is used to send a message from the PUSHer to an available PULLer. If you have more than 1 PULLer, the result is that if one of them is getting a bit behind the demand, ZeroMQ will automatically send fewer messages to it.
I also think that you'd be making a mistake in buffering a received message. If you'd fall behind processing it in your Thread 1, you're still going to fall behind processing it in Thread 2; you're just disguising underperformance with some extra latency. Processing messages asynchronously would be not making use of what PUSH/PULL would do for you, if you simply had both Thread 1 and Thread 2 as PULLers (instead of being pipe-lined together). Take a look at Figure 5 in this part of the guide.
In terms of backlog, you could set the high water mark on the PUSH socket to some acceptable value, and put a time-out on zmq_send(). If you detect that zmq_send() timed out, then all is not well; the PULLers have fallen behind. And, really, that's all you need to know. There is no means I know of to discover how many messages are queued up, waiting, but when one gets right down to it it'll either be too many (time out), or none at all (the PULLers are keeping up). If the PULLers started to fall behind and you wanted to detect that early, set the PUSH high water mark to some low value.
ADDED
Given that your PULLer is held up by I/O, I suggest that the simplest thing to do is to:
- get rid of thread 2 altogether, and also inproc_tx and inproc_rx as they're no longer needed,
- set the ZMQ_RCVHWM option for your
rx
PULL socket to 0 (= no limit), - just let ZMQ do all the buffering for you. It'll amount to the same thing anyway,
- Have thread 1 do the writing to disk, there's no need for that to be an async operation or anything like that.
This way, ZMQ will buffer as many messages as RAM permits (instead of the default of 1000). It'll even buffer them on the sending side (up to whatever the sending HWM is set to - 1000 is the default) too if your side starts to exhaust memory and slow down. In thread 1 you could periodically ask the OS how much unallocated RAM there is left, with a shortage of free RAM now being your actual point of concern (rather than, how many messages are buffered). If you've still got free RAM, you're not holding up the PUSHer sending you all this stuff.
It's also exploiting the management thread that lies behind each socket doing all the work; as that's library code, you may as well make it earn its keep and do the buffering for you, rather than complicating your code. You can think of your thread 1 being asynchronous to the socket management thread (which is what's actually handling the network connection for you on your behalf).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论