被重写的 `Process.run` 不会异步执行。

huangapple go评论52阅读模式
英文:

Overriden `Process.run` does not execute asynchronously

问题

delay函数异步执行,大约需要DELAY时间。然而,事实并非如此。脚本失败,出现以下错误:

Traceback (most recent call last):
  File "/home/vahvero/Desktop/tmp.py", line 54, in <module>
    assert total - DELAY < 1e-2, f"Expected to take circa {DELAY}s, took {total}s"
AssertionError: Expected to take circa 2s, took 4.003754430001209s

为什么我的对run的更改导致线性处理而不是并行处理?

英文:

Having subclassed Process.run

import multiprocessing as mp
import time

DELAY = 2

class NewProcess(mp.get_context().Process):
    def run(self) -&gt; None:
        # add new kwarg to item[4] slot
        old_que = self._args[0]
        new_que = mp.SimpleQueue()
        while not old_que.empty():
            item = old_que.get()
            new_que.put(
                (
                    item[0],
                    item[1],
                    item[2], # Function
                    item[3], # Arguments
                    item[4]  # Keyword arguments
                    | {
                        &quot;message&quot;: &quot;Hello world!&quot;,
                    },
                )
            )
        # Recreate args
        self._args = new_que, *self._args[1:]
        # Continue as normal
        super().run()


def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == &quot;__main__&quot;:
    context = mp.get_context()
    context.Process = NewProcess
    with context.Pool(2) as pool:
        responses = []
        start = time.perf_counter()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )

            responses.append(resp)
        for resp in responses:
            resp.wait()

        responses = [resp.get() for resp in  responses]
        total = time.perf_counter() - start

        assert total - DELAY &lt; 1e-2, f&quot;Expected to take circa {DELAY}s, took {total}s&quot;

        assert responses == (
            expected := list(
                (
                    (0, 1, 2),
                    {
                        &quot;message&quot;: &quot;Hello world!&quot;
                    }
                )
            )
        ), f&quot;{responses=}!={expected}&quot;

I would expect that delay function executes asynchronously taking circa DELAY time. However, it does not. Script fails with

Traceback (most recent call last):
File &quot;/home/vahvero/Desktop/tmp.py&quot;, line 54, in &lt;module&gt;
assert total - DELAY &lt; 1e-2, f&quot;Expected to take circa {DELAY}s, took {total}s&quot;
AssertionError: Expected to take circa 2s, took 4.003754430001209s

Why my changes to run cause linear rather than parallel processing?

答案1

得分: 1

你的代码存在我认为有三个问题,其中两个较小,一个较大:

  1. 你所期望的responses值不正确,即你应该期望返回的列表中有两个元组。
  2. 你期望这两个任务运行的时间需要更加宽松,因为在较慢的机器上创建和初始化进程的开销可能会超过0.01秒。
  3. 每个进程池进程都在创建自己的新SimpleQueue实例。因此,我们现在有两个队列实例,每个队列上都有两个任务,总共有4个任务需要处理。这就是为什么它花费的时间是你所期望的两倍的原因。

以下代码不会创建任何新的队列实例,而是在必要时修改队列中的项目(任务)。我确保队列的重写只执行一次。代码还会打印出结果和时间:

import multiprocessing as mp
import time
from functools import partial

DELAY = 2

class NewProcess(mp.get_context().Process):
    def __init__(self, rewritten, *args, **kwargs):
        self._rewritten = rewritten
        super().__init__(*args, **kwargs)

    def run(self) -> None:
        que = self._args[0]
        # 添加新的关键字参数到item[4]槽位
        with self._rewritten.get_lock():
            # 队列是否已被重写?
            if not self._rewritten.value:
                items = []
                while not que.empty():
                    item = que.get()
                    if not item[4]:
                        item = list(item)
                        item[4] = {"message": "Hello world!"}
                        item = tuple(item)
                    items.append(item)
                for item in items:
                    que.put(item)
                self._rewritten.value = 1
        super().run()

