Overridden `Process.run` does not execute asynchronously
Question
Having overridden `Process.run` in a subclass:
```python
import multiprocessing as mp
import time

DELAY = 2


class NewProcess(mp.get_context().Process):
    def run(self) -> None:
        # add new kwarg to item[4] slot
        old_que = self._args[0]
        new_que = mp.SimpleQueue()
        while not old_que.empty():
            item = old_que.get()
            new_que.put(
                (
                    item[0],
                    item[1],
                    item[2],  # Function
                    item[3],  # Arguments
                    item[4]  # Keyword arguments
                    | {
                        "message": "Hello world!",
                    },
                )
            )
        # Recreate args
        self._args = new_que, *self._args[1:]
        # Continue as normal
        super().run()


def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == "__main__":
    context = mp.get_context()
    context.Process = NewProcess
    with context.Pool(2) as pool:
        responses = []
        start = time.perf_counter()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        for resp in responses:
            resp.wait()
        responses = [resp.get() for resp in responses]
        total = time.perf_counter() - start
        assert total - DELAY < 1e-2, f"Expected to take circa {DELAY}s, took {total}s"
        assert responses == (
            expected := list(
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            )
        ), f"{responses=}!={expected}"
```
I would expect the `delay` function to execute asynchronously, so that the whole run takes about `DELAY` seconds. However, it does not. The script fails with:
```
Traceback (most recent call last):
  File "/home/vahvero/Desktop/tmp.py", line 54, in <module>
    assert total - DELAY < 1e-2, f"Expected to take circa {DELAY}s, took {total}s"
AssertionError: Expected to take circa 2s, took 4.003754430001209s
```
Why do my changes to `run` cause sequential rather than parallel processing?
Answer 1
Score: 1
<details>
<summary>English:</summary>
You have what I believe to be 3 issues with your code, two minor and one major:
1. The value you give as the expected `responses` is not correct: `list(((0, 1, 2), {...}))` unpacks a single `(args, kwargs)` pair into a two-element list, whereas two `apply_async` calls return a list of two (2) such tuples.
2. The time you allow for the two tasks needs to be a bit more generous, because on a slower machine the overhead of creating and initializing processes could well exceed 0.01 seconds.
3. Each pool process creates its own new `SimpleQueue` instance. So instead of a single queue instance with two items on it, we now have two queue instances, each with two items on it, for a total of 4 tasks to process. This is why it takes twice as long as you expect.
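A minimal illustration of issue 1 in plain Python (no pool involved; the variable names are only illustrative): `list()` iterates over the single `(args, kwargs)` pair, so it yields the pair's two elements instead of a list of two results.

```python
# One (args, kwargs) result, as delay(0, 1, 2) returns it with the default kwarg.
single_result = ((0, 1, 2), {"message": "Hello world!"})

# What the question's assert builds: list() unpacks the pair's two elements.
question_expected = list(single_result)

# What two apply_async(...).get() calls actually produce: two such pairs.
actual_shape = [single_result] * 2

print(question_expected)  # [(0, 1, 2), {'message': 'Hello world!'}]
print(question_expected == actual_shape)  # False
```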
The following code does not create any new queue instances but instead modifies the items (tasks) on the queue if necessary. I ensure that the rewriting of the queue is performed only once. The code also prints out the results and timing:
```python
import multiprocessing as mp
import time
from functools import partial

DELAY = 2


class NewProcess(mp.get_context().Process):
    def __init__(self, rewritten, *args, **kwargs):
        self._rewritten = rewritten
        super().__init__(*args, **kwargs)

    def run(self) -> None:
        que = self._args[0]
        # add new kwarg to item[4] slot
        with self._rewritten.get_lock():
            # has the queue been rewritten?
            if not self._rewritten.value:
                items = []
                while not que.empty():
                    item = que.get()
                    if not item[4]:
                        item = list(item)
                        item[4] = {"message": "Hello world!"}
                        item = tuple(item)
                    items.append(item)
                for item in items:
                    que.put(item)
                self._rewritten.value = 1
        super().run()


def create_new_process(rewritten, *args, **kwargs):
    return NewProcess(rewritten, *args, **kwargs)


def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == "__main__":
    context = mp.get_context()
    rewritten = mp.Value('i', 0)  # Queue has not yet been rewritten
    context.Process = partial(create_new_process, rewritten)
    with context.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        """
        # This is unnecessary: # Booboo
        for resp in responses:
            resp.wait()
        """
        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant: # Booboo
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```
Prints:

```
[((0, 1, 2), {'message': 'Hello world!'}), ((0, 1, 2), {'message': 'Hello world!'})]
2.1168272495269775
```
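The "rewrite the existing queue once, under a lock" idea in `NewProcess.run` can be isolated in a small sketch. To keep it self-contained it swaps in threads, `queue.Queue` and a plain dict flag for the pool processes, `SimpleQueue` and `mp.Value`; `rewrite_once` and `work` are hypothetical names, and the tuples only mimic the slot layout the question indexes (function in slot 2, positional args in slot 3, kwargs in slot 4; slots 0 and 1 are placeholders). Note that it only rewrites items already on the queue when the first caller runs; anything enqueued afterwards is left untouched.

```python
import queue
import threading

DEFAULT_KWDS = {"message": "Hello world!"}


def work(*args, **kwargs):
    # Hypothetical placeholder for the worker function stored in slot 2.
    return args, kwargs


def rewrite_once(que, lock, state):
    """Fill in empty kwargs (slot 4) of every queued item, exactly once.

    `state["rewritten"]` stands in for the shared mp.Value flag and `lock`
    for its get_lock() in the answer's NewProcess.run().
    """
    with lock:
        if state["rewritten"]:
            return  # another worker already did the rewrite
        items = []
        while not que.empty():
            item = que.get()
            if not item[4]:
                item = (*item[:4], dict(DEFAULT_KWDS))
            items.append(item)
        for item in items:
            que.put(item)  # back onto the SAME queue; no new queue is created
        state["rewritten"] = True
        state["rewrites"] += 1


tasks = queue.Queue()
for i in range(2):
    # (placeholder, placeholder, func, args, kwargs)
    tasks.put((i, 0, work, (0, 1, 2), {}))

lock = threading.Lock()
state = {"rewritten": False, "rewrites": 0}
workers = [threading.Thread(target=rewrite_once, args=(tasks, lock, state))
           for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

rewritten_items = [tasks.get() for _ in range(tasks.qsize())]
print(state["rewrites"])  # 1
print([item[4] for item in rewritten_items])
```

Four "workers" race to rewrite, but the flag checked under the lock lets only the first one touch the queue, and all tasks stay on the one shared queue.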
Note
I mentioned the XY problem in my comment because I wanted to understand your ultimate goal and determine whether there was a better, safer way of accomplishing it. Your goal seems to be to ensure that, if no keyword arguments are passed to your worker function, a default is provided. If so, there is surely a cleaner, simpler, more efficient way. For example, we can use a decorator function:
```python
import multiprocessing as mp
import time
from functools import wraps

DELAY = 2


def provide_default_keywords(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {"message": "Hello world!"}
        return f(*args, **kwargs)
    return wrapper


@provide_default_keywords
def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant: # Booboo
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```
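As I understand it, `@wraps(f)` in the decorator is more than cosmetic here: `pickle`, which the pool uses to send `func` to the workers, serializes a module-level function by its `__module__` and `__qualname__` and looks it up again by that name. `wraps` copies those attributes, so the module-level name resolves to the wrapper itself; without it, the wrapper's qualified name contains `<locals>` and cannot be looked up. The sketch below just compares the two names (`with_wraps`, `without_wraps`, `delay_a` and `delay_b` are hypothetical names for illustration):

```python
from functools import wraps


def with_wraps(f):
    @wraps(f)  # copies __module__, __name__, __qualname__, __doc__, ...
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {"message": "Hello world!"}
        return f(*args, **kwargs)
    return wrapper


def without_wraps(f):
    # Same logic, but the wrapper keeps its own nested qualified name.
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {"message": "Hello world!"}
        return f(*args, **kwargs)
    return wrapper


@with_wraps
def delay_a(*args, **kwargs):
    return args, kwargs


@without_wraps
def delay_b(*args, **kwargs):
    return args, kwargs


print(delay_a.__qualname__)  # delay_a
print(delay_b.__qualname__)  # without_wraps.<locals>.wrapper
print(delay_a(0, 1, 2))      # ((0, 1, 2), {'message': 'Hello world!'})
```

Both wrappers behave the same when called directly; the difference only shows up once the function has to be pickled by name, as it does when submitted to a process pool.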
Update
See my comment below concerning the race condition that renders your initial approach problematic. If you don't want to use a decorator, a better approach is to override the `apply_async` method. In the following code I create a mixin class that does just that; it can be used with either a multiprocessing pool or a multithreading pool:
```python
from multiprocessing.pool import Pool
import time

DELAY = 2


class Apply_Async_Mixin:  # Can be used with multiprocessing or multithreading
    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        if not kwds:
            kwds = {"message": "Hello world!"}
        return super().apply_async(func,
                                   args=args,
                                   kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)


# You must specify the mixin first:
class MyPool(Apply_Async_Mixin, Pool):  # multiprocessing
    pass


def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == "__main__":
    with MyPool(2) as pool:
        start = time.time()
        async_results = [
            pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            for _ in range(2)
        ]
        responses = [async_result.get() for async_result in async_results]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant: # Booboo
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```
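Since the mixin only calls `super().apply_async`, it should combine just as well with `multiprocessing.pool.ThreadPool`, the thread-based subclass of `Pool`. A minimal sketch under that assumption (`MyThreadPool` is a hypothetical name and `DELAY` is shortened to keep it quick); because the workers are threads in the current process, no `if __name__ == "__main__":` guard is needed:

```python
import time
from multiprocessing.pool import ThreadPool

DELAY = 0.5  # shortened from the answer's 2 s for the sketch


class Apply_Async_Mixin:
    """Same override as in the answer: supply default kwargs when none are given."""

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        if not kwds:
            kwds = {"message": "Hello world!"}
        return super().apply_async(func, args=args, kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)


# Mixin first, so its apply_async comes before Pool.apply_async in the MRO.
class MyThreadPool(Apply_Async_Mixin, ThreadPool):
    pass


def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


with MyThreadPool(2) as pool:
    start = time.perf_counter()
    async_results = [pool.apply_async(delay, args=(0, 1, 2)) for _ in range(2)]
    responses = [r.get() for r in async_results]
    total = time.perf_counter() - start
    # Explicit keyword arguments are passed through untouched.
    custom = pool.apply_async(delay, args=(1,), kwds={"message": "custom"}).get()

print(responses)
print(f"{total:.2f}s")  # roughly DELAY, not 2 * DELAY
print(custom)
```

The two sleeps overlap, so the elapsed time stays close to one `DELAY`, and the default is injected only when `kwds` is empty.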
Comments