Knowing when you've read everything off a multiprocessing Queue

Question

I have some code that farms out work to tasks. The tasks put their results on a queue, and the main thread reads these results from the queue and deals with them.

from multiprocessing import Pool, Manager
import uuid


def handle_task(arg, queue, end_marker):
    # ... add some number of results to the queue ...
    queue.put(end_marker)  # Tell the reader that this task is finished

def main(tasks):
    manager = Manager()
    queue = manager.Queue()
    count = len(tasks)
    end_marker = uuid.uuid4()
    with Pool() as pool:
        pool.starmap(handle_task, ((task, queue, end_marker) for task in tasks))
        # One end marker is expected per task:
        while count > 0:
            value = queue.get()
            if value == end_marker:
                count -= 1
            else:
                pass  # ... deal with value ...

This code works, but it is incredibly kludgy and inelegant. What if tasks is an iterator? Why do I need to know how many tasks there are ahead of time and keep track of each of them?

Is there a cleaner way of reading from a Queue and knowing that every process that will write to that queue is done, and that you've read everything they've written?

Answer 1

Score: 0

First of all, operations on a managed queue are very slow compared to a multiprocessing.Queue instance. But why are you even using an additional queue to return results when a multiprocessing pool already uses such a queue for returning results? Instead of having handle_task write some number of result values to a queue, it could simply return a list of these values. For example:

from multiprocessing import Pool


def handle_task(arg):
    results = []
    # Add some number of results to the results list:
    results.append(arg + arg)
    results.append(arg * arg)
    return results

def main(tasks):
    with Pool() as pool:
        map_results = pool.map(handle_task, tasks)
        for results in map_results:
            for value in results:
                # Deal with value:
                print(value)

if __name__ == '__main__':
    main([7, 2, 3])

Prints:

14
49
4
4
6
9

As a side benefit, the results returned will be in task-submission order, which one day might be important. If you want to be able to process the returned values as they become available, then you can use pool.imap or pool.imap_unordered (if you don't care about the order of the returned values, which seems to be the case):

from multiprocessing import Pool


def handle_task(arg):
    results = []
    # Add some number of results to the results list:
    results.append(arg + arg)
    results.append(arg * arg)
    return results

def main(tasks):
    with Pool() as pool:
        for results in pool.imap_unordered(handle_task, tasks):
            for value in results:
                # Deal with value:
                print(value)

if __name__ == '__main__':
    main([7, 2, 3])

If the number of tasks being submitted is "large", then you should probably use the chunksize argument of the imap_unordered method. A reasonable value would be len(tasks) / (4 * pool_size), rounded up to an integer, where the pool size defaults to multiprocessing.cpu_count(). This is more or less how a chunksize value is computed when you use the map or starmap methods and you have not specified the chunksize argument.
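
To make the chunksize arithmetic concrete, here is a minimal sketch. The helper name suggested_chunksize is my own and is not part of multiprocessing. It mirrors, as far as I can tell, the divide-and-round-up that CPython's Pool.map applies when chunksize is omitted, and it clamps the result to at least 1 on the assumption that imap_unordered needs a positive chunksize.

```python
import os


def suggested_chunksize(num_tasks, pool_size=None):
    """Hypothetical helper: aim for about four chunks per worker, rounded up."""
    if pool_size is None:
        # Assumption: the pool was created with the default number of workers.
        pool_size = os.cpu_count() or 1
    chunksize, extra = divmod(num_tasks, 4 * pool_size)
    if extra:
        chunksize += 1  # Round up so that no tasks are left over
    return max(chunksize, 1)  # Never return 0, even for an empty task list
```

With a list of tasks, the result could then be passed along as pool.imap_unordered(handle_task, tasks, chunksize=suggested_chunksize(len(tasks))). Note that this needs len(tasks), so it does not apply directly when tasks is an iterator of unknown length.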

Using a multiprocessing.Queue instance

from multiprocessing import Pool, Queue
from queue import Empty

def init_pool_processes(q):
    global queue

    queue = q

def handle_task(arg):
    # Put some number of results on the global queue:
    queue.put(arg + arg)
    queue.put(arg * arg)

def main(tasks):
    queue = Queue()
    with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
        pool.map(handle_task, tasks)
        try:
            while True:
                value = queue.get_nowait()
                print(value)
        except Empty:
            pass

if __name__ == '__main__':
    main([7, 2, 3])

Although calling queue.empty() is not supposed to be reliable for a multiprocessing.Queue instance, as long as you are doing this after all the tasks have finished processing, it seems no more unreliable than relying on non-blocking get_nowait calls raising an Empty exception only after all items have been retrieved:

from multiprocessing import Pool, Queue

def init_pool_processes(q):
    global queue

    queue = q

def handle_task(arg):
    # Put some number of results on the global queue:
    queue.put(arg + arg)
    queue.put(arg * arg)

def main(tasks):
    queue = Queue()
    with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
        pool.map(handle_task, tasks)
        while not queue.empty():
            value = queue.get_nowait()
            print(value)

if __name__ == '__main__':
    main([7, 2, 3])

But if you want to do everything strictly according to what the documentation implies is the only reliable method when using a multiprocessing.Queue instance, that would be by using sentinels as you already are doing:

from multiprocessing import Pool, Queue

class Sentinel:
    pass

SENTINEL = Sentinel()

def init_pool_processes(q):
    global queue

    queue = q

def handle_task(arg):
    # Put some number of results on the global queue:
    queue.put(arg + arg)
    queue.put(arg * arg)
    # Signal that this task has no more results:
    queue.put(SENTINEL)

def main(tasks):
    queue = Queue()
    with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
        pool.map_async(handle_task, tasks) # Does not block
        sentinel_count = len(tasks)
        while sentinel_count != 0:
            value = queue.get()
            if isinstance(value, Sentinel):
                sentinel_count -= 1
            else:
                print(value)

if __name__ == '__main__':
    main([7, 2, 3])

Conclusion

If you need to use a queue for output, I would recommend a multiprocessing.Queue. In this case using sentinels is really the only 100% correct way of proceeding. I would also use the map_async method so that you can start processing results as they are returned.
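
The sentinel example above still calls len(tasks), which does not answer the "what if tasks is an iterator?" part of the question. One possible variation, offered as a sketch rather than a definitive implementation, is to submit tasks one at a time with apply_async and count them as they are submitted. The read_results helper and the try/finally in handle_task are my own additions, not part of the original answer.

```python
from multiprocessing import Pool, Queue


class Sentinel:
    """Marker that a task puts on the queue after its last result."""


def init_pool_processes(q):
    # Runs once in each worker process: keep the shared queue in a global.
    global queue
    queue = q


def handle_task(arg):
    try:
        queue.put(arg + arg)
        queue.put(arg * arg)
    finally:
        # Always signal completion, even if the task raised an exception.
        queue.put(Sentinel())


def read_results(q, expected_sentinels):
    """Yield values from q until expected_sentinels markers have been seen."""
    while expected_sentinels > 0:
        value = q.get()
        if isinstance(value, Sentinel):
            expected_sentinels -= 1
        else:
            yield value


def main(tasks):
    q = Queue()
    with Pool(initializer=init_pool_processes, initargs=(q,)) as pool:
        submitted = 0
        for task in tasks:  # Any iterable works, including a generator
            pool.apply_async(handle_task, (task,))
            submitted += 1
        for value in read_results(q, submitted):
            print(value)


if __name__ == '__main__':
    main(iter([7, 2, 3]))
```

This submits every task before it starts reading, so the number of sentinels to wait for is known without ever needing len(tasks). The trade-off is that the whole iterator is consumed up front; if that is a concern, pool.imap_unordered with returned lists avoids the extra queue entirely.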

Using a Managed Queue

This is Pingu's answer, which remains deleted for now:

from multiprocessing import Pool, Manager
from random import randint

def process(n, q):
    for x in range(randint(1, 10)):
        q.put((n, x))

def main():
    with Manager() as manager:
        queue = manager.Queue()
        with Pool() as pool:
            pool.starmap(process, [(n, queue) for n in range(5)])
        while not queue.empty():
            print(queue.get())

if __name__ == '__main__':
    main()

huangapple
  • Posted on 2023-02-10 14:54:41
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75407809.html