def create_new_process(rewritten, *args, **kwargs):
    return NewProcess(rewritten, *args, **kwargs)

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    context = mp.get_context()
    rewritten = mp.Value('i', 0) # 队列尚未被重写
    context.Process = partial(create_new_process, rewritten)
    with context.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )

            responses.append(resp)
        
        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)

        # 更加宽松地容忍时间:
        assert total < DELAY + .2, f"期望耗时约为{DELAY}秒,实际耗时{total}秒"

        # 你期望返回2个项目:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2 # 注意这一行
        ), f"{responses=}!={expected}"

打印输出:

[((0, 1, 2), {'message': 'Hello world!'}), ((0, 1, 2), {'message': 'Hello world!'})]
2.1168272495269775

注意

我之所以提到了XY问题,是因为我想了解你的最终目标,并确定是否有更好、更安全的方法来实现这一目标。你的目标似乎是确保如果没有关键字参数传递给你的工作函数,那么你将提供一个默认值。如果是这样,那么肯定有一种更清晰、更简单、更高效的方法。例如,我们可以使用一个装饰器函数:

import multiprocessing as mp
import time
from functools import wraps

DELAY = 2

def provide_default_keywords(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {"message": "Hello world!"}
        return f(*args, **kwargs)
    return wrapper

@provide_default_keywords
def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )

            responses.append(resp)

        responses = [resp.get() for resp in responses]
        total = time.time() - start
        print(responses)
        print(total)

        # 更加宽松地容忍时间:
        assert total < DELAY + .2, f"期望耗时约为{DELAY}秒,实际耗时{total}秒"

        # 你期望返回2个项目:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        "message": "Hello world!"
                    }
                )
            ] * 2 # 注意这一行
        ), f"{responses=}!={expected}"

更新

请查看下面我的评论,关于使你的初始方法存在问题的竞争条件。如果你不想使用装饰器,那么更好的方法是重写apply_async方法。在下面的代码中,我创建了一个混合类来执行此操作,它可以与多进程池或多线程池一起使用:

from multiprocessing.pool import Pool
import time

DELAY = 2

class Apply_Async_Mixin: # 可以与多进程或多线程一起使用
    def apply_async(self, func, args=(), kwds={}, callback=None,
            error_callback=None):
        if not kwds:
            kwds = {"message": "Hello world!"}
        return super().apply_async(func,
                                   args=args,
                                   kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)

# 你必须首先指定混合类:
class MyPool(Apply_Async_Mixin, Pool): # 多进程
    pass

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__

<details>
<summary>英文:</summary>

You have what I believe to be 3 issues with your code, two minor and one major:

1. What you state should be the expected `responses` values is not correct, i.e. you should be expecting two (2) tuples in the returned list.
2. The time you expect the two tasks to run needs to be a bit more generous as on a slower machine the overhead of creating and initializing processes could very well be more than .01 secods.
3. Each pool process is creating its own new `SimpleQueue` instance. So instead of having a single queue instance with two items on it, we now have 2 queue instances each with two items on it for a total of 4 tasks to process. This is why it is taking twice as long as you think it should.

The following code does not create any new queue instances but instead modifies the items (tasks) on the queue if necessary. I ensure that the rewriting of the queue is performed only once. The code also prints out the results and timing:


