python: multiprocessing. When the value put on the queue is too large, the child process cannot throw an exception

Question

import multiprocessing

def runQueue():
    data_queue = multiprocessing.Queue(maxsize=1)
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration

if __name__ == '__main__':
    a = multiprocessing.Process(target=runQueue)
    a.start()
    a.join()
    print("done")

It doesn't throw the exception, and the whole program gets stuck.

I tried data_queue.put_nowait([[9] * 100]) and it worked fine, like this:

Process Process-1:
Traceback (most recent call last):
  File "D:\python3.7\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "D:\python3.7\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "D:\test\main.py", line 50, in runQueue
    raise StopIteration
StopIteration

I would like to know how to make the original code throw the exception normally.


Answer 1

Score: 1


As the document states:

> As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
>
> This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
>
> Note that a queue created using a manager does not have this issue.

Your function raises the exception correctly, but the process gets stuck just before the traceback appears in the terminal. The reason this does not happen with smaller data is probably that the data fits into the pipe's buffer. In most cases, you should not rely on that.
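You can observe the hang directly by giving join a timeout. This is a minimal sketch of the question's code; the 5-second timeout and the terminate() cleanup are additions for demonstration, not part of the original:

```python
import multiprocessing


def runQueue():
    data_queue = multiprocessing.Queue(maxsize=1)
    # Roughly 80 MB of data once pickled: far more than a pipe buffer
    # can hold, so the feeder thread blocks and the process cannot exit.
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration


if __name__ == "__main__":
    a = multiprocessing.Process(target=runQueue)
    a.start()
    a.join(timeout=5)  # bounded join instead of blocking forever
    if a.is_alive():
        print("child is stuck flushing the queue")
        a.terminate()  # force-kill it so the parent can exit
        a.join()
    print("done")
```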

There are two ways to explicitly prevent this.
The first is, as mentioned above, to create the queue using multiprocessing.Manager:

import multiprocessing


def runQueue():
    # Use multiprocessing.Manager.Queue instead of multiprocessing.Queue
    data_queue = multiprocessing.Manager().Queue(maxsize=1)
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration


def main():
    a = multiprocessing.Process(target=runQueue)
    a.start()
    a.join()
    print("done")


if __name__ == "__main__":
    main()

The second is to empty the queue in the parent before joining:

import multiprocessing
import queue


def runQueue(data_queue):
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration


def main():
    data_queue = multiprocessing.Queue(maxsize=1)
    a = multiprocessing.Process(target=runQueue, args=(data_queue,))
    a.start()

    # Empty the queue before joining.
    try:
        while True:
            data_queue.get(block=True, timeout=1)
    except queue.Empty:
        pass

    a.join()
    print("done")


if __name__ == "__main__":
    main()
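The documentation quoted above also mentions cancel_join_thread (available on multiprocessing.Queue as well as JoinableQueue). Calling it in the child lets the process exit without flushing the buffered item to the pipe, at the cost of possibly losing the queued data. A minimal sketch of this third option; it is a variation on the question's code, not part of the original answer:

```python
import multiprocessing


def runQueue(data_queue):
    data_queue.put_nowait([[9] * 10000000])
    # Allow the process to exit without flushing the buffered item
    # to the pipe. The queued data may be lost as a result.
    data_queue.cancel_join_thread()
    print("in")
    raise StopIteration


def main():
    data_queue = multiprocessing.Queue(maxsize=1)
    a = multiprocessing.Process(target=runQueue, args=(data_queue,))
    a.start()
    a.join()
    print("done")


if __name__ == "__main__":
    main()
```

Use this only when losing the not-yet-flushed data is acceptable; otherwise prefer the Manager queue or draining the queue as shown above.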

huangapple
  • Posted on 2023-07-03 18:05:22
  • Please retain this link when reposting: https://go.coder-hub.com/76603731.html