Bounded semaphore queue and processPoolExecutor
Question
I recently chanced upon a neat trick by @noxdafox that uses a `BoundedSemaphore` to cap the queue length, limiting the number of tasks queued up for a `ProcessPoolExecutor`. This is the link to the gist with the example that I followed.
The issue seems to occur when using `ProcessPoolExecutor` but not `ThreadPoolExecutor`. I'm not sure of the reason, and perhaps someone knows whether there was a change to the `concurrent.futures` implementation that causes my recent attempt to fail.
Here is the sample code that I used to test out the implementation:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore

class test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testfn(self):
        msg = 'haha'
        print(msg)
        return msg

    def testing(self):
        return self.processExecutor.submit(self.testfn)

class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool')
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()

if __name__ == '__main__':
    thingy = test()
    testthingy = thingy.testing()
    wait([testthingy])
    print(testthingy.result())
I get the following error:
submitting to pool
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/markus.ng.yu/Downloads/testconcurrency.py", line 44, in <module>
    print(testthingy.result())
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
I would appreciate any insight into why this does not work :>
Answer 1
Score: 0
Your class creates an instance of class `MaxQueuePool` in its `test.__init__` method and assigns it to the attribute `self.processExecutor`. This `MaxQueuePool` instance contains a reference to a `ProcessPoolExecutor` instance. In method `testing` you have `self.processExecutor.submit(self.testfn)`, where `self` refers to an instance of class `test`. But since `self.testfn` is to be executed in a multiprocessing pool process, this `test` instance must be sent to the address space of the pool process that will execute the `self.testfn` "worker function." This serialization/de-serialization is done using `pickle`. But since the `test` instance indirectly contains a reference to a `ProcessPoolExecutor` instance, and such an instance cannot be pickled, you get the error you see.
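As a quick illustration of the mechanism (a minimal sketch I am adding here, not part of the original question; the class name Holder is hypothetical), pickling a bound method drags its whole instance along with it:

import pickle
from concurrent.futures import ProcessPoolExecutor

class Holder:
    def __init__(self):
        # A ProcessPoolExecutor holds internal thread locks, so any object
        # that references one cannot be pickled.
        self.pool = ProcessPoolExecutor(max_workers=1)

    def work(self):
        return 'hi'

if __name__ == '__main__':
    holder = Holder()
    try:
        # Pickling the bound method also pickles `holder` itself, which fails:
        pickle.dumps(holder.work)
    except TypeError as exc:
        print(exc)  # cannot pickle '_thread.lock' object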
So the problem is caused by the "worker function" (`self.testfn`) being a method of a class instance whose attributes cannot all be pickled. This can be fixed by changing the worker function, `test.testfn`, to a global function, `testfn`, instead:
Update
The following code demonstrates how I would run multiple executions of `testfn` in parallel using a single multiprocessing pool.
The list comprehension returned by `testing` will not be fully computed until all 6 tasks have been submitted. Because the `max_queue_size` value is 4, the first 2 tasks submitted (for i = 1 and i = 2) will have to finish before the last 2 tasks (for i = 5 and i = 6) can be submitted. The first 4 tasks will run in parallel and will complete more or less at the same time, and so the final 2 tasks will also be submitted and complete more or less at the same time, approximately 1 second after the first 4 tasks.
If you have a pool size of N, it makes no sense for the `max_queue_size` value to be less than N; otherwise, you will always have pool processes that are idle. However, `max_queue_size` is not accurately named: this value does not represent the number of tasks that can be sitting in the task queue waiting to be executed. It represents the sum of the pool size (i.e. the number of tasks currently being executed) plus the number of tasks waiting to be executed (the actual queue size of waiting tasks).
In this example we have a pool size of 4 and a `max_queue_size` value of 4. We are submitting 6 tasks in a loop. The first 4 will immediately be pulled off the task queue and be executed. The next 2 tasks will not be submitted until at least 2 of the previously submitted tasks complete. Until that happens, the current queue size of tasks waiting to be executed is 0, and therefore pool processes will be idle for a short interval between the time a task completes and the time the next task can be submitted and pulled off the queue by the idle process. Therefore, unless memory is a problem, I would advise setting `max_queue_size` to at least two times the pool size, so that whenever a process completes executing a task there is already another task on the queue it can pull off to process. Another way of looking at this: if you are willing to have M tasks submitted and waiting to execute while N (the pool size) tasks are currently being executed, then set `max_queue_size` to N + M.
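For example (an illustrative configuration I am adding, not taken from the question's code), with N = 4 workers and room for M = 4 waiting tasks:

# Hypothetical sizing: N = 4 workers plus M = 4 waiting tasks,
# so max_queue_size = N + M = 8 keeps every worker busy.
pool = MaxQueuePool(ProcessPoolExecutor, max_queue_size=8, max_workers=4)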
Please read the comments in the code.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore
import time

# This is now a global function and no longer a method of class test:
def testfn(n):
    time.sleep(1)
    msg = f'haha{n}'
    return msg, time.time()  # add completion time

# Class names are typically capitalized:
class Test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testing(self):
        # Submit 6 tasks.
        return [self.processExecutor.submit(testfn, i) for i in range(1, 7)]

class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool at time', time.time())
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()

if __name__ == '__main__':
    tester = Test()
    futures = tester.testing()
    for future in futures:
        print(future.result())
Prints:
submitting to pool at time 1685373628.8266022
submitting to pool at time 1685373628.8516064
submitting to pool at time 1685373628.8526037
submitting to pool at time 1685373628.853605
submitting to pool at time 1685373629.978602
submitting to pool at time 1685373629.9806027
('haha1', 1685373629.978602)
('haha2', 1685373629.978602)
('haha3', 1685373629.978602)
('haha4', 1685373629.978602)
('haha5', 1685373630.99162)
('haha6', 1685373630.99162)
When you were using a `ThreadPoolExecutor`, there was no need to serialize/de-serialize the `test` instance, since submitted tasks run in the same address space as the main thread.
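To illustrate that last point (a sketch I am adding, reusing the MaxQueuePool class defined above; the ThreadTest name is hypothetical):

from concurrent.futures import ThreadPoolExecutor

class ThreadTest:
    def __init__(self):
        # Same wrapper, but backed by threads instead of processes.
        self.executor = MaxQueuePool(ThreadPoolExecutor, 4, 4)

    def testfn(self):
        return 'haha'

    def testing(self):
        # Submitting a bound method is safe here; nothing is pickled because
        # the task runs in the same process as the caller.
        return self.executor.submit(self.testfn)

if __name__ == '__main__':
    t = ThreadTest()
    print(t.testing().result())  # prints: haha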