英文:
Multiprocessing backend blocking asyncio frontend
问题
在一个基于GUI的脚本(框架)中,我有一个基于Tkinter的GUI。这个GUI是异步运行的(可以工作)。当我按下“开始”按钮时,会启动一个处理过程,由于它需要大量的CPU资源,所以使用多进程来完成(可以工作)。
不工作的是将处理过程的反馈(已完成这个,已完成那个)传递回GUI以显示进度。
为了传递进程的消息,我使用了一个multiprocessing.Queue。由于异步GUI无法从该队列中获取数据,所以我使用了一个asyncio.Queue来传递给GUI,并且我有一个mp_queue_to_async_queue()函数来从mp_queue中提取消息并将其放入async_queue中。
理论上,一切都可以工作,但是mp_queue_to_async_queue()行会阻塞接下来的print_out_async()行。
下面是一个描述这个问题的“虚拟”代码:
import multiprocessing
import os
import time
import asyncio
import random
N_PROCESSES = 2
N_ITER = 10
N_SEC = 1
async_queue = asyncio.Queue()
def worker_main(p_queue):
print (_pid:=os.getpid(),"working")
for i in range(N_ITER):
some_random_time = N_SEC * random.random()
p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
time.sleep(some_random_time)
p_queue.put(None)
async def run():
await mp_queue_to_async_queue() #这一行会阻塞
await print_out_async()
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
message = mp_queue.get()
print(f"-> {message}")
await async_queue.put(message)
if message == None:
processes_finished += 1
if processes_finished == N_PROCESSES:
break
async def print_out_async():
processes_finished = 0
while True:
b = await async_queue.get()
print(f"<- {b}")
if b == None:
processes_finished += 1
if processes_finished == N_PROCESSES:
break
if __name__ == '__main__':
mp_queue = multiprocessing.Queue()
pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
pool.terminate()
希望这可以帮助解决你的问题!
英文:
in a general GUI-based script (framework) I have a Tkinter-based GUI. This is run asynchronously (works). When I press the 'Start' button, a processing starts and as it's CPU-heavy, this is done using multiprocessing (works).
What doesn't work is the back-reporting of the processes (done this, done that) to the GUI in order to display the progress.
In order the deliver the messages from processes, I use a multiprocessing.Queue. Since the async GUI cannot be fed from this Queue, I use an asyncio.Queue to fed the GUI, and I have the mp_queue_to_async_queue() function to pick out the messages from mp_queue and put them to the async_queue.
In theory everything works with the exception that the mp_queue_to_async_queue() row blocks the following print_out_async() row.
Here is a 'dummy' code that depicts the problem:
import multiprocessing
import os
import time
import asyncio
import random
N_PROCESSES = 2
N_ITER = 10
N_SEC = 1
async_queue = asyncio.Queue()
def worker_main(p_queue):
print (_pid:=os.getpid(),"working")
for i in range(N_ITER):
some_random_time = N_SEC * random.random()
p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
time.sleep(some_random_time)
p_queue.put(None)
async def run():
await mp_queue_to_async_queue() #This row is blocking
await print_out_async()
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
message = mp_queue.get()
print(f"-> {message}")
await async_queue.put(message)
if message == None:
processes_finished += 1
if processes_finished == N_PROCESSES:
break
async def print_out_async():
processes_finished = 0
while True:
b = await async_queue.get()
print(f"<- {b}")
if b == None:
processes_finished += 1
if processes_finished == N_PROCESSES:
break
if __name__ == '__main__':
mp_queue = multiprocessing.Queue()
pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
pool.terminate()
答案1
得分: 1
你的问题是关于代码中的一些内容的翻译。以下是翻译好的内容:
你的问题是你正在“等待”一个无限循环调用 - 它必须被创建为一个任务,并确保在每次循环执行期间给予 asyncio 循环控制权。
此外,mp.queue.get 默认是阻塞的 - 当没有消息时,你必须将控制权交还给循环:
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
message = mp_queue.get()
...
只需检查是否有准备好的消息,否则将控制权交还给异步循环:
from queue import Empty as QueueEmpty
async def run():
# 创建的任务将在其他异步代码让出时运行
# 虽然在这种情况下我们可以“发射并忘记”
# 但保持对每个创建的任务的引用很重要:
queues_task = asyncio.create_task(mp_queue_to_async_queue()) # 这一行是阻塞的
await print_out_async()
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
try:
message = mp_queue.get_nowait()
except QueueEmpty:
await asyncio.sleep(0)
continue
...
print(f"-> {message}")
...
希望对你有帮助!如果还有其他问题,请随时提问。
英文:
Your villain is that you are "awaiting" an infinite loop call - it has to be created as a task, and ensure the asyncio loop is given control during each loop execution.
Besides it, mp.queue.get is, by default, blocking - you have to yield to the loop when there are no messages:
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
message = mp_queue.get()
...
Simply check if there is a message ready, otherwise yield the control back to the async loop:
from queue import Empty as QueueEmpty
async def run():
# a created task will run whenever another async code yields
# to the loop.
# Although in this case we can do "fire and forget"
# it is important to keep a reference to each created task:
queues_task = asyncio.create_task(mp_queue_to_async_queue()) #This row is blocking
await print_out_async()
async def mp_queue_to_async_queue():
processes_finished = 0
while True:
try:
message = mp_queue.get_nowait()
except QueueEmpty:
await asyncio.sleep(0)
continue
...
print(f"-> {message}")
...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论