Job queue with dependencies with python multiprocessing

Question

I have a function and a list of jobs:

jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)], 
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2,3)]]

I would like to apply the function to the arguments (the first tuple) in parallel, while respecting each job's dependencies on earlier jobs (the second tuple, which holds the indices of the jobs that must finish first, or None). E.g., the cat job cannot be started until the dog job has finished. However, I don't want a core to sit idle waiting for a job's dependencies to finish. Instead, I want it to move on to a different job that can be executed immediately, so that all cores are kept busy whenever possible. Any tips? Many thanks!

Answer 1

Score: 3

The comment posted by Charchit Agarwal is potentially one way. The problem is that if a job has multiple dependencies and those dependencies complete in different processes, how do these "super" functions communicate with one another? So here is another method, which uses a job-completion callback to submit new jobs as their dependencies complete:

I would first process your jobs list to create the following data structures:

  1. starts_immediately: A list of job numbers (i.e., indices into jobs) that can be submitted immediately because they have no dependencies.
  2. depends_on: A dictionary of sets. The key is a job number and its value is the set of job numbers that must complete before this job can be submitted.
  3. precedes: A dictionary of sets. The key is a job number and its value is the set of job numbers that cannot be started until this job completes.
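The following is a minimal standalone sketch of building these three structures in one pass, assuming the question's [args, deps] job format. The function name build_tables is hypothetical. Two behaviours go beyond what is described above. First, an empty dependency tuple is treated like None, because with an `is None` test such a job would never be submitted. Second, an out-of-range dependency index raises ValueError, because otherwise its dependents would wait forever.

```python
from collections import defaultdict


def build_tables(jobs):
    """Hypothetical helper: return (starts_immediately, depends_on, precedes).

    Each job is [args, deps], where deps is None or a tuple of indices of
    jobs that must finish first.
    """
    starts_immediately = []
    depends_on = {}              # job number -> set of job numbers it waits for
    precedes = defaultdict(set)  # job number -> set of job numbers waiting on it
    for job_number, (_, deps) in enumerate(jobs):
        if not deps:  # None or an empty tuple: nothing to wait for
            starts_immediately.append(job_number)
            continue
        depends_on[job_number] = set(deps)
        for prerequisite in deps:
            # Assumption: reject indices that do not name a job in the list.
            if not 0 <= prerequisite < len(jobs):
                raise ValueError(
                    f'job {job_number} depends on unknown job {prerequisite}')
            precedes[prerequisite].add(job_number)
    return starts_immediately, depends_on, precedes


jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)],
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2, 3)]]

starts, depends_on, precedes = build_tables(jobs)
print(starts)      # [0, 3]
print(depends_on)  # {1: {0}, 2: {1}, 4: {2, 3}}
print(precedes)    # defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})
```

precedes stays a defaultdict so that looking up a job that nothing depends on returns an empty set rather than raising KeyError.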

Then, whenever a job completes, we determine which jobs, if any, can now be submitted. For this we use a job-completion callback function:

DEBUG = True

def worker(tpl):
    import time

    print('Starting work on:', tpl, flush=True)
    time.sleep(.5) # Simulate work being done
    ...
    print('Completed work on:', tpl, flush=True)

def main(jobs):
    from multiprocessing import Pool
    from collections import defaultdict
    from functools import partial
    from threading import Event

    # Build the three lookup structures described above:
    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)
    for job_number, job in enumerate(jobs):
        _, dependency = job
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for job_number_2 in dependency:
                precedes[job_number_2].add(job_number)

    if DEBUG:
        print('starts_immediately:', starts_immediately)
        print('depends on:', depends_on)
        print('precedes:', precedes)
        print()

    jobs_completed = Event()

    jobs_to_complete = len(jobs)

    with Pool() as pool:
        # Pool callbacks run in a single result-handler thread of the main
        # process, so the bookkeeping below needs no lock.
        def my_callback(job_number, result):
            nonlocal jobs_to_complete

            jobs_to_complete -= 1
            if jobs_to_complete == 0: # We have completed all jobs:
                jobs_completed.set()
                return

            for job_number_2 in precedes[job_number]:
                s = depends_on[job_number_2]
                s.remove(job_number) # This dependency completed
                if not s: # No more dependencies to wait for:
                    submit(job_number_2)

        def my_error_callback(job_number, exc):
            # If a job raises, my_callback is never called for it, so
            # jobs_completed would never be set; stop waiting instead.
            print(f'Job {job_number} failed: {exc!r}', flush=True)
            jobs_completed.set()

        def submit(job_number):
            pool.apply_async(worker, args=(jobs[job_number][0],),
                             callback=partial(my_callback, job_number),
                             error_callback=partial(my_error_callback, job_number))

        # The jobs we can initially submit to get things rolling:
        for job_number in starts_immediately:
            submit(job_number)
        jobs_completed.wait() # Wait for all jobs to complete

if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2,3)]]
    main(jobs)

Prints:

starts_immediately: [0, 3]
depends on: {1: {0}, 2: {1}, 4: {2, 3}}
precedes: defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})
Starting work on: (2, 'dog')
Starting work on: (7, 'Alice')
Completed work on: (2, 'dog')
Completed work on: (7, 'Alice')
Starting work on: (-1, 'cat')
Completed work on: (-1, 'cat')
Starting work on: (-1, 'Bob')
Completed work on: (-1, 'Bob')
Starting work on: (0, 'spam')
Completed work on: (0, 'spam')
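For comparison, here is a sketch of a different technique from the callback approach above. The standard library's graphlib.TopologicalSorter (Python 3.9+) has a prepare()/get_ready()/done() interface, which its documentation describes for processing dependent tasks in parallel. concurrent.futures.wait(..., return_when=FIRST_COMPLETED) lets the main thread sleep until any running job finishes. The sketch assumes the question's [args, deps] format and a function that takes the args unpacked, so it calls func(2, 'dog') rather than func((2, 'dog')). The name run_with_dependencies is hypothetical. Scheduling stays in the main thread, so no callbacks or Event are needed. A dependency cycle raises graphlib.CycleError before anything runs, and an exception in a job is re-raised by future.result().

```python
import graphlib
from concurrent.futures import FIRST_COMPLETED, Executor, wait


def run_with_dependencies(jobs, func, executor: Executor):
    """Hypothetical helper: run func(*args) for each [args, deps] job.

    A job is submitted as soon as every job listed in its deps has
    finished. Returns the results in the same order as jobs.
    """
    sorter = graphlib.TopologicalSorter()
    for job_number, (_, deps) in enumerate(jobs):
        sorter.add(job_number, *(deps or ()))
    sorter.prepare()  # raises graphlib.CycleError if the deps form a cycle

    results = [None] * len(jobs)
    running = {}  # future -> job number
    while sorter.is_active():
        # Submit every job whose prerequisites have all been marked done.
        for job_number in sorter.get_ready():
            future = executor.submit(func, *jobs[job_number][0])
            running[future] = job_number
        # Only this scheduling thread blocks; pool workers never wait on deps.
        done, _ = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            job_number = running.pop(future)
            results[job_number] = future.result()  # re-raises the job's exception
            sorter.done(job_number)  # may make dependent jobs ready
    return results
```

To use processes as the question asks, pass a ProcessPoolExecutor and a module-level func. Call it under an if __name__ == '__main__': guard, e.g. with ProcessPoolExecutor() as executor: results = run_with_dependencies(jobs, func, executor). Because the executor is passed in, the same scheduler can also be exercised with a ThreadPoolExecutor.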

huangapple
  • Published on 27 February 2023 at 04:53:30
  • When reposting, please keep this article's link: https://go.coder-hub.com/75574947.html