Multiprocessing Process Pool Executor blocking submit function
I am trying to turn my executor.submit
call into a blocking call, so that it waits until the ProcessPoolExecutor pool has an available worker.
In other words, the code below should initially print 0 and 1, then wait about 5 seconds, print 2 and 3, and so on.
How can I achieve this?
import concurrent.futures
import multiprocessing
import time

def wait_f():
    time.sleep(5)
    return 1

if __name__ == '__main__':
    multiprocessing.freeze_support()
    global_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        for j in range(10):
            future = executor.submit(wait_f)
            futures.append(future)
            print(j)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            global_results.append(result)
Answer 1 (Score: 1)
Please do keep in mind that the pool-of-workers pattern is designed to handle the concurrency for you, so you don't have to. In other words, you should not worry about scheduling more tasks than there are workers, as the pool already manages the task queue for you.
That said, you can build a working solution from this answer.
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager:
    def __init__(self, processes):
        self.workers = Semaphore(processes)
        self.executor = ProcessPoolExecutor(max_workers=processes)

    def new_task(self, function):
        """Start a new task; blocks if all workers are busy."""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)
        return future

    def task_done(self, future):
        """Called once a task is done; releases a blocked new_task call.

        Note: add_done_callback passes the completed future to the
        callback, so task_done must accept it as an argument.
        """
        self.workers.release()

# wait_f as defined in the question
task_manager = TaskManager(2)
task_manager.new_task(wait_f)
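A minimal runnable sketch of this pattern, assuming a shortened wait_f for a quick demo. It uses ThreadPoolExecutor so the snippet runs without the __main__ guard; the same TaskManager works with ProcessPoolExecutor as in the answer. The timing measurement shows the third and fourth submits blocking until a worker frees up:

```python
import time
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor

class TaskManager:
    def __init__(self, workers):
        self.workers = Semaphore(workers)
        self.executor = ThreadPoolExecutor(max_workers=workers)

    def new_task(self, function):
        """Start a new task; blocks until a worker is free."""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)
        return future

    def task_done(self, future):
        """Runs when a task finishes; lets a blocked new_task() proceed."""
        self.workers.release()

def wait_f():
    time.sleep(0.5)  # shortened from 5 s for a quick demo
    return 1

tm = TaskManager(2)
start = time.monotonic()
# Submits 1 and 2 return immediately; 3 and 4 block until a task finishes.
futures = [tm.new_task(wait_f) for _ in range(4)]
submit_elapsed = time.monotonic() - start  # >= 0.5 s due to the blocked submits
results = [f.result() for f in futures]
tm.executor.shutdown()
```

With max_workers=2 and four 0.5-second tasks, the submit loop itself takes at least 0.5 seconds, which is exactly the "blocking submit" behavior the question asks for.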