Overridden `Process.run` does not execute asynchronously
Question

Having subclassed `Process.run`:

```python
import multiprocessing as mp
import time

DELAY = 2

class NewProcess(mp.get_context().Process):
    def run(self) -> None:
        # add new kwarg to item[4] slot
        old_que = self._args[0]
        new_que = mp.SimpleQueue()
        while not old_que.empty():
            item = old_que.get()
            new_que.put(
                (
                    item[0],
                    item[1],
                    item[2],  # Function
                    item[3],  # Arguments
                    item[4]   # Keyword arguments
                    | {
                        "message": "Hello world!",
                    },
                )
            )
        # Recreate args
        self._args = new_que, *self._args[1:]
        # Continue as normal
        super().run()

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    context = mp.get_context()
    context.Process = NewProcess
    with context.Pool(2) as pool:
        responses = []
        start = time.perf_counter()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        for resp in responses:
            resp.wait()
        responses = [resp.get() for resp in responses]
        total = time.perf_counter() - start
        assert total - DELAY < 1e-2, f"Expected to take circa {DELAY}s, took {total}s"
        assert responses == (
            expected := list(
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            )
        ), f"{responses=}!={expected}"
```

I would expect the `delay` function to execute asynchronously, taking circa `DELAY` seconds. However, it does not. The script fails with:

```
Traceback (most recent call last):
  File "/home/vahvero/Desktop/tmp.py", line 54, in <module>
    assert total - DELAY < 1e-2, f"Expected to take circa {DELAY}s, took {total}s"
AssertionError: Expected to take circa 2s, took 4.003754430001209s
```

Why do my changes to `run` cause serial rather than parallel processing?

Answer 1

Score: 1

You have what I believe to be three issues with your code, two minor and one major:

  1. The expected `responses` value you state is not correct, i.e. you should be expecting two (2) tuples in the returned list.
  2. The time you expect the two tasks to take needs to be a bit more generous, as on a slower machine the overhead of creating and initializing processes could very well be more than .01 seconds.
  3. Each pool process is creating its own new `SimpleQueue` instance. So instead of a single queue instance with two items on it, we now have two queue instances each with two items on it, for a total of 4 tasks to process. This is why it takes twice as long as you expect.
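For context, the `item[0]`…`item[4]` slots indexed above belong to an undocumented `multiprocessing.pool` task layout; in recent CPython versions a task roughly has the shape `(job_id, call_index, func, args, kwds)`. A minimal sketch of the kwargs-defaulting rewrite on one such tuple (the tuple contents here are illustrative, not taken from a real pool):

```python
# Illustrative task tuple in the shape the question's code assumes:
# (job_id, call_index, func, args, kwds) -- an undocumented CPython
# implementation detail of multiprocessing.pool, so treat it as fragile.
task = (0, 0, print, (1, 2, 3), {})

# Default the kwds slot when it is empty, as the question attempts.
# The dict union operator `|` requires Python 3.9+.
job_id, call_index, func, args, kwds = task
new_task = (job_id, call_index, func, args, kwds | {"message": "Hello world!"})

print(new_task[4])  # → {'message': 'Hello world!'}
```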

The following code does not create any new queue instances, but instead modifies the items (tasks) on the queue when necessary, and ensures that the rewriting of the queue is performed only once. It also prints out the results and timing:

```python
import multiprocessing as mp
import time
from functools import partial

DELAY = 2

class NewProcess(mp.get_context().Process):
    def __init__(self, rewritten, *args, **kwargs):
        self._rewritten = rewritten
        super().__init__(*args, **kwargs)

    def run(self) -> None:
        que = self._args[0]
        # add new kwarg to item[4] slot
        with self._rewritten.get_lock():
            # has the queue been rewritten?
            if not self._rewritten.value:
                items = []
                while not que.empty():
                    item = que.get()
                    if not item[4]:
                        item = list(item)
                        item[4] = {"message": "Hello world!"}
                        item = tuple(item)
                    items.append(item)
                for item in items:
                    que.put(item)
                self._rewritten.value = 1
        super().run()

def create_new_process(rewritten, *args, **kwargs):
    return NewProcess(rewritten, *args, **kwargs)

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    context = mp.get_context()
    rewritten = mp.Value('i', 0)  # Queue has not yet been rewritten
    context.Process = partial(create_new_process, rewritten)
    with context.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        """
        # This is unnecessary:
        for resp in responses:
            resp.wait()
        """
        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant:
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```

Prints:

```
[((0, 1, 2), {'message': 'Hello world!'}), ((0, 1, 2), {'message': 'Hello world!'})]
2.1168272495269775
```

Note

The reason I mentioned the XY problem in my comment is that I wanted to understand your ultimate goal and determine whether there was a better, safer way of accomplishing it. Your goal seems to be to ensure that if no keyword arguments are passed to your worker function, a default is provided. If so, there is surely a cleaner, simpler, more efficient way. For example, we can use a decorator function:

```python
import multiprocessing as mp
import time
from functools import wraps

DELAY = 2

def provide_default_keywords(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {"message": "Hello world!"}
        return f(*args, **kwargs)
    return wrapper

@provide_default_keywords
def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            responses.append(resp)
        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant:
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```

Update

See my comment below concerning the race condition that renders your initial approach problematic. If you don't want to use a decorator, it would be better to override the `apply_async` method. In the following code I create a mixin class to do just that; it can be used with a multiprocessing pool or a multithreading pool:

```python
from multiprocessing.pool import Pool
import time

DELAY = 2

class Apply_Async_Mixin:  # Can be used with multiprocessing or multithreading
    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        if not kwds:
            kwds = {"message": "Hello world!"}
        return super().apply_async(func,
                                   args=args,
                                   kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)

# You must specify the mixin first:
class MyPool(Apply_Async_Mixin, Pool):  # multiprocessing
    pass

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    with MyPool(2) as pool:
        start = time.time()
        async_results = [
            pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            for _ in range(2)
        ]
        responses = [async_result.get() for async_result in async_results]
        total = time.time() - start
        print(responses)
        print(total)
        # Be a bit more time tolerant:
        assert total < DELAY + .2, f"Expected to take circa {DELAY}s, took {total}s"
        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2  # Note this line
        ), f"{responses=}!={expected}"
```
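The answer states the mixin also works with a multithreading pool; a minimal sketch of that claim using `multiprocessing.pool.ThreadPool` (same mixin as above, with a shortened delay so the check runs quickly):

```python
from multiprocessing.pool import ThreadPool
import time

DELAY = 0.2  # shortened delay for a quick check

class Apply_Async_Mixin:
    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        # Supply a default kwds dict when none was given.
        if not kwds:
            kwds = {"message": "Hello world!"}
        return super().apply_async(func, args=args, kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)

# Mixin must come first so its apply_async shadows ThreadPool's:
class MyThreadPool(Apply_Async_Mixin, ThreadPool):
    pass

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

with MyThreadPool(2) as pool:
    start = time.time()
    results = [pool.apply_async(delay, args=(0, 1, 2)) for _ in range(2)]
    responses = [r.get() for r in results]
    total = time.time() - start

print(responses)  # both results carry the injected default kwargs
```

Because threads release the GIL while sleeping, the two tasks overlap and `total` stays close to one `DELAY` rather than two.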

huangapple
  • Posted on 2023-05-24 17:42:41
  • Please retain this link when reposting: https://go.coder-hub.com/76322147.html