Job queue with dependencies with python multiprocessing
Question
I have a function and a list of jobs:
jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)],
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2,3)]]
I would like to apply the function to the arguments (first tuple) in parallel while satisfying the dependencies on previous jobs (second tuple). For example, the cat job cannot start until the dog job has finished. However, I don't want to occupy a core while waiting for a job's dependencies to finish. Instead, I want to move on to a different job that can be executed immediately, so that all cores are kept busy whenever possible. Any tips? Many thanks!
Answer 1
Score: 3
The comment posted by Charchit Agarwal is potentially one way. The problem is: if a job has multiple dependencies and these dependencies are completed in different processes, how do these "super" functions communicate with one another? So here is another method, which uses a job-completion callback to submit new jobs as their dependencies complete:
I would first process your jobs list to create the following data structures:
starts_immediately: A list of job numbers (i.e. indices of jobs) that can be submitted immediately since they have no dependencies.
depends_on: A dictionary of sets. The key is a job number and its value is the set of job numbers that must complete before this job can be submitted.
precedes: A dictionary of sets. The key is a job number and its value is the set of job numbers that cannot be started until this job completes.
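To see how these three structures fit together before any pool is involved, here is a minimal dry-run sketch. The helper names `build_graph` and `dry_run` are hypothetical and are not part of the answer's code. The sketch groups jobs into "waves", which is a simplification: the callback approach submits a job as soon as its own dependencies finish rather than waiting for a whole wave.

```python
from collections import defaultdict


def build_graph(jobs):
    """Return (starts_immediately, depends_on, precedes) for a jobs list."""
    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)
    for job_number, (_, dependency) in enumerate(jobs):
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for earlier_job in dependency:
                precedes[earlier_job].add(job_number)
    return starts_immediately, depends_on, precedes


def dry_run(jobs):
    """Simulate the scheduling in waves without running any work.

    Each wave lists the jobs that become submittable once the previous
    wave has completed. Raises ValueError if some jobs can never start,
    for example because of a circular dependency.
    """
    ready, depends_on, precedes = build_graph(jobs)
    waves = []
    scheduled = 0
    while ready:
        waves.append(sorted(ready))
        scheduled += len(ready)
        next_ready = []
        for job_number in ready:
            # .get avoids inserting empty entries into the defaultdict
            for later_job in precedes.get(job_number, ()):
                remaining = depends_on[later_job]
                remaining.discard(job_number)  # this dependency is satisfied
                if not remaining:  # nothing left to wait for
                    next_ready.append(later_job)
        ready = next_ready
    if scheduled != len(jobs):
        raise ValueError('some jobs have dependencies that can never be satisfied')
    return waves
```

For the jobs list from the question, `dry_run(jobs)` returns `[[0, 3], [1], [2], [4]]`. A check like this can also catch circular dependencies up front, which would otherwise leave the real scheduler waiting for jobs that never become ready.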
Then, whenever a job completes, we determine which jobs, if any, can now be submitted. For this we use a job-completion callback function:
DEBUG = True

def worker(tpl):
    import time

    print('Starting work on:', tpl, flush=True)
    time.sleep(.5) # Simulate work being done
    ...
    print('Completed work on:', tpl, flush=True)

def main(jobs):
    from multiprocessing import Pool
    from collections import defaultdict
    from functools import partial
    from threading import Event

    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)

    # Build the dependency bookkeeping from the jobs list:
    for job_number, job in enumerate(jobs):
        _, dependency = job
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for job_number_2 in dependency:
                precedes[job_number_2].add(job_number)

    if DEBUG:
        print('starts _immediately:', starts_immediately)
        print('depends on:', depends_on)
        print('precedes:', precedes)
        print()

    jobs_completed = Event()
    jobs_to_complete = len(jobs)

    with Pool() as pool:
        # Called in the main process each time a job finishes successfully:
        def my_callback(job_number, result):
            nonlocal jobs_to_complete

            jobs_to_complete -= 1
            if jobs_to_complete == 0: # We have completed all jobs:
                jobs_completed.set()
                return

            for job_number_2 in precedes[job_number]:
                s = depends_on[job_number_2]
                s.remove(job_number) # This dependency completed
                if not s: # No more dependencies to wait for:
                    pool.apply_async(worker, args=(jobs[job_number_2][0],), callback=partial(my_callback, job_number_2))

        # The jobs we can initially submit to get things rolling:
        for job_number in starts_immediately:
            pool.apply_async(worker, args=(jobs[job_number][0],), callback=partial(my_callback, job_number))

        # Wait inside the with block; leaving it terminates the pool:
        jobs_completed.wait() # Wait for all jobs to complete

if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2,3)]]
    main(jobs)
Prints:
starts _immediately: [0, 3]
depends on: {1: {0}, 2: {1}, 4: {2, 3}}
precedes: defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})

Starting work on: (2, 'dog')
Starting work on: (7, 'Alice')
Completed work on: (2, 'dog')
Completed work on: (7, 'Alice')
Starting work on: (-1, 'cat')
Completed work on: (-1, 'cat')
Starting work on: (-1, 'Bob')
Completed work on: (-1, 'Bob')
Starting work on: (0, 'spam')
Completed work on: (0, 'spam')
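As a possible alternative (a sketch, not the method above), the same "submit as soon as dependencies are done" idea can be written with the standard library's `graphlib.TopologicalSorter` and `concurrent.futures`. This assumes Python 3.9 or later, where `graphlib` is available. The function name `run_jobs` and its parameters are hypothetical. The executor is passed in by the caller, so a `ProcessPoolExecutor` can be used for CPU-bound work, provided `func` is a picklable top-level function.

```python
from concurrent.futures import FIRST_COMPLETED, wait
from graphlib import TopologicalSorter


def run_jobs(jobs, func, executor):
    """Call func(args) for every job, submitting each job only after
    all of its dependencies have finished.

    Returns a dict mapping job number to that job's result.
    """
    sorter = TopologicalSorter()
    for job_number, (_, dependency) in enumerate(jobs):
        sorter.add(job_number, *(dependency or ()))
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    pending = {}  # future -> job number
    results = {}
    while sorter.is_active():
        # Submit every job whose dependencies have all completed.
        for job_number in sorter.get_ready():
            future = executor.submit(func, jobs[job_number][0])
            pending[future] = job_number
        # Block until at least one running job finishes.
        done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
        for future in done:
            job_number = pending.pop(future)
            results[job_number] = future.result()  # re-raises worker exceptions
            sorter.done(job_number)  # may make dependent jobs ready
    return results
```

It would be called as `run_jobs(jobs, worker, executor)` inside a `with ProcessPoolExecutor() as executor:` block. One design difference: `future.result()` re-raises an exception from a failed job, so a failure stops the scheduler. With `apply_async`, the `callback` is only invoked on success, so a failing worker would need an `error_callback` to avoid waiting forever.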