英文:
How to stop multiprocessing.Pool with Ctrl+c? (Python 3.10) [Solved]
问题
我花了好几个小时的时间进行研究,似乎不再有解决方案有效了。我的函数每个进程可能需要估计的30-50分钟。如果我在前几分钟内发现问题,我不想等待进程完全完成或者不得不关闭终端而不干净地终止进程。
为了简单起见,每个进程被分配了一个带有无限循环的函数。通过使用 Ctrl+c
,现在应该可以完全中断主程序而不会卡住。无限循环的原因是因为每个之前的解决方案都是使用实际循环的列表来工作的。键盘中断会被识别,但只有在所有进程都完成后才会生效。我不知道这是否与Python版本有关,但这不是我想要的要求。
我希望有一个简单干净的解决方案来解决这个问题。这可能也会帮助其他许多人。
感谢你的帮助。
示例:
from multiprocessing import Pool
def f(x):
while True:
pass
if __name__ == '__main__':
with Pool(2) as p:
p.map(f, [1, 2])
英文:
I've spent several hours on research and no solution seems to work anymore. My function will take an estimated 30-50 minutes per process. If I spot flaws during the first few minutes, I don't want to wait for the processes to finish completely or have to close the terminal without killing the processes cleanly.
For simplicity, each process is assigned a function with an infinite loop. With Ctrl+c
the complete main program should now be interrupted without getting stuck. The reason for the infinite loop is that every previous solution has worked with lists that are actually looped through. The keyboard interrupt is recognized, but only afterwards, when all processes have finished. I don't know if this has anything to do with the Python version, but that's not the requirement I would like.
I would like a simple and clean solution to this problem. That would probably help many others as well.
Thank you for your help.
Example:
from multiprocessing import Pool
def f(x):
while True:
pass
if __name__ == '__main__':
with Pool(2) as p:
p.map(f, [1, 2])
答案1
得分: 1
Problem Solved
在另一个论坛上,这个问题已经讨论并以一种简单而清晰的方式解决了。以下是链接供其他人使用:
Solution 1
在multiprocessing模块中有一个 AsyncResult 对象。其中包括各种函数,如 get()
、wait()
和 ready()
。虽然 get()
和 wait()
会阻塞,但你可以使用 ready()
来等待结果并仍然捕获 KeyboardInterrupts(键盘中断)。
不幸的是,只有 map_async()
和 apply_async()
返回一个AsyncResult对象。
from multiprocessing import Pool
def f(x):
while True:
pass
if __name__ == "__main__":
with Pool(2) as p:
result = p.map_async(f, [1, 2])
while not result.ready():
time.sleep(1)
Solution 2
这种方式还可以使用阻塞的函数:
玩弄信号似乎从子进程内部来看,除了 KeyboardInterrupt 或 SystemExit 之外的异常都会传播到主进程。当按下ctrl+c时,所有进程(子进程和主进程)都会运行其SIGINT处理程序,默认情况下只会引发 KeyboardInterrupt。要让控制从子进程中恢复,子进程中断处理程序必须引发其他异常。由于主进程忙于处理自己的 KeyboardInterrupt,因此不会看到异常。
基本上,SIGINT 处理程序也可以返回 None。
from multiprocessing import Pool
from signal import signal, SIGINT
def initializer():
signal(SIGINT, lambda: None)
def f(x):
while True:
pass
if __name__ == '__main__':
with Pool(n, initializer) as p:
p.map(f, [1, 2])
英文:
Problem Solved
In another forum this problem was discussed and solved in a simple and clean way. Here is the link for everyone else:
Solution 1
In multiprocessing module there is an AsyncResult object. This includes various functions such as get()
, wait()
and ready()
. While get() and wait() block, you can use ready()
to wait for the result and still catch KeyboardInterrupts.
Unfortunately, only map_async()
and apply_async()
return an AsyncResult object.
from multiprocessing import Pool
def f(x):
while True:
pass
if __name__ == "__main__":
with Pool(2) as p:
result = p.map_async(f, [1, 2])
while not result.ready():
time.sleep(1)
Solution 2
This way you can also use functions that block:
> Playing around with signals it seems that from inside child processes,
> exceptions other than KeyboardInterrupt or SystemExit propagate to the
> main process. When ctrl+c is pressed, all processes (children and main
> process) run their SIGINT handler which by default just raises
> KeyboardInterrupt. To get control out of the children, the child
> interrupt handler must raise some other exception. The exception is
> not seen by the way because the main process has its hands full with
> its own KeyboardInterrupt.
Basically, the SIGINT handler can also return None.
from multiprocessing import Pool
from signal import signal, SIGINT
def initializer():
signal(SIGINT, lambda: None)
def f(x):
while True:
pass
if __name__ == '__main__':
with Pool(n, initializer) as p:
p.map(f, [1, 2])
答案2
得分: 0
对于您的情景,使用apply_async
比map
更好,您可以这样实现它:
from multiprocessing import Pool
import signal
def f(x):
while True:
pass
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
if __name__ == '__main__':
with Pool(2, init_worker) as p:
processes = ]
try:
# 在进程运行时检查键盘中断
for process in processes:
process.get(timeout=5) # 根据需要调整超时时间
except KeyboardInterrupt:
print("接收到键盘中断。停止进程...")
p.terminate()
p.join()
如果您需要任何进一步的帮助,请告诉我。
英文:
For your scenario using apply_async
is better than map
so you can implement it like this:
from multiprocessing import Pool
import signal
def f(x):
while True:
pass
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
if __name__ == '__main__':
with Pool(2, init_worker) as p:
processes = ]
try:
# Check for keyboard interrupts while the processes are running
for process in processes:
process.get(timeout=5) # Adjust the timeout as needed
except KeyboardInterrupt:
print("Keyboard interrupt received. Stopping processes...")
p.terminate()
p.join()
答案3
得分: 0
以下是翻译好的内容:
你需要“驱动程序”和子进程代码以某种方式协作。事件对此很有用。
假设你的子进程在某种循环中运行,那么你只需要定期让该进程检查事件。如果事件被设置,它可以优雅地终止。
类似于这样:
from multiprocessing import Pool, Manager
from signal import signal, SIGINT, SIG_IGN
from time import sleep
N = 10
def do_work(t):
e, x = t
while True: # 无限循环
sleep(1) # 做一些工作
if e.is_set(): # 检查事件
break # 进程终止
return x
def ignore():
signal(SIGINT, SIG_IGN)
def main():
with Manager() as manager:
e = manager.Event() # 事件最初未设置
with Pool(N, ignore) as pool:
results = pool.map_async(do_work, [(e, i) for i in range(N)])
print('阻塞... 中断 (Ctrl-C 或 Ctrl-Break) 以继续')
while True:
try:
for r in results.get(): # 阻塞
print(r)
break
except KeyboardInterrupt:
e.set()
if __name__ == '__main__':
main()
输出:
阻塞... 中断 (Ctrl-C 或 Ctrl-Break) 以继续
^C0
1
2
3
4
5
6
7
8
9
英文:
You need the "driver" and sub-processing code to somehow collaborate. An Event can be useful for this.
Let's say that your sub-process is running in some kind of loop, then you just need that process to check an Event from time to time. If the event is set then it can terminate gracefully.
Something like this:
from multiprocessing import Pool, Manager
from signal import signal, SIGINT, SIG_IGN
from time import sleep
N = 10
def do_work(t):
e, x = t
while True: # infinite loop
sleep(1) # do some work
if e.is_set(): # check the Event
break # process terminates
return x
def ignore():
signal(SIGINT, SIG_IGN)
def main():
with Manager() as manager:
e = manager.Event() # event is not set initially
with Pool(N, ignore) as pool:
results = pool.map_async(do_work, [(e, i) for i in range(N)])
print('Blocking... Interrupt (Ctrl-C or Ctrl-Break) to continue')
while True:
try:
for r in results.get(): # block
print(r)
break
except KeyboardInterrupt:
e.set()
if __name__ == '__main__':
main()
Output:
Blocking... Interrupt (Ctrl-C or Ctrl-Break) to continue
^C0
1
2
3
4
5
6
7
8
9
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论