Hang during queue.join() asynchronously processing a queue

Question
I'm currently using `multiprocessing` and `asyncio` to process a huge amount of data. However, my code randomly hangs after it finishes processing a batch of items (200 in my case) and never gets past `queue.join()` to process the next batch.
According to the docs:
> Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
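The accounting the docs describe can be seen in a minimal, self-contained sketch (my illustration, not the question's code; names like `run_demo` are hypothetical): `join()` returns only after `task_done()` has been called exactly once for every item that was `put()`.

```python
# Minimal sketch of JoinableQueue accounting: join() unblocks only
# once task_done() has been called exactly once per put() item.
from multiprocessing import JoinableQueue, Process


def consumer(q: JoinableQueue) -> None:
    while True:
        item = q.get()       # every successful get() must be paired...
        q.task_done()        # ...with exactly one task_done()
        if item is None:     # sentinel value: stop consuming
            break


def run_demo(n: int) -> bool:
    q = JoinableQueue()
    for i in range(n):
        q.put(i)
    q.put(None)              # sentinel so the consumer exits
    p = Process(target=consumer, args=(q,))
    p.start()
    q.join()                 # blocks until the unfinished count hits zero
    p.join()
    return True


if __name__ == "__main__":
    run_demo(5)
```

If the consumer skipped even one `task_done()`, the unfinished count would never reach zero and `q.join()` would block forever, which is exactly the symptom described above.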
I made sure to call `queue.task_done()` for all items in every batch of data per worker, yet it still happens. Am I understanding something incorrectly? What am I doing wrong? What can I improve?
Minimal reproducible code:

```python
import asyncio
import logging
import random
from multiprocessing import JoinableQueue, Process

N_PROCESSES = 4
N_WORKERS = 8


def create_queue(zpids: list[int]) -> JoinableQueue:
    queue = JoinableQueue()
    for zpid in zpids:
        queue.put(zpid)
    return queue


async def worker(i_process: int, queue: JoinableQueue):
    # Process items in batches of 200
    query_size = 200
    while True:
        batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        if not batch:
            break
        logging.info("Faking some tasks...")
        for _ in batch:
            queue.task_done()


async def async_process(i_process: int, queue: JoinableQueue):
    logging.info(f"p:{i_process} - launching workers...")
    workers = [asyncio.create_task(worker(i_process, queue)) for _ in range(N_WORKERS)]
    await asyncio.gather(*workers, return_exceptions=True)


def async_process_wrapper(i_process: int, zpids: JoinableQueue):
    asyncio.run(async_process(i_process, zpids), debug=True)


def start_processes(queue: JoinableQueue):
    for i in range(N_PROCESSES):
        Process(target=async_process_wrapper, args=(i, queue)).start()
    queue.join()


def main():
    data = [random.randrange(1, 1000) for _ in range(200000)]
    my_queue = create_queue(data)
    start_processes(my_queue)


if __name__ == "__main__":
    main()
```
Answer 1

Score: 1
`qsize()` is not reliable. A `worker` will randomly raise a `queue.Empty` exception near the end of the queue, so `task_done()` is never called for the items that were successfully fetched before the exception.
The following change should fix the problem.

```python
from queue import Empty


async def worker(i_process: int, queue: JoinableQueue):
    # Process items in batches of 200
    query_size = 200
    while True:
        # batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        batch = []
        try:
            for _ in range(query_size):
                batch.append(queue.get(timeout=0.01))
        except Empty:
            pass
        if not batch:
            break
        logging.info("Faking some tasks...")
        for _ in batch:
            queue.task_done()
```
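The failure mode this answer describes can be reproduced single-process with the stdlib `queue.Queue`, which uses the same `task_done()`/`join()` accounting (a sketch of mine, not part of the answer; `unfinished_tasks` is a CPython implementation detail used here only for inspection):

```python
# Sketch: when get() raises Empty partway through a list comprehension,
# the items fetched before the exception are discarded, and task_done()
# is never called for them, so join() would block forever.
from queue import Empty, Queue

q: Queue = Queue()
for i in range(3):
    q.put(i)

try:
    # Ask for more items than exist, as a stale qsize() can allow.
    batch = [q.get(timeout=0.01) for _ in range(5)]
except Empty:
    batch = []  # the 3 items already gotten vanish with the comprehension

print(len(batch), q.unfinished_tasks)  # 0 3: three gets, zero task_done calls
```

Because `unfinished_tasks` is still 3 while the queue itself is empty, no further `get()` can succeed and `q.join()` can never return. Appending to a list inside `try`, as the fix above does, keeps every fetched item so each one can still be acknowledged with `task_done()`.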