```python
import multiprocessing as mp
import time
from functools import partial

DELAY = 2


class NewProcess(mp.get_context().Process):
    def __init__(self, rewritten, *args, **kwargs):
        self._rewritten = rewritten
        super().__init__(*args, **kwargs)

    def run(self) -&gt; None:
        que = self._args[0]
        # add new kwarg to item[4] slot
        with self._rewritten.get_lock():
            # has the queue been rewritten?
            if not self._rewritten.value:
                items = []
                while not que.empty():
                    item = que.get()
                    if not item[4]:
                        item = list(item)
                        item[4] = {&quot;message&quot;: &quot;Hello world!&quot;}
                        item = tuple(item)
                    items.append(item)
                for item in items:
                    que.put(item)
                self._rewritten.value = 1
        super().run()

def create_new_process(rewritten, *args, **kwargs):
    return NewProcess(rewritten, *args, **kwargs)

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == &quot;__main__&quot;:
    context = mp.get_context()
    rewritten = mp.Value(&#39;i&#39;, 0) # Queue has not yet been rewritten
    context.Process = partial(create_new_process, rewritten)
    with context.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )

            responses.append(resp)
        &quot;&quot;&quot;
        # This is unnecessary: # Booboo
        for resp in responses:
            resp.wait()
        &quot;&quot;&quot;

        responses = [resp.get() for resp in  responses]
        total = time.time() - start
        print(responses)
        print(total)

        # Be a bit more time tolerant: # Booboo
        assert total &lt; DELAY + .2, f&quot;Expected to take circa {DELAY}s, took {total}s&quot;

        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        &quot;message&quot;: &quot;Hello world!&quot;
                    }
                )
            ] * 2 # Note this line
        ), f&quot;{responses=}!={expected}&quot;

Prints:

[((0, 1, 2), {&#39;message&#39;: &#39;Hello world!&#39;}), ((0, 1, 2), {&#39;message&#39;: &#39;Hello world!&#39;})]
2.1168272495269775

Note

The reason why I mentioned in my comment the XY problem is because I wanted to understand what your ultimate goal was and to determine whether there was a better, safer way of accomplishing this. Your goal seems to be to ensure that if no keyword arguments were passed to your worker function, then you would provide a default. If so, surely there is a cleaner, simpler, more efficient way. For example, we can use a decorator function:

import multiprocessing as mp
import time
from functools import wraps

DELAY = 2


def provide_default_keywords(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not kwargs:
            kwargs = {&quot;message&quot;: &quot;Hello world!&quot;}
        return f(*args, **kwargs)
    return wrapper

@provide_default_keywords
def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs


if __name__ == &quot;__main__&quot;:
    with mp.Pool(2) as pool:
        responses = []
        start = time.time()
        for _ in range(2):
            resp = pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )

            responses.append(resp)

        responses = [resp.get() for resp in  responses]
        total = time.time() - start
        print(responses)
        print(total)

        # Be a bit more time tolerant: # Booboo
        assert total &lt; DELAY + .2, f&quot;Expected to take circa {DELAY}s, took {total}s&quot;

        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        &quot;message&quot;: &quot;Hello world!&quot;
                    }
                )
            ] * 2 # Note this line
        ), f&quot;{responses=}!={expected}&quot;

Update

See my comment below concerning the race condition that renders your initial approach problematic. If you don't want to use a decorator, then better would be to override the apply_async method. In the following code I create a mixin class to do just that and it can be used with a multiprocessing pool or a multithreading pool:

from multiprocessing.pool import Pool
import time

DELAY = 2

class Apply_Async_Mixin: # Can be used with multiprocessing or multithreading
    def apply_async(self, func, args=(), kwds={}, callback=None,
            error_callback=None):
        if not kwds:
            kwds = {&quot;message&quot;: &quot;Hello world!&quot;}
        return super().apply_async(func,
                                   args=args,
                                   kwds=kwds,
                                   callback=callback,
                                   error_callback=error_callback)

# You must specify the mixin first:
class MyPool(Apply_Async_Mixin, Pool): # multiprocessing
    pass

def delay(*args, **kwargs):
    time.sleep(DELAY)
    return args, kwargs

if __name__ == &quot;__main__&quot;:
    with MyPool(2) as pool:
        start = time.time()
        async_results = [
            pool.apply_async(
                func=delay,
                args=tuple(range(3)),
                kwds={},
            )
            for _ in range(2)
        ]
        responses = [async_result.get() for async_result in async_results]
        total = time.time() - start

        print(responses)
        print(total)

        # Be a bit more time tolerant: # Booboo
        assert total &lt; DELAY + .2, f&quot;Expected to take circa {DELAY}s, took {total}s&quot;

        # You are expecting 2 items returned:
        assert responses == (
            expected := [
                (
                    (0, 1, 2),
                    {
                        &quot;message&quot;: &quot;Hello world!&quot;
                    }
                )
            ] * 2 # Note this line
        ), f&quot;{responses=}!={expected}&quot;

huangapple
  • 本文由 发表于 2023年5月24日 17:42:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76322147.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定