multiprocessing.Manager() hangs Popen.communicate() on Python

Question


The use of multiprocessing.Manager prevents clean termination of a Python child process with subprocess.Popen.terminate() or subprocess.Popen.kill().

This seems to be because Manager creates a child process behind the scenes for communicating, but this process does not know how to clean itself up when the parent is terminated.

What is the easiest way to use multiprocessing.Manager so that it does not prevent a process shutdown by a signal?

A demonstration:

    """Multiprocess manager hang test."""
    import multiprocessing
    import subprocess
    import sys
    import time


    def launch_and_read_process():
        proc = subprocess.Popen(
            [
                "python",
                sys.argv[0],
                "run_unkillable"
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        # Give time for the process to run and print()
        time.sleep(3)

        status = proc.poll()
        print("poll() is", status)

        print("Terminating")
        assert proc.returncode is None
        proc.terminate()
        exit_code = proc.wait()
        print("Got exit code", exit_code)
        stdout, stderr = proc.communicate()
        print("Got output", stdout.decode("utf-8"))


    def run_unkillable():
        # Disable manager creation to make the code run correctly
        manager = multiprocessing.Manager()
        d = manager.dict()
        d["foo"] = "bar"
        print("This is an example output", flush=True)
        time.sleep(999)


    def main():
        mode = sys.argv[1]
        print("Doing subrouting", mode)
        func = globals().get(mode)
        func()


    if __name__ == "__main__":
        main()

Run as python test-script.py launch_and_read_process.

Good output (no multiprocessing.Manager):



    Doing subrouting launch_and_read_process
    poll() is None
    Terminating
    Got exit code -15
    Got output Doing subrouting run_unkillable
    This is an example output

Output when subprocess.Popen.communicate hangs because of the use of Manager:

    Doing subrouting launch_and_read_process
    poll() is None
    Terminating
    Got exit code -15

Answer 1

Score: 1


<details>
<summary>English:</summary>

As you pointed out, this happens because the manager spawns its own child process. That process holds the same stdout and stderr pipes and keeps them open after the child is terminated, so `proc.communicate()` never sees end-of-file and hangs. You can easily solve this on Unix by setting your own handlers for `SIGTERM` and `SIGINT`, but it gets a little hairy on Windows, where those two signals are pretty much useless. Also, keep in mind that Python runs signal handlers only in the main thread. Depending on your OS and the signal, if that thread is busy (`time.sleep(999)`), the whole sleep may need to run out before the signal can be handled. Anyway, I have provided a solution for both Windows and Unix, with a note at the end.
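To see the mechanism in isolation, here is a minimal sketch that does not use `multiprocessing` at all. It assumes a POSIX system, where a process started without redirection inherits its parent's stdout. `CHILD_SOURCE` and `communicate_times_out` are made-up names for this illustration: the child starts a helper (standing in for the manager's server process), the child is then killed, and `communicate()` still cannot finish because the helper holds the write end of the pipe.

```python
import subprocess
import sys

# Hypothetical stand-in for the script's child: it starts a helper process
# (playing the role of the manager's server) that inherits stdout.
CHILD_SOURCE = """
import subprocess, sys, time
subprocess.Popen([sys.executable, "-c", "import time; time.sleep(5)"])
print("child ready", flush=True)
time.sleep(60)
"""


def communicate_times_out(timeout=1.0):
    """Return True if communicate() is still blocked after the child died."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE], stdout=subprocess.PIPE
    )
    proc.stdout.readline()  # wait until the helper has been started
    proc.kill()
    proc.wait()  # the child itself is gone at this point
    try:
        proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        return True  # the helper still holds the pipe's write end
    finally:
        proc.stdout.close()
    return False
```

Without the `timeout` argument, the same `communicate()` call would block until the helper exits, which is the hang described in the question.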

**UNIX**

This is pretty straightforward: define your own handler for the signals and explicitly call `manager.shutdown()` in it to properly clean up the manager's child process:

    import functools
    import multiprocessing
    import signal
    import sys
    import time

    def handler(manager, *args):
        """
        Our handler, use functools.partial to fix arg manager (or you 
        can create a factory function too)
        """
        manager.shutdown()
        sys.exit()

    def run_unkillable():

        # Disable manager creation to make the code run correctly
        manager = multiprocessing.Manager()

        # Register our handler
        h = functools.partial(handler, manager)
        signal.signal(signal.SIGINT, h)
        signal.signal(signal.SIGTERM, h)

        d = manager.dict()
        d["foo"] = "bar"
        print("This is an example output", flush=True)
        time.sleep(999)
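
A possible variation, not part of the original answer: install the handlers before the manager is created and let a `with` block do the cleanup. The sketch below assumes the handler only needs to raise `SystemExit`; `exit_on_signal` and `run_with_manager` are hypothetical names. It relies on the documented behavior that a manager's `__exit__()` calls `shutdown()`. This should narrow the window in which a signal can kill the process while a manager is alive, though it may not close it completely during manager start-up.

```python
import multiprocessing
import signal
import sys


def exit_on_signal(signum, frame):
    """Turn the signal into SystemExit so `with` blocks unwind normally."""
    # 128 + signum mirrors a common shell convention; any exit code would do.
    sys.exit(128 + signum)


def run_with_manager(work):
    """Hypothetical helper: call work(manager) with cleanup on SIGINT/SIGTERM."""
    # Install the handlers first, so a signal that arrives once the manager
    # exists is turned into SystemExit instead of killing the process outright.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, exit_on_signal)

    # __exit__ calls manager.shutdown(), also when SystemExit passes through.
    with multiprocessing.Manager() as manager:
        work(manager)
```

With this shape, the body of `run_unkillable()` would be passed in as `work`, and `run_with_manager` would be called from under the `if __name__ == "__main__":` guard as in the original script.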

**Windows**

On Windows you need to explicitly send `signal.CTRL_BREAK_EVENT` rather than call plain `proc.terminate()`, so that the child process can intercept it ([reference][1]). You'll also want to sleep in shorter durations in a loop instead of doing `sleep(999)`, so that the signal interrupts the main thread rather than waiting for the whole sleep to finish (see [this][2] question for alternatives).

    """Multiprocess manager hang test."""
    import functools
    import multiprocessing
    import subprocess
    import sys
    import time
    import signal


    def launch_and_read_process():
        proc = subprocess.Popen(
            [
                "python",
                sys.argv[0],
                "run_unkillable"
            ],

            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP  # So that our current process does not get SIGBREAK signal
        )

        # Give time for the process to run and print()
        time.sleep(5)

        status = proc.poll()
        print("poll() is", status)

        print("Terminating")
        assert proc.returncode is None

        # Send this specific signal instead of doing terminate()
        proc.send_signal(signal.CTRL_BREAK_EVENT)

        exit_code = proc.wait()
        print("Got exit code", exit_code)
        stdout, stderr = proc.communicate()
        print("Got output", stdout.decode("utf-8"))


    def handler(manager, *args):
        """
        Our handler, use functools.partial to fix arg manager (or you
        can create a factory function too)
        """
        manager.shutdown()
        sys.exit()


    def run_unkillable():

        # Disable manager creation to make the code run correctly
        manager = multiprocessing.Manager()

        # Register our handler
        signal.signal(signal.SIGBREAK, functools.partial(handler, manager))

        d = manager.dict()
        d["foo"] = "bar"
        print("This is an example output", flush=True)

        # Sleep in a loop otherwise the signal won't interrupt the main thread
        for _ in range(999):
            time.sleep(1)


    def main():
        mode = sys.argv[1]
        print("Doing subrouting", mode)
        func = globals().get(mode)
        func()


    if __name__ == "__main__":
        main()
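
If the same launcher has to run on both platforms, the two variants can be folded into a pair of small helpers. This is only a sketch: `popen_kwargs` and `request_stop` are hypothetical names, and the platform check via `sys.platform == "win32"` is an assumption about how one might branch.

```python
import signal
import subprocess
import sys


def popen_kwargs():
    """Extra Popen arguments so CTRL_BREAK_EVENT can target only the child."""
    if sys.platform == "win32":
        return {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
    return {}


def request_stop(proc):
    """Ask the child to stop with a signal its handler can intercept."""
    if sys.platform == "win32":
        # Matches the SIGBREAK handler registered in the Windows variant.
        proc.send_signal(signal.CTRL_BREAK_EVENT)
    else:
        # Sends SIGTERM, matching the handler registered in the Unix variant.
        proc.terminate()
```

The launcher would then call `subprocess.Popen(..., **popen_kwargs())` and replace `proc.terminate()` with `request_stop(proc)`.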

**Note**: Keep in mind that there is a race condition in the above solution because we register the signal handler after creating the manager. Theoretically, one *could* kill the process right before the handler is registered, and `proc.communicate()` will then hang because the manager was not cleaned up. So you may want to supply a timeout parameter to `.communicate`, with error handling to log these edge cases.
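
A minimal sketch of that timeout handling, assuming binary pipes as in the scripts above; `communicate_with_deadline` is a hypothetical helper name and the 5 second default is arbitrary:

```python
import logging
import subprocess
import sys

log = logging.getLogger(__name__)


def communicate_with_deadline(proc, timeout=5.0):
    """Read the child's output, but give up instead of hanging forever."""
    try:
        return proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        log.warning(
            "pid %s: pipes still open after %.1fs, giving up on output",
            proc.pid,
            timeout,
        )
        # The exception may carry the bytes read so far; they can also be None.
        return exc.stdout or b"", exc.stderr or b""
```

In the launcher this would replace the bare `proc.communicate()` call after `proc.wait()`. Note that the pipes stay open after a timeout, so a long-lived caller may also want to close `proc.stdout` and `proc.stderr` itself.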

  [1]: https://stackoverflow.com/a/35792192/16310741
  [2]: https://stackoverflow.com/q/5114292/16310741





</details>



huangapple
  • Posted on February 8, 2023 at 21:40:29
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75386643.html