英文:
multiprocessing.Manager() hangs Popen.communicate() on Python
问题
以下是您提供的内容的中文翻译:
使用 multiprocessing.Manager
阻止了使用 subprocess.Process.Popen.terminate()
和 subprocess.Process.Popen.kill()
清晰终止 Python 子进程。
这似乎是因为 Manager
在幕后创建了一个用于通信的子进程,但当父进程终止时,该进程不知道如何自行清理。
使用 multiprocessing.Manager
以便不阻止通过信号关闭进程的最简单方法是什么?
演示:
"""多进程管理器 hang 测试。"""
import multiprocessing
import subprocess
import sys
import time
def launch_and_read_process():
proc = subprocess.Popen(
[
"python",
sys.argv[0],
"run_unkillable"
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# 给进程运行和打印()一些时间
time.sleep(3)
status = proc.poll()
print("poll() 结果是", status)
print("终止")
assert proc.returncode is None
proc.terminate()
exit_code = proc.wait()
print("得到退出代码", exit_code)
stdout, stderr = proc.communicate()
print("得到输出", stdout.decode("utf-8"))
def run_unkillable():
# 禁用管理器的创建以使代码正确运行
manager = multiprocessing.Manager()
d = manager.dict()
d["foo"] = "bar"
print("这是一个示例输出", flush=True)
time.sleep(999)
def main():
mode = sys.argv[1]
print("执行子程序", mode)
func = globals().get(mode)
func()
if __name__ == "__main__":
main()
以 python test-script.py launch_and_read_process
运行。
良好的输出(没有使用 multiprocessing.Manager):
执行子程序 launch_and_read_process
poll() 结果是 None
终止
得到退出代码 -15
得到输出 执行子程序 run_unkillable
这是一个示例输出
使用 Manager
导致 subprocess.Popen.communicate
挂起时的输出:
执行子程序 launch_and_read_process
poll() 结果是 None
终止
得到退出代码 -15
英文:
The use of multiprocessing.Manager
prevents clean termination of Python child process using subprocess.Process.Popen.terminate()
and subprocess.Process.Popen.kill()
.
This seems to be because Manager
creates a child process behind the scenes for communicating, but this process does not know how to clean itself up when the parent is terminated.
What is the easiest way to use multiprocessing.Manager
so that it does not prevent a process shutdown by a signal?
A demostration:
"""Multiprocess manager hang test."""
import multiprocessing
import subprocess
import sys
import time
def launch_and_read_process():
proc = subprocess.Popen(
[
"python",
sys.argv[0],
"run_unkillable"
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Give time for the process to run and print()
time.sleep(3)
status = proc.poll()
print("poll() is", status)
print("Terminating")
assert proc.returncode is None
proc.terminate()
exit_code = proc.wait()
print("Got exit code", exit_code)
stdout, stderr = proc.communicate()
print("Got output", stdout.decode("utf-8"))
def run_unkillable():
# Disable manager creation to make the code run correctly
manager = multiprocessing.Manager()
d = manager.dict()
d["foo"] = "bar"
print("This is an example output", flush=True)
time.sleep(999)
def main():
mode = sys.argv[1]
print("Doing subrouting", mode)
func = globals().get(mode)
func()
if __name__ == "__main__":
main()
Run as python test-script.py launch_and_read_process
.
Good output (no multiprocessing.Manager):
Doing subrouting launch_and_read_process
poll() is None
Terminating
Got exit code -15
Got output Doing subrouting run_unkillable
This is an example output
Output when subprocess.Popen.communicate
hangs because use of Manager
:
Doing subrouting launch_and_read_process
poll() is None
Terminating
Got exit code -15
答案1
得分: 1
你指出的问题是因为管理器生成了自己的子进程。因此,当你执行proc.communicate()
时,代码会挂起,因为该子进程的stderr和stdout仍然保持打开状态。在Unix上,你可以通过设置自己的SIGTERM
和SIGINT
处理程序来轻松解决这个问题,但在Windows上,由于这两个信号几乎没有用,问题会有点复杂。此外,要注意信号只会传递到主线程。根据你的操作系统和信号,如果线程忙碌(例如time.sleep(999)
),则信号可能需要在定时器运行完之前才能被拦截。无论如何,我已经为Windows和Unix提供了解决方案,并在最后附有注释:
UNIX
这很简单,你只需为信号定义自己的处理程序,其中你显式调用manager.shutdown()
以正确清理其子进程:
def handler(manager, *args):
"""
Our handler, use functools.partial to fix arg manager (or you
can create a factory function too)
"""
manager.shutdown()
sys.exit()
def run_unkillable():
# 禁用管理器的创建,以使代码正确运行
manager = multiprocessing.Manager()
# 注册我们的处理程序
h = functools.partial(handler, manager)
signal.signal(signal.SIGINT, h)
signal.signal(signal.SIGTERM, h)
d = manager.dict()
d["foo"] = "bar"
print("This is an example output", flush=True)
time.sleep(999)
Windows
在Windows上,你需要明确发送信号signal.CTRL_BREAK_EVENT
,而不是简单地使用proc.terminate()
,以确保子进程拦截它(参考[引用][1])。此外,你还需要在循环中以较短的持续时间进行睡眠,而不是执行sleep(999)
,以确保信号中断主线程,而不是等待整个睡眠持续时间(参见[此][2]问题以获取替代方案)。
"""Multiprocess manager hang test."""
import functools
import multiprocessing
import subprocess
import sys
import time
import signal
def launch_and_read_process():
proc = subprocess.Popen(
[
"python",
sys.argv[0],
"run_unkillable"
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # 以防止我们的当前进程接收到SIGBREAK信号
)
# 给进程运行和打印()的时间
time.sleep(5)
status = proc.poll()
print("poll() is", status)
print("Terminating")
assert proc.returncode is None
# 发送特定的信号而不是终止()
proc.send_signal(signal.CTRL_BREAK_EVENT)
exit_code = proc.wait()
print("Got exit code", exit_code)
stdout, stderr = proc.communicate()
print("Got output", stdout.decode("utf-8"))
def handler(manager, *args):
"""
Our handler, use functools.partial to fix arg manager (or you
can create a factory function too)
"""
manager.shutdown()
sys.exit()
def run_unkillable():
# 禁用管理器的创建,以使代码正确运行
manager = multiprocessing.Manager()
# 注册我们的处理程序
signal.signal(signal.SIGBREAK, functools.partial(handler, manager))
d = manager.dict()
d["foo"] = "bar"
print("This is an example output", flush=True)
# 在循环中进行睡眠,否则信号不会中断主线程
for _ in range(999):
time.sleep(1)
def main():
mode = sys.argv[1]
print("Doing subrouting", mode)
func = globals().get(mode)
func()
if __name__ == "__main__":
main()
**注意**:请注意,在上述解决方案中存在竞争条件,因为我们在创建管理器后注册信号处理程序。从理论上讲,你*可能*会在处理程序注册之前终止进程,然后`proc.communicate()`将挂起,因为管理器没有清理。因此,你可能需要为`.communicate`提供一个超时参数,以及用于记录这些边缘情况的错误处理。[1]: https://stackoverflow.com/a/35792192/16310741[2]: https://stackoverflow.com/q/5114292/16310741
<details>
<summary>英文:</summary>
Like you pointed out, this happens because the manager spawns its own child processes. So when you do `proc.communicate()` the code hangs because that child process's stderr and stdout are still open. You can easily solve this on Unix by setting your own handlers for `SIGTERM` and `SIGINT`, but it becomes a little hairy on Windows since those two signals are pretty much useless. Also, keep in mind that signals are only delivered to the main thread. Depending on your OS and the signal, if the thread is busy (`time.sleep(999)`) then the whole timer may need to run out before the signal can be intercepted. Anyway, I have provided a solution for both Windows and Unix with a note at the end:
**UNIX**
This is pretty straightforward, you simply define your own handlers for the signals where you explicitly call `manager.shutdown()` to properly cleanup its child process:
def handler(manager, *args):
"""
Our handler, use functools.partial to fix arg manager (or you
can create a factory function too)
"""
manager.shutdown()
sys.exit()
def run_unkillable():
# Disable manager creation to make the code run correctly
manager = multiprocessing.Manager()
# Register our handler,
h = functools.partial(handler, manager)
signal.signal(signal.SIGINT, h)
signal.signal(signal.SIGTERM, h)
d = manager.dict()
d["foo"] = "bar"
print("This is an example output", flush=True)
time.sleep(999)
**Windows**
On Windows you will need to explicitly send the signal `signal.CTRL_BREAK_EVENT` rather than the plain `proc.terminate()` to ensure that the child process intercepts it ([reference][1]). Additionally, you'll also want to sleep in shorter durations in a loop instead of doing `sleep(999)` to make sure the signal interrupts the main thread rather than waiting for the whole duration of sleep (see [this][2] question for alternatives).
"""Multiprocess manager hang test."""
import functools
import multiprocessing
import subprocess
import sys
import time
import signal
def launch_and_read_process():
proc = subprocess.Popen(
[
"python",
sys.argv[0],
"run_unkillable"
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # So that our current process does not get SIGBREAK signal
)
# Give time for the process to run and print()
time.sleep(5)
status = proc.poll()
print("poll() is", status)
print("Terminating")
assert proc.returncode is None
# Send this specific signal instead of doing terminate()
proc.send_signal(signal.CTRL_BREAK_EVENT)
exit_code = proc.wait()
print("Got exit code", exit_code)
stdout, stderr = proc.communicate()
print("Got output", stdout.decode("utf-8"))
def handler(manager, *args):
"""
Our handler, use functools.partial to fix arg manager (or you
can create a factory function too)
"""
manager.shutdown()
sys.exit()
def run_unkillable():
# Disable manager creation to make the code run correctly
manager = multiprocessing.Manager()
# Register our handler,
signal.signal(signal.SIGBREAK, functools.partial(handler, manager))
d = manager.dict()
d["foo"] = "bar"
print("This is an example output", flush=True)
# Sleep in a loop otherwise the signal won't interrupt the main thread
for _ in range(999):
time.sleep(1)
def main():
mode = sys.argv[1]
print("Doing subrouting", mode)
func = globals().get(mode)
func()
if __name__ == "__main__":
main()
**Note**: Keep in mind that there is a race condition in the above solution because we are registering the signal handler after the creation of a manager. Theoretically, one *could* kill the process right before the handler is registered and the `proc.communicate()` will then hang because the manager was not cleaned up. So you may want to supply a timeout parameter to `.communicate` with error handling to log these edge cases.
[1]: https://stackoverflow.com/a/35792192/16310741
[2]: https://stackoverflow.com/q/5114292/16310741
</details>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论