Parallel execution of a directed acyclic graph of tasks

Question


I have a list of tasks [Task-A, Task-B, Task-C, Task-D, ...].
A task can optionally depend on other tasks.

For example:
A can be dependent on 3 tasks: B, C and D
B can be dependent on 2 tasks: C and E

It's basically a directed acyclic graph, and a task should be executed only after all the tasks it depends on have been executed.

At any point in time, there may be multiple tasks that are ready for execution. In such a case, we can run them in parallel.

Any idea how to implement such an execution while getting as much parallelism as possible?

class Task {
    private String name;
    private List<Task> dependentTasks;  // the tasks this task depends on

    public void run() {
        // business logic
    }
}
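For the example above (A depends on B, C and D; B depends on C and E), a minimal sketch of Kahn's algorithm processed one round at a time shows which tasks could run together. It assumes the graph is given as a `Map` from each task name to the names it depends on rather than the `Task` class; `ExecutionWaves` and `waves()` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: Kahn's algorithm processed one "round" at a time. Every task in a
// round depends only on tasks from earlier rounds, so the tasks of a round may run in parallel.
class ExecutionWaves {

    // dependencies: task name -> names of the tasks it depends on (assumed representation).
    static List<List<String>> waves(Map<String, List<String>> dependencies) {
        Map<String, Integer> remaining = new HashMap<>();        // unfinished dependencies per task
        Map<String, List<String>> dependents = new HashMap<>();  // reverse edges
        for (Map.Entry<String, List<String>> e : dependencies.entrySet()) {
            remaining.merge(e.getKey(), e.getValue().size(), Integer::sum);
            for (String dep : e.getValue()) {
                remaining.putIfAbsent(dep, 0);  // a task that only appears as a dependency
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }

        List<List<String>> rounds = new ArrayList<>();
        int scheduled = 0;
        while (!ready.isEmpty()) {
            Collections.sort(ready);  // only for a deterministic printout
            rounds.add(ready);
            scheduled += ready.size();
            List<String> next = new ArrayList<>();
            for (String finished : ready) {
                for (String d : dependents.getOrDefault(finished, List.of())) {
                    if (remaining.merge(d, -1, Integer::sum) == 0) {
                        next.add(d);  // its last dependency just finished
                    }
                }
            }
            ready = next;
        }
        if (scheduled != remaining.size()) {
            throw new IllegalArgumentException("the dependency graph contains a cycle");
        }
        return rounds;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("A", List.of("B", "C", "D"));  // A depends on B, C and D
        deps.put("B", List.of("C", "E"));       // B depends on C and E
        System.out.println(waves(deps));        // prints [[C, D, E], [B], [A]]
    }
}
```

Handing each round to a thread pool as a batch would work, but waiting for a whole round to finish is coarser than submitting each task the moment its last dependency completes, which is what the event-driven approach in the first answer does.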

Answer 1

Score: 6

The other answer works fine but is too complicated.

A simpler way is to just execute Kahn's algorithm, but in parallel.

The key is to execute, in parallel, every task whose dependencies have all been executed. Each task keeps a count of its finished dependencies; when a task completes, it increments that count for every task that depends on it and submits any task whose count has reached its total number of dependencies.

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DependencyManager {
    // taskId -> ids of the tasks it depends on
    private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
    // taskId -> ids of the tasks that depend on it
    private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
    // taskId -> number of its dependencies that have finished so far
    private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
    private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
    private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

    private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
        return () -> {
            try {
                Thread.sleep(2000);  // A task takes 2 seconds to finish.
                dependencyManager.taskCompleted(taskId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // Restore the interrupt flag.
                e.printStackTrace();
            }
        };
    }

    /**
     * Registers a task. Also needed in case a vertex is disconnected from the rest of the graph.
     *
     * @param taskId The task id
     */
    public void addVertex(String taskId) {
        _dependencies.putIfAbsent(taskId, new ArrayList<>());
        _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
        _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
        _numDependenciesExecuted.putIfAbsent(taskId, 0);
    }

    private void addEdge(String dependentTaskId, String dependeeTaskId) {
        _dependencies.get(dependentTaskId).add(dependeeTaskId);
        _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
    }

    // Build the whole graph before calling executeTopoSort(); the edge lists are plain ArrayLists.
    public void addDependency(String dependentTaskId, String dependeeTaskId) {
        addVertex(dependentTaskId);
        addVertex(dependeeTaskId);
        addEdge(dependentTaskId, dependeeTaskId);
    }

    private void taskCompleted(String taskId) {
        System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
        _reverseDependencies.get(taskId).forEach(nextTaskId -> {
            // Use the value returned by the atomic update. Re-reading the map afterwards would let
            // two threads both observe the final count and submit the same task twice.
            int numDependenciesExecuted =
                    _numDependenciesExecuted.computeIfPresent(nextTaskId, (__, currValue) -> currValue + 1);
            if (numDependenciesExecuted == _dependencies.get(nextTaskId).size()) {
                // All dependencies have been executed, so we can submit this task to the thread pool.
                _executorService.submit(_tasks.get(nextTaskId));
            }
        });
        // Likewise, only the thread that performs the final increment shuts the pool down.
        if (_numTasksExecuted.incrementAndGet() == _tasks.size()) {
            topoSortCompleted();
        }
    }

    private void topoSortCompleted() {
        System.out.println("Topo sort complete!!");
        _executorService.shutdownNow();
    }

    public void executeTopoSort() {
        System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
        // Start with every task that has no dependencies.
        _dependencies.forEach((taskId, dependencies) -> {
            if (dependencies.isEmpty()) {
                _executorService.submit(_tasks.get(taskId));
            }
        });
    }
}

public class TestParallelTopoSort {

    public static void main(String[] args) {
        DependencyManager dependencyManager = new DependencyManager();
        dependencyManager.addDependency("8", "5");
        dependencyManager.addDependency("7", "5");
        dependencyManager.addDependency("7", "6");
        dependencyManager.addDependency("6", "3");
        dependencyManager.addDependency("6", "4");
        dependencyManager.addDependency("5", "1");
        dependencyManager.addDependency("5", "2");
        dependencyManager.addDependency("5", "3");
        dependencyManager.addDependency("4", "1");
        dependencyManager.executeTopoSort();
        // Parallel version takes 8 seconds to execute.
        // Serial version would have taken 16 seconds.
    }
}


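The directed acyclic graph constructed in this example:

[Figure: the DAG built in main(), with edges 8→5, 7→5, 7→6, 6→3, 6→4, 5→1, 5→2, 5→3 and 4→1, each pointing from a task to a task it depends on]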
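The 8 s versus 16 s figures in the comments of main() can be checked with a small critical-path calculation. This is a minimal sketch, assuming every task takes exactly 2 s (the `Thread.sleep(2000)` in `getRunnable`), that the 16-thread pool is never the bottleneck (at most three tasks are ready at once in this graph), and that scheduling overhead is negligible; `CriticalPath` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical back-of-the-envelope check of the timings in the answer's main():
// with enough free threads, the parallel run lasts as long as the longest dependency
// chain (the critical path), while a serial run pays for every task once.
class CriticalPath {

    static final long TASK_MILLIS = 2000;  // mirrors Thread.sleep(2000) in getRunnable()

    // Earliest finish time of `task` if it starts as soon as all its dependencies finish.
    // Memoized in `memo`; assumes the graph is acyclic.
    static long finish(String task, Map<String, List<String>> deps, Map<String, Long> memo) {
        Long cached = memo.get(task);
        if (cached != null) {
            return cached;
        }
        long start = 0;
        for (String dep : deps.getOrDefault(task, List.of())) {
            start = Math.max(start, finish(dep, deps, memo));
        }
        long end = start + TASK_MILLIS;
        memo.put(task, end);
        return end;
    }

    static Set<String> allTasks(Map<String, List<String>> deps) {
        Set<String> all = new HashSet<>(deps.keySet());
        deps.values().forEach(all::addAll);
        return all;
    }

    static long parallelMillis(Map<String, List<String>> deps) {
        Map<String, Long> memo = new HashMap<>();
        long makespan = 0;
        for (String task : allTasks(deps)) {
            makespan = Math.max(makespan, finish(task, deps, memo));
        }
        return makespan;
    }

    static long serialMillis(Map<String, List<String>> deps) {
        return allTasks(deps).size() * TASK_MILLIS;
    }

    public static void main(String[] args) {
        // Same edges as the answer's main(): task -> tasks it depends on.
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("8", List.of("5"));
        deps.put("7", List.of("5", "6"));
        deps.put("6", List.of("3", "4"));
        deps.put("5", List.of("1", "2", "3"));
        deps.put("4", List.of("1"));
        System.out.println("parallel: " + parallelMillis(deps) + " ms");  // parallel: 8000 ms
        System.out.println("serial:   " + serialMillis(deps) + " ms");    // serial:   16000 ms
    }
}
```

The longest chain is 1 → 4 → 6 → 7, four tasks of 2 s each, which gives the 8 s lower bound; running all eight tasks one after another takes 16 s.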

Answer 2

Score: 2

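We can create a DAG where each vertex of the graph is one of the tasks. After that, we can compute its topological order. We can then decorate the Task class with a priority field and run a ThreadPoolExecutor with a PriorityBlockingQueue that compares tasks by that priority field.

The final trick is to override run() so that it first waits for all the tasks it depends on to finish.

Since each task waits indefinitely for its dependencies to finish, we cannot afford to let the thread pool be completely occupied by tasks that come later in the topological order while the tasks they wait for are still queued; the thread pool would then be stuck forever. To avoid this, we assign priorities to tasks according to their topological order, so that a task is never taken from the queue before the tasks it depends on.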

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Testing {

  private static Callable<Void> getCallable(String taskId){
    return () -> {
      System.out.println(String.format("Task %s result", taskId));
      Thread.sleep(100);
      return null;
    };
  }

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Callable<Void> taskA = getCallable("A");
    Callable<Void> taskB = getCallable("B");
    Callable<Void> taskC = getCallable("C");
    Callable<Void> taskD = getCallable("D");
    Callable<Void> taskE = getCallable("E");
    PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(taskA);
    PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(taskB);
    PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(taskC);
    PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(taskD);
    PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(taskE);
    // Build the DAG: B depends on C and E; A depends on B, C and D.
    pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
    pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);
    // Now that we have a graph, we can get its topological order (hard-coded here).
    List<PrioritizedFutureTask<Void>> topological_sort = new ArrayList<>();
    topological_sort.add(pfTaskE);
    topological_sort.add(pfTaskC);
    topological_sort.add(pfTaskB);
    topological_sort.add(pfTaskD);
    topological_sort.add(pfTaskA);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));
    // It's important to insert the tasks in topological order; otherwise the thread pool may get stuck forever.
    for (int i = 0; i < topological_sort.size(); i++) {
      PrioritizedFutureTask<Void> pfTask = topological_sort.get(i);
      pfTask.setPriority(i);
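      // A lower priority value means the task is taken from the queue sooner.
      executor.execute(pfTask);
    }
    // Stop accepting new tasks; without this, the pool's non-daemon threads keep the JVM alive.
    executor.shutdown();
  }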
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {

  private Integer _priority = 0;
  private final Callable<T> callable;
  private final List<PrioritizedFutureTask> _dependencies = new ArrayList<>();

  public PrioritizedFutureTask(Callable<T> callable) {
    super(callable);
    this.callable = callable;
  }

  public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
    this(callable);
    _priority = priority;
  }

  public Integer getPriority() {
    return _priority;
  }

  public PrioritizedFutureTask<T> setPriority(Integer priority) {
    _priority = priority;
    return this;
  }

  public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask dep) {
    this._dependencies.add(dep);
    return this;
  }

  @Override
  public void run() {
    for (PrioritizedFutureTask dep : _dependencies) {
      try {
        dep.get();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    super.run();
  }

  @Override
  public int compareTo(PrioritizedFutureTask<T> other) {
    if (other == null) {
      throw new NullPointerException();
    }
    return getPriority().compareTo(other.getPriority());
  }
}

class CustomRunnableComparator implements Comparator<Runnable> {
  @Override
  public int compare(Runnable task1, Runnable task2) {
    return ((PrioritizedFutureTask) task1).compareTo((PrioritizedFutureTask) task2);
  }
}

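Output (E, C and D have no dependencies, so they start immediately and may print in any order; B prints only after C and E have finished, so D will usually appear before it; A always prints last):

Task E result
Task C result
Task B result
Task D result
Task A result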

PS: Here is a well-tested and simple implementation of topological sort in Python which you can easily port to Java.
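Instead of hard-coding `topological_sort` in main(), the order can be computed. A minimal sketch in Java, using a depth-first post-order traversal with cycle detection; this is neither a port of the Python implementation the PS refers to nor the Kahn's algorithm used in the other answer. It assumes the graph is a `Map` from each task to the tasks it depends on; `TopoOrder` and `sort()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: orders tasks so that every task appears after all of the
// tasks it depends on (depth-first post-order), rejecting cyclic graphs.
class TopoOrder {

    private enum State { VISITING, DONE }

    // dependencies: task -> tasks it depends on (assumed representation).
    static <T> List<T> sort(Map<T, List<T>> dependencies) {
        List<T> order = new ArrayList<>();
        Map<T, State> state = new HashMap<>();
        for (T task : dependencies.keySet()) {
            visit(task, dependencies, state, order);
        }
        return order;
    }

    private static <T> void visit(T task, Map<T, List<T>> dependencies,
                                  Map<T, State> state, List<T> order) {
        State s = state.get(task);
        if (s == State.DONE) {
            return;  // already placed in the order
        }
        if (s == State.VISITING) {
            throw new IllegalArgumentException("cycle detected at " + task);
        }
        state.put(task, State.VISITING);
        for (T dep : dependencies.getOrDefault(task, List.of())) {
            visit(dep, dependencies, state, order);  // dependencies first
        }
        state.put(task, State.DONE);
        order.add(task);  // then the task itself
    }

    public static void main(String[] args) {
        // The example from the question: A depends on B, C and D; B depends on C and E.
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("A", List.of("B", "C", "D"));
        deps.put("B", List.of("C", "E"));
        List<String> order = sort(deps);
        System.out.println(order);  // prints [C, E, B, D, A]
        for (int i = 0; i < order.size(); i++) {
            // In the answer's main(), this is where pfTask.setPriority(i) would go.
            System.out.println(order.get(i) + " -> priority " + i);
        }
    }
}
```

The result [C, E, B, D, A] differs from the hard-coded [E, C, B, D, A], but both are valid topological orders, and any valid one satisfies the priority scheme. The recursion depth grows with the longest dependency chain, which is fine for small graphs but may need an explicit stack for very deep ones.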


huangapple
  • Posted on 2020-08-11 16:59:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63354899.html