Multiprocessing code behaves differently when commenting out one print statement

Question

This is my code:

from random import random
from multiprocessing import Process
from multiprocessing import Queue
import time


def new(shared_queue):
    print('Consumer: Running', flush=True)
    while shared_queue.qsize() > 0:
        shared_queue.get()
        print(shared_queue.qsize())
    print('Consumer: Done', flush=True)


if __name__ == '__main__':
    start_time = time.time()

    # Init Queue
    queue = Queue()
    for _ in range(50000):
        # generate a value
        value = random()
        # add to the queue
        queue.put(value)

    print('Init Queue size', queue.qsize())

    p0 = Process(target=new, args=(queue,))
    p0.start()
    p1 = Process(target=new, args=(queue,))
    p1.start()
    p2 = Process(target=new, args=(queue,))
    p2.start()

    p0.join()
    p1.join()
    p2.join()

    print("Done in --- %s seconds ---" % (time.time() - start_time))

When I run this, I get the result I expect:

Consumer: Done
Consumer: Done
Consumer: Done
Done in --- 5.304457664489746 seconds ---

However, when I only comment out the line

print(shared_queue.qsize())

I only get this result:

Init Queue size 50000
Consumer: Running
Consumer: Running
Consumer: Running
Consumer: Done

Why don't I see the other Consumer: Done lines and the
Done in --- xxx seconds ---?

Answer 1

Score: 2

Your problem is that get is too fast and print is too slow. When the consumers drain the queue very quickly, one consumer, say p1, can see shared_queue.qsize() > 0, and by the time p1 reaches shared_queue.get() another consumer has already taken the last element. p1 then blocks forever on the empty queue, so it never prints Consumer: Done and the parent's join() never returns.

If your example has no concurrent producers (that is, the queue is completely filled before the first consumer starts), then instead of using a timeout as AhmedAEK suggested, you can call shared_queue.get(block=False). It raises queue.Empty when there is nothing to take, and you have to catch that exception as AhmedAEK showed:

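# Import Empty by name: the question's script rebinds `queue` to a Queue instance
from queue import Empty

def new(shared_queue):
    print('Consumer : Running', flush=True)
    while shared_queue.qsize() > 0:
        try:
            # Non-blocking get: raises Empty at once instead of waiting
            shared_queue.get(block=False)
        except Empty:
            # Another consumer took the item first; re-check the loop condition
            pass
        # print(shared_queue.qsize())
    print('Consumer : Done', flush=True)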
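
The interleaving described above can be replayed deterministically in a single process. The sketch below is only an illustration and not part of the original answer: it uses the standard library's queue.Queue as a stand-in for multiprocessing.Queue (an assumption, made so that the steps run in a fixed order), and the function name replay_lost_race is hypothetical.

```python
import queue


def replay_lost_race():
    """Replay, step by step, the interleaving that hangs the original code."""
    # Stand-in for the shared multiprocessing.Queue, holding one last item.
    shared_queue = queue.Queue()
    shared_queue.put('last item')

    # Step 1: consumer A evaluates the loop condition and sees work to do.
    a_saw_work = shared_queue.qsize() > 0

    # Step 2: before A reaches get(), consumer B takes the last item.
    taken_by_b = shared_queue.get()

    # Step 3: A calls get(). A plain get() would block forever here;
    # block=False raises queue.Empty immediately instead.
    try:
        shared_queue.get(block=False)
        a_got_item = True
    except queue.Empty:
        a_got_item = False

    return a_saw_work, taken_by_b, a_got_item


if __name__ == '__main__':
    print(replay_lost_race())
```

The check in step 1 succeeds even though consumer A ends up with nothing, which is why the size check alone cannot protect a blocking get().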

Answer 2

Score: 1

There is a small but nonzero delay, on the order of microseconds, between checking shared_queue.qsize() > 0 and executing shared_queue.get(). In that window another process can grab the last item off the queue, so when this process calls get() it is trying to take an item from an empty queue. That call blocks, and your application hangs.

The print slows each process down enough that the code "seems" to work, but that is pure luck. The worst bug is the one that doesn't appear in testing.

A good way to handle this is to pass a timeout, shared_queue.get(timeout=0.01), so that the child can't get stuck waiting on an empty queue. There are other ways too, such as putting a termination command on the queue itself after all the work has been sent.

With the timeout added, the following code no longer hangs when a consumer loses the race for the last item:

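from multiprocessing import Process
from multiprocessing import Queue
import time
from random import random
# Import Empty by name: the main block below rebinds the name `queue` to a
# Queue instance, so `queue.Empty` would fail in a child started with fork.
from queue import Empty

def new(shared_queue):
    print('Consumer: Running', flush=True)
    while shared_queue.qsize() > 0:
        try:
            # Wait at most 10 ms; raises Empty if no item arrives in time
            shared_queue.get(timeout=0.01)
        except Empty:
            # Lost the race for an item; go back and re-check the queue size
            pass
    print('Consumer: Done', flush=True)
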
if __name__ == '__main__':
    start_time = time.time()

    # Init Queue
    queue = Queue()
    for _ in range(50000):
        # generate a value
        value = random()
        # add to the queue
        queue.put(value)

    print('Init Queue size', queue.qsize())

    p0 = Process(target=new, args=(queue,))
    p0.start()
    p1 = Process(target=new, args=(queue,))
    p1.start()
    p2 = Process(target=new, args=(queue,))
    p2.start()

    p0.join()
    p1.join()
    p2.join()

    print("Done in --- %s seconds ---" % (time.time() - start_time))
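
The other option mentioned above, putting a termination command on the queue after all the work is sent, could look roughly like the sketch below. It is an illustration under stated assumptions, not the answerer's code: the SENTINEL value (None), the helper names consume, worker and main, and the second queue used to report counts are all hypothetical choices.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker; real items are floats


def consume(work_queue):
    """Take items until the sentinel arrives; return how many were handled."""
    handled = 0
    while True:
        item = work_queue.get()  # blocking is safe: a sentinel always arrives
        if item is SENTINEL:
            return handled
        handled += 1


def worker(work_queue, result_queue):
    # Report the count to the parent through a second queue.
    result_queue.put(consume(work_queue))


def main(n_items=1000, n_workers=3):
    work_queue, result_queue = Queue(), Queue()
    for i in range(n_items):
        work_queue.put(float(i))
    # One sentinel per consumer, placed after all the real work.
    for _ in range(n_workers):
        work_queue.put(SENTINEL)

    workers = [Process(target=worker, args=(work_queue, result_queue))
               for _ in range(n_workers)]
    for p in workers:
        p.start()
    # Collect results before joining so no child is left holding unsent data.
    total = sum(result_queue.get() for _ in workers)
    for p in workers:
        p.join()
    return total


if __name__ == '__main__':
    print('Items handled:', main())
```

Because each consumer stops only when it receives its own sentinel, the loop never needs qsize() at all, which also sidesteps platforms where multiprocessing's qsize() is not implemented.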

huangapple
  • Posted on March 7, 2023, 00:55:54
  • Please keep this link when reposting: https://go.coder-hub.com/75653661.html