Bounded semaphore queue and ProcessPoolExecutor

Question

I recently chanced upon a neat trick by @noxdafox that uses a BoundedSemaphore to hold a maximum queue length, limiting the number of tasks queued in a ProcessPoolExecutor. I followed the example in his gist.

The issue seems to occur with ProcessPoolExecutor but not with ThreadPoolExecutor. I'm not sure why; perhaps someone knows whether a change to the concurrent.futures implementation causes my recent attempt to fail.

Here is the sample code that I used to test out the implementation.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore

class test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor,4,4)

    def testfn(self):
        msg = 'haha'
        print(msg)
        return msg

    def testing(self):
        return self.processExecutor.submit(self.testfn)

class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool')

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()

if __name__ == '__main__':
    thingy = test()
    testthingy = thingy.testing()
    wait([testthingy])
    print(testthingy.result())

I get the following error:

submitting to pool
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/markus.ng.yu/Downloads/testconcurrency.py", line 44, in <module>
    print(testthingy.result())
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

I would appreciate any insight into why this does not work.

Answer 1

Score: 0

Your test.__init__ method creates an instance of class MaxQueuePool and assigns it to the attribute self.processExecutor. That MaxQueuePool instance in turn holds a ProcessPoolExecutor instance and a BoundedSemaphore. In method testing you call self.processExecutor.submit(self.testfn), where self refers to an instance of class test. Because self.testfn is a bound method that is to be executed in a multiprocessing pool process, the test instance it is bound to must be sent to the address space of the pool process that will execute this "worker function." This serialization/de-serialization is done using pickle. But the test instance indirectly references the ProcessPoolExecutor and the BoundedSemaphore, both of which contain thread locks that cannot be pickled, so you get the error shown: cannot pickle '_thread.lock' object.

So the problem is caused by the "worker function" (self.testfn) being a method of a class instance that has attributes that cannot be pickled. This can be fixed by making the worker function a global (module-level) function, testfn, instead of the method test.testfn.
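The pickling failure can be reproduced without any pool. The following is only a minimal sketch: `Holder`, `work`, and `can_pickle` are hypothetical names that do not appear in the question, and `Holder` merely stands in for the `test` class by owning a semaphore.

```python
import pickle
from threading import BoundedSemaphore


class Holder:
    """Hypothetical stand-in for the `test` class: it owns a semaphore."""

    def __init__(self):
        # A BoundedSemaphore keeps a thread lock internally.
        self.slots = BoundedSemaphore(4)

    def work(self):
        return "haha"


def work():
    """Module-level function: pickled by reference, no instance involved."""
    return "haha"


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if pickling fails."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    # Pickling a bound method also pickles the instance it is bound to.
    print("bound method picklable:", can_pickle(Holder().work))
    # A plain function carries no instance state with it.
    print("module-level function picklable:", can_pickle(work))
```

Run as a script, the bound method should be reported as not picklable, while the module-level function should be.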

Update

The following code demonstrates how I would run multiple executions of testfn in parallel using a single multiprocessing pool.

The list returned by testing will not be fully built until all 6 tasks have been submitted. Because the max_queue_size value is 4, 2 of the first 4 tasks must finish before the last 2 tasks (for i = 5 and i = 6) can be submitted. The first 4 tasks will run in parallel and complete at more or less the same time, so the final 2 tasks will be submitted and complete at more or less the same time, approximately 1 second after the first 4 tasks.

If you have a pool size of N, it makes no sense for the max_queue_size value to be less than N. Otherwise, you will always have pool processes that are idle. However, max_queue_size is not accurately named: this value does not represent the number of tasks that can be sitting in the task queue waiting to be executed. It represents the number of tasks currently being executed (at most the pool size) plus the number of tasks waiting to be executed (the actual queue size of waiting tasks).
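That the semaphore bounds running plus waiting tasks, rather than only waiting ones, can be observed with a small counter. This is a sketch under assumptions: `run_bounded` is a hypothetical helper, and a ThreadPoolExecutor is swapped in for the ProcessPoolExecutor purely so that the count can be tracked inside one process.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_bounded(n_tasks, limit, workers):
    """Submit n_tasks through a semaphore of size `limit` and return the
    peak number of tasks that were submitted but not yet finished."""
    slots = threading.BoundedSemaphore(limit)
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def on_done(_future):
        nonlocal in_flight
        with lock:
            in_flight -= 1
        slots.release()  # free one slot only after the count is updated

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for _ in range(n_tasks):
            slots.acquire()  # blocks once `limit` tasks are in flight
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            pool.submit(time.sleep, 0.05).add_done_callback(on_done)
    return peak


if __name__ == "__main__":
    # With 2 workers and a limit of 4, at most 2 tasks run while 2 wait.
    print("peak in flight:", run_bounded(10, limit=4, workers=2))
```

The peak never exceeds `limit`, regardless of how many of those tasks are running and how many are still queued.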

In this example we have a pool size of 4 and a max_queue_size value of 4. We are submitting 6 tasks in a loop. The first 4 will immediately be pulled off the task queue and be executed. The next 2 tasks will not be submitted until at least 2 of the previously submitted tasks complete. Until that happens, the current queue size of tasks waiting to be executed is 0, and therefore pool processes will be idle for a short interval between the time a task completes and the time the next task can be submitted and pulled off the queue by the idle process.

Therefore, unless memory is a problem, I would advise setting max_queue_size to at least two times the pool size, so that whenever a process completes executing a task there is already another task on the queue it can pull off to process. Another way of looking at this: if you are willing to have M tasks submitted and waiting to execute while N (the pool size) tasks are currently being executed, then set max_queue_size to N + M.
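The N + M rule can be written down as a tiny helper. This is only an illustrative sketch; `queue_limit` is a hypothetical name and is not part of MaxQueuePool or concurrent.futures.

```python
def queue_limit(pool_size, backlog):
    """Return the semaphore size that lets `backlog` tasks wait in the queue
    while `pool_size` tasks are being executed (N + M in the text above)."""
    if pool_size < 1:
        raise ValueError("pool_size must be at least 1")
    if backlog < 0:
        raise ValueError("backlog must not be negative")
    return pool_size + backlog


if __name__ == "__main__":
    workers = 4
    # No backlog: workers idle briefly between tasks, as in the example.
    print(queue_limit(workers, 0))
    # Backlog equal to the pool size: the "two times the pool size" advice.
    print(queue_limit(workers, workers))
```

The result would then be passed as max_queue_size, for example MaxQueuePool(ProcessPoolExecutor, queue_limit(4, 4), 4).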

Please read the comments in the code.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore
import time

# This is now a global function and no longer a method of class test:
def testfn(n):

    time.sleep(1)
    msg = f'haha{n}'
    return msg, time.time() # add completion time

# Class names are typically capitalized:
class Test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testing(self):
        # Submit 6 tasks.
        return [self.processExecutor.submit(testfn, i) for i in range(1, 7)]

class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool at time', time.time())

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()

if __name__ == '__main__':
    tester = Test()
    futures = tester.testing()
    for future in futures:
        print(future.result())

Prints:

submitting to pool at time 1685373628.8266022
submitting to pool at time 1685373628.8516064
submitting to pool at time 1685373628.8526037
submitting to pool at time 1685373628.853605
submitting to pool at time 1685373629.978602
submitting to pool at time 1685373629.9806027
('haha1', 1685373629.978602)
('haha2', 1685373629.978602)
('haha3', 1685373629.978602)
('haha4', 1685373629.978602)
('haha5', 1685373630.99162)
('haha6', 1685373630.99162)

When you were using a ThreadPoolExecutor, there was no need to serialize/de-serialize the test instance, since submitted tasks run in threads of the same process and share its address space.
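A short sketch of the thread case, using hypothetical names (`Holder`, `run_in_thread`) rather than the classes from the question: the bound method of an object that owns a semaphore can be submitted to a ThreadPoolExecutor as-is, because the worker thread calls it directly and nothing is pickled.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore


class Holder:
    """Hypothetical object with an attribute that cannot be pickled."""

    def __init__(self):
        self.slots = BoundedSemaphore(4)

    def work(self):
        return "haha"


def run_in_thread(holder):
    """Run holder.work in a worker thread and return its result."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The bound method is handed to the thread by reference.
        return pool.submit(holder.work).result()


if __name__ == "__main__":
    print(run_in_thread(Holder()))
```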

huangapple
  • Published on May 28, 2023, 17:44:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76350853.html