英文:
Multiprocessing code behaves differently when commenting out one print statement
问题
这是我的代码:
from random import random
from multiprocessing import Process
from multiprocessing import Queue
import time
def new(shared_queue):
print('Consumer: Running', flush=True)
while shared_queue.qsize() > 0:
shared_queue.get()
print('Consumer: Done', flush=True)
if __name__ == '__main__':
start_time = time.time()
# 初始化队列
queue = Queue()
for _ in range(50000):
# 生成一个值
value = random()
# 添加到队列
queue.put(value)
print('初始化队列大小', queue.qsize())
p0 = Process(target=new, args=(queue,))
p0.start()
p1 = Process(target=new, args=(queue,))
p1.start()
p2 = Process(target=new, args=(queue,))
p2.start()
p0.join()
p1.join()
p2.join()
print("完成 --- %s 秒 ---" % (time.time() - start_time))
当我运行这段代码时,我得到了我期望的结果:
Consumer: Done
Consumer: Done
Consumer: Done
Done in --- 5.304457664489746 seconds ---
然而,当我只注释掉这行:
print(shared_queue.qsize())
我只得到了这个结果:
初始化队列大小 50000
Consumer: Running
Consumer: Running
Consumer: Running
Consumer: Done
为什么我看不到其他的 Consumer: Done
行和 Done in --- xxx seconds ---
?
英文:
This is my code:
from random import random
from multiprocessing import Process
from multiprocessing import Queue
import time
def new(shared_queue):
print('Consumer: Running', flush=True)
while shared_queue.qsize() > 0:
shared_queue.get()
print(shared_queue.qsize())
print('Consumer: Done', flush=True)
if __name__ == '__main__':
start_time = time.time()
# Init Queue
queue = Queue()
for _ in range(50000):
# generate a value
value = random()
# add to the queue
queue.put(value)
print('Init Queue size', queue.qsize())
p0 = Process(target=new, args=(queue,))
p0.start()
p1 = Process(target=new, args=(queue,))
p1.start()
p2 = Process(target=new, args=(queue,))
p2.start()
p0.join()
p1.join()
p2.join()
print("Done in --- %s seconds ---" % (time.time() - start_time))
When I run this, I get the result I expect:
Consumer: Done
Consumer: Done
Consumer: Done
Done in --- 5.304457664489746 seconds ---
However, when I only comment out the line
print(shared_queue.qsize())
I only get this result:
Init Queue size 50000
Consumer: Running
Consumer: Running
Consumer: Running
Consumer: Done
Why don't I see the other Consumer: Done
lines and the
Done in --- xxx seconds ---
?
答案1
得分: 2
Your problem is that get
is too fast and print
too slow. If the consumers get elements super fast may occur that one consumer, let's say p1
checks shared_queue.qsize() > 0
and by the time p1
arrives to shared_queue.get()
other consumer has called shared_queue.get()
for the last element.
If your example has not concurrent producers and consumers (if your queue is fully defined by the time the first consumer starts) instead of using timeout
as AhmedAEK suggested, you can use shared_queue.get(block=False)
, which will raise an exception that you will have to catch as AhmedAEK showed:
def new(shared_queue):
print('Consumer : Running', flush=True)
while shared_queue.qsize() > 0:
try:
shared_queue.get(block=False)
except queue.Empty:
pass
# print(shared_queue.qsize())
print('Consumer : Done', flush=True)
英文:
Your problem is that get
is too fast and print
too slow. If the consumers get elements super fast may occur that one consumer, let's say p1
checks shared_queue.qsize() > 0
and by the time p1
arrives to shared_queue.get()
other consumer has called shared_queue.get()
for the last element.
If your example has not concurrent producers and consumers (if your queue is fully defined by the time the first consumer starts) instead of use timeout
as AhmedAEK suggested, you can use shared_queue.get(block=False)
, which will raise and exception that you will have to catch as AhmedAEK showed:
def new(shared_queue):
print('Consumer : Running', flush=True)
while shared_queue.qsize() > 0:
try:
shared_queue.get(block=False)
except queue.Empty:
pass
# print(shared_queue.qsize())
print('Consumer : Done', flush=True)
答案2
得分: 1
在检查 shared_queue.qsize() > 0
和执行 shared_queue.get()
之间有一个以微秒为单位的有限时间,这段短时间内,另一个进程可能已经获取了队列中的最后一个项目,因此当这个进程调用 get()
时,它现在尝试从一个空队列中获取项目,这是一个阻塞操作,会导致应用程序挂起。
打印操作会减慢进程的速度,以至于它 "似乎" 正常工作,但这只是纯粹的幸运,最糟糕的错误是在测试中不会出现的错误。
处理这个问题的一个好方法是在 shared_queue.get(timeout=0.01)
上设置一个超时,这样子进程不会被困在等待一个空队列上。当然,还有其他方法,比如在所有工作发送后在队列本身放置终止命令。
通过添加超时,下面的代码保证每次都能正确运行:
from multiprocessing import Process
from multiprocessing import Queue
import time
from random import random
import queue
def new(shared_queue):
print('Consumer: Running', flush=True)
while shared_queue.qsize() > 0:
try:
shared_queue.get(timeout=0.01)
except queue.Empty:
pass
print('Consumer: Done', flush=True)
if __name__ == '__main__':
start_time = time.time()
# 初始化队列
queue = Queue()
for _ in range(50000):
# 生成一个值
value = random()
# 添加到队列
queue.put(value)
print('初始化队列大小', queue.qsize())
p0 = Process(target=new, args=(queue,))
p0.start()
p1 = Process(target=new, args=(queue,))
p1.start()
p2 = Process(target=new, args=(queue,))
p2.start()
p0.join()
p1.join()
p2.join()
print("Done in --- %s seconds ---" % (time.time() - start_time))
英文:
there is a finite time in microsecond between checking shared_queue.qsize() > 0
and executing shared_queue.get()
, in this small time another process could have grabbed the last item off the queue so when this process calls get()
, it is now trying to get an item from an empty queue which is a blocking action, and your application will hang.
the print slows the process down enough that it "seems" to work, but this is only by pure luck, the wrost bug is the bug that doesn't appear in testing.
a good way to handle this is to set a timeout on the shared_queue.get(timeout=0.01)
so that the child won't be stuck waiting on an empty queue, there are obviously other ways like puting a termination command on the queue itself after all the work is sent.
with the added timeout the next code is guaranteed to work correctly every time.
from multiprocessing import Process
from multiprocessing import Queue
import time
from random import random
import queue
def new(shared_queue):
print('Consumer: Running', flush=True)
while shared_queue.qsize() > 0:
try:
shared_queue.get(timeout=0.01)
except queue.Empty:
pass
print('Consumer: Done', flush=True)
if __name__ == '__main__':
start_time = time.time()
# Init Queue
queue = Queue()
for _ in range(50000):
# generate a value
value = random()
# add to the queue
queue.put(value)
print('Init Queue size', queue.qsize())
p0 = Process(target=new, args=(queue,))
p0.start()
p1 = Process(target=new, args=(queue,))
p1.start()
p2 = Process(target=new, args=(queue,))
p2.start()
p0.join()
p1.join()
p2.join()
print("Done in --- %s seconds ---" % (time.time() - start_time))
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论