多进程后端阻塞异步IO前端

huangapple go评论86阅读模式
英文:

Multiprocessing backend blocking asyncio frontend

问题

在一个基于GUI的脚本(框架)中,我有一个基于Tkinter的GUI。这个GUI是异步运行的(可以工作)。当我按下“开始”按钮时,会启动一个处理过程,由于它需要大量的CPU资源,所以使用多进程来完成(可以工作)。
不工作的是将处理过程的反馈(已完成这个,已完成那个)传递回GUI以显示进度。

为了传递进程的消息,我使用了一个multiprocessing.Queue。由于异步GUI无法从该队列中获取数据,所以我使用了一个asyncio.Queue来传递给GUI,并且我有一个mp_queue_to_async_queue()函数来从mp_queue中提取消息并将其放入async_queue中。
理论上,一切都可以工作,但是mp_queue_to_async_queue()行会阻塞接下来的print_out_async()行。

下面是一个描述这个问题的“虚拟”代码:

import multiprocessing
import os
import time
import asyncio
import random

N_PROCESSES = 2
N_ITER = 10
N_SEC = 1

async_queue = asyncio.Queue()

def worker_main(p_queue):
    print (_pid:=os.getpid(),"working")
    for i in range(N_ITER):
        some_random_time = N_SEC * random.random()
        p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
        time.sleep(some_random_time)
    p_queue.put(None)

async def run():
    await mp_queue_to_async_queue() #这一行会阻塞
    await print_out_async()

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        print(f"-> {message}")
        await async_queue.put(message)
        if message == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

async def print_out_async():
    processes_finished = 0
    while True:
        b = await async_queue.get()
        print(f"<- {b}")
        if b == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

if __name__ == '__main__':
    mp_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

    pool.terminate()

希望这可以帮助解决你的问题!

英文:

in a general GUI-based script (framework) I have a Tkinter-based GUI. This is run asynchronously (works). When I press the 'Start' button, a processing starts and as it's CPU-heavy, this is done using multiprocessing (works).
What doesn't work is the back-reporting of the processes (done this, done that) to the GUI in order to display the progress.

In order the deliver the messages from processes, I use a multiprocessing.Queue. Since the async GUI cannot be fed from this Queue, I use an asyncio.Queue to fed the GUI, and I have the mp_queue_to_async_queue() function to pick out the messages from mp_queue and put them to the async_queue.
In theory everything works with the exception that the mp_queue_to_async_queue() row blocks the following print_out_async() row.

Here is a 'dummy' code that depicts the problem:

import multiprocessing
import os
import time
import asyncio
import random

N_PROCESSES = 2
N_ITER = 10
N_SEC = 1

async_queue = asyncio.Queue()

def worker_main(p_queue):
    print (_pid:=os.getpid(),&quot;working&quot;)
    for i in range(N_ITER):
        some_random_time = N_SEC * random.random()
        p_queue.put(f&quot;{i} - {_pid}: {some_random_time} sec&quot;)
        time.sleep(some_random_time)
    p_queue.put(None)

async def run():
    await mp_queue_to_async_queue() #This row is blocking
    await print_out_async()

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        print(f&quot;-&gt; {message}&quot;)
        await async_queue.put(message)
        if message == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

async def print_out_async():
    processes_finished = 0
    while True:
        b = await async_queue.get()
        print(f&quot;&lt;- {b}&quot;)
        if b == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

if __name__ == &#39;__main__&#39;:
    mp_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

    pool.terminate()

答案1

得分: 1

你的问题是关于代码中的一些内容的翻译。以下是翻译好的内容:

你的问题是你正在“等待”一个无限循环调用 - 它必须被创建为一个任务,并确保在每次循环执行期间给予 asyncio 循环控制权。

此外,mp.queue.get 默认是阻塞的 - 当没有消息时,你必须将控制权交还给循环:

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        ... 

只需检查是否有准备好的消息,否则将控制权交还给异步循环:

from queue import Empty as QueueEmpty

async def run():
   
    # 创建的任务将在其他异步代码让出时运行
    # 虽然在这种情况下我们可以“发射并忘记”
    # 但保持对每个创建的任务的引用很重要:
    queues_task = asyncio.create_task(mp_queue_to_async_queue()) # 这一行是阻塞的
    await print_out_async()


async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        try:
            message = mp_queue.get_nowait()
        except QueueEmpty:
            await asyncio.sleep(0)
            continue
        ...
        print(f"-> {message}")
        ...
        

希望对你有帮助!如果还有其他问题,请随时提问。

英文:

Your villain is that you are "awaiting" an infinite loop call - it has to be created as a task, and ensure the asyncio loop is given control during each loop execution.

Besides it, mp.queue.get is, by default, blocking - you have to yield to the loop when there are no messages:

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        ... 

Simply check if there is a message ready, otherwise yield the control back to the async loop:

from queue import Empty as QueueEmpty

async def run():
   
    # a created task will run whenever another async code yields
    # to the loop. 
    # Although in this case we can do &quot;fire and forget&quot;
    # it is important to keep a reference to each created task:
    queues_task = asyncio.create_task(mp_queue_to_async_queue()) #This row is blocking
    await print_out_async()


async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        try:
            message = mp_queue.get_nowait()
        except QueueEmpty:
            await asyncio.sleep(0)
            continue
        ...
        print(f&quot;-&gt; {message}&quot;)
        ...
        

huangapple
  • 本文由 发表于 2023年8月9日 05:05:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/76863182.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定