Hang during queue.join() asynchronously processing a queue

Question

I'm currently using multiprocessing and asyncio to process a huge amount of data. However, my code keeps randomly hanging after it finishes processing a batch of items (200 in my case), and queue.join() never unblocks so the next batch is not processed.

According to the docs:
> Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

I made sure to call queue.task_done() for all items in every batch of data per worker, yet it still happens. Am I understanding something incorrectly? What am I doing wrong? What can I improve?
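
To double-check that accounting, here is a tiny standalone sketch (hypothetical, not my real code): join() returns only once every put() has been matched by a task_done().

from multiprocessing import JoinableQueue, Process


def consume(q: JoinableQueue) -> None:
    while True:
        item = q.get()      # checks out one unfinished task
        _ = item * 2        # stand-in for real work
        q.task_done()       # marks that task as finished


if __name__ == "__main__":
    q = JoinableQueue()
    for i in range(10):
        q.put(i)            # unfinished-task count rises to 10

    Process(target=consume, args=(q,), daemon=True).start()

    q.join()                # unblocks only after ten task_done() calls
    print("all items processed")

With a single consumer and exactly one task_done() per get(), this behaves as documented, whereas the hang shows up in the multi-process, batched version below.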

Minimal reproducible code:

import asyncio
import logging
import random
from multiprocessing import JoinableQueue, Process

N_PROCESSES = 4
N_WORKERS = 8


def create_queue(zpids: list[int]) -> JoinableQueue:
    queue = JoinableQueue()
    for zpid in zpids:
        queue.put(zpid)
    return queue


async def worker(i_process: int, queue: JoinableQueue):
    # Process items in batch of 200
    query_size = 200
    while True:
        batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        if not batch:
            break

        logging.info("Faking some tasks...")

        for _ in batch:
            queue.task_done()


async def async_process(i_process: int, queue: JoinableQueue):
    logging.info(f"p:{i_process} - launching workers...")
    workers = [asyncio.create_task(worker(i_process, queue)) for _ in range(N_WORKERS)]
    await asyncio.gather(*workers, return_exceptions=True)


def async_process_wrapper(i_process: int, zpids: JoinableQueue):
    asyncio.run(async_process(i_process, zpids), debug=True)


def start_processes(queue: JoinableQueue):
    for i in range(N_PROCESSES):
        Process(target=async_process_wrapper, args=(i, queue)).start()

    queue.join()


def main():
    data = [random.randrange(1, 1000) for _ in range(200000)]
    my_queue = create_queue(data)
    start_processes(my_queue)


if __name__ == "__main__":
    main()

Answer 1

Score: 1

qsize() is not reliable. Near the end of the queue, qsize() can report items that other workers consume before this worker's get() calls reach them, so the list comprehension randomly raises a queue.Empty exception. The partially built batch is then thrown away, task_done() is never called for the items that were successfully fetched up to that point, the unfinished-task count never reaches zero, and queue.join() hangs. (The exception itself is swallowed by gather(..., return_exceptions=True), which is why there is no traceback.)

The following change should fix the problem.

from queue import Empty


async def worker(i_process: int, queue: JoinableQueue):
    # Process items in batch of 200
    query_size = 200
    while True:
        # Original (buggy) version:
        # batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        # Fetch item by item instead: if get() raises Empty part-way through,
        # the items already appended to batch are kept, so each of them still
        # gets its matching task_done() below.
        batch = []
        try:
            for _ in range(query_size):
                batch.append(queue.get(timeout=0.01))
        except Empty:
            pass

        if not batch:
            break

        logging.info("Faking some tasks...")

        for _ in batch:
            queue.task_done()
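
An alternative that avoids this class of bug altogether is to pair every successful get() with its task_done() immediately, per item rather than per batch. A minimal sketch of that variant (same worker signature as above; the details are illustrative, not a drop-in replacement):

import logging
from multiprocessing import JoinableQueue
from queue import Empty


async def worker(i_process: int, queue: JoinableQueue):
    # Per-item variant: every successful get() is matched by exactly one
    # task_done(), so an Empty raised mid-loop can never leave the
    # unfinished-task counter (which queue.join() waits on) out of sync.
    while True:
        try:
            item = queue.get(timeout=0.01)
        except Empty:
            break  # queue appears drained; let this worker exit
        try:
            logging.info("p:%s - faking work on %s...", i_process, item)
        finally:
            queue.task_done()

Batching can still be layered on top of this (collect items into a list, process them, then call task_done() once per collected item), but keeping get() and task_done() adjacent makes the pairing hard to break.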
