Multiprocessing Process Pool Executor blocking submit function

huangapple go评论71阅读模式
英文:

Multiprocessing Process Pool Executor blocking submit function

问题

我试图将我的 executor.submit 函数调用转换为一个阻塞函数,使其等待,直到 ProcessPoolExecutor 池有可用的工作线程。

换句话说,最初它应该打印1,2,然后等待5秒,打印3,4,依此类推。

我该如何实现这一点?

import concurrent.futures
import multiprocessing
import time

def wait_f():
    time.sleep(5)
    return 1

if __name__ == '__main__':
    multiprocessing.freeze_support()
    global_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        for j in range(10):
            future = executor.submit(wait_f)
            futures.append(future)
            print(j)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            global_results.append(result)
英文:

I am trying to turn my executor.submit function call into a blocking function so that it waits until the ProcessPoolExecutor pool has available workers.

In other words, initially it should print 1,2, then wait for 5 seconds, print 3,4, etc.

How can I achieve this?

import concurrent.futures
import multiprocessing
import time

def wait_f():
    time.sleep(5)
    return 1



if __name__ == '__main__':
    multiprocessing.freeze_support()
    global_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        for j in range(10):
            future = executor.submit(wait_f)
            futures.append(future)
            print(j)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            global_results.append(result)

答案1

得分: 1

请记住,工作池模式旨在为您处理并发,因此您无需担心调度超过工作人员数量的任务,因为工作池已经为您优化了任务流程。

这样说吧,您可以根据此答案构建一个可行的解决方案。

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager():
    def __init__(self, processes):
        self.workers = Semaphore(processes)
        self.executor = ProcessPoolExecutor(max_workers=processes)

    def new_task(self, function):
        """启动新任务如果队列已满则阻塞"""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)

    def task_done(self):
        """任务完成后调用释放阻塞的队列"""
        self.workers.release()


task_manager = TaskManager(2)
task_manager.new_task(wait_f)
英文:

Please do keep in mind that the Pool of workers pattern is designed to handle the concurrency for you so you don't have to. In other words, you should not worry about scheduling more tasks than workers as the Pool will already optimize the task flow for you.

This said, you can build from this answer a working solution.

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager():
    def __init__(self, processes):
        self.workers = Semaphore(processes)
        self.executor = ProcessPoolExecutor(max_workers=processes)

    def new_task(self, function):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)

    def task_done(self):
        """Called once task is done, releases the queue if blocked."""
        self.workers.release()


task_manager = TaskManager(2)
task_manager.new_task(wait_f)

huangapple
  • 本文由 发表于 2023年5月14日 20:46:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/76247561.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定