Python multiprocessing: when an item put on the queue is too large, the child process does not raise its exception
Question
import multiprocessing

def runQueue():
    data_queue = multiprocessing.Queue(maxsize=1)
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration

if __name__ == '__main__':
    a = multiprocessing.Process(target=runQueue)
    a.start()
    a.join()
    print("done")
It does not raise the exception, and the whole program hangs.
When I tried data_queue.put_nowait([[9] * 100]) instead, it worked as expected and printed:
Process Process-1:
Traceback (most recent call last):
  File "D:\python3.7\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "D:\python3.7\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "D:\test\main.py", line 50, in runQueue
    raise StopIteration
StopIteration
How can I get the first version of the code to raise the exception normally?
Answer 1
Score: 1
As the documentation states:
> As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
>
> This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
>
> Note that a queue created using a manager does not have this issue.
Your function does raise the exception, but the child process gets stuck before the traceback reaches the terminal: it cannot exit until the queued data has been flushed to the pipe, and nothing is reading from that pipe. With smaller data the problem probably does not appear because the data fits into the pipe's buffer. You should not rely on that in general.
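To make the size difference concrete, here is a minimal sketch that compares the pickled size of both payloads against a pipe buffer. The helper names and the 64 KiB figure are assumptions for illustration (64 KiB is a common Linux pipe capacity; the real limit depends on the platform, and multiprocessing adds a small header of its own).

```python
import pickle

# Assumption: 64 KiB is a typical Linux pipe capacity; other platforms differ.
ASSUMED_PIPE_BUFFER = 64 * 1024


def pickled_size(item):
    """Return the number of bytes the item occupies once pickled."""
    return len(pickle.dumps(item))


def fits_in_pipe(item, buffer_size=ASSUMED_PIPE_BUFFER):
    """Roughly estimate whether the pickled item fits in the pipe buffer."""
    return pickled_size(item) <= buffer_size


if __name__ == "__main__":
    # The two payload sizes used in the question.
    for length in (100, 10000000):
        item = [[9] * length]
        print(length, pickled_size(item), fits_in_pipe(item))
```

The small payload is a few hundred bytes, so the feeder thread can write it and finish without anyone reading. The large one is far beyond any pipe buffer, so the write blocks until a reader appears.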
There are two ways to explicitly prevent this.
The first is, as mentioned above, to create the queue using multiprocessing.Manager.
import multiprocessing

def runQueue():
    # Use multiprocessing.Manager.Queue instead of multiprocessing.Queue
    data_queue = multiprocessing.Manager().Queue(maxsize=1)
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration

def main():
    a = multiprocessing.Process(target=runQueue)
    a.start()
    a.join()
    print("done")

if __name__ == "__main__":
    main()
The second is to empty the queue before joining.
import multiprocessing
import queue

def runQueue(data_queue):
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration

def main():
    data_queue = multiprocessing.Queue(maxsize=1)
    a = multiprocessing.Process(target=runQueue, args=(data_queue,))
    a.start()
    # Empty the queue before joining.
    try:
        while True:
            data_queue.get(block=True, timeout=1)
    except queue.Empty:
        pass
    a.join()
    print("done")

if __name__ == "__main__":
    main()
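The fixed one-second timeout above assumes the child puts its data quickly. As a variation, and only a sketch, the parent can keep reading for as long as the child is alive. The names drain_until_done, producer and poll_interval are hypothetical and not part of the multiprocessing API.

```python
import multiprocessing
import queue


def drain_until_done(data_queue, is_alive, poll_interval=0.1):
    """Collect items until the producer has exited and the queue is empty."""
    items = []
    while True:
        # Sample liveness before the get: if the producer was already dead,
        # everything it put has been flushed, so an empty get means we are done.
        producer_running = is_alive()
        try:
            items.append(data_queue.get(timeout=poll_interval))
        except queue.Empty:
            if not producer_running:
                break
    return items


def producer(data_queue):
    data_queue.put_nowait([[9] * 10000000])
    print("in")
    raise StopIteration


def main():
    data_queue = multiprocessing.Queue(maxsize=1)
    child = multiprocessing.Process(target=producer, args=(data_queue,))
    child.start()
    # Read while the child runs, so its feeder thread can flush and exit.
    items = drain_until_done(data_queue, child.is_alive)
    child.join()
    print("received", len(items), "item(s); child exit code:", child.exitcode)


if __name__ == "__main__":
    main()
```

Because the parent is reading, the child can finish flushing, print its traceback and exit. The parent can then check child.exitcode, which is nonzero when the child ended with an uncaught exception.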