ExecutorService用多个线程执行单个任务n次(n个线程“竞争”)

huangapple go评论57阅读模式
英文:

ExecutorService to execute a single task n times by multiple threads (n thread "races")

问题

我需要使用多个线程来执行单个任务,以便当第一个线程完成并且在任何其他线程完成之前,所有线程都会停止,然后重新开始相同的任务。这应该重复进行 n 次。

我的尝试是使用 Callable<V> 和方法 invokeAny()(这就是我使用集合的原因),但不确定如何实现这个目标。

ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
    someTask();
    return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
    tasks.add(task);
    executor.submit(task);
});

如何完成这个?或者有更好的解决方案吗?

英文:

I need to execute a single task by multiple threads, such that when the first thread finishes and before any other thread finishes, all the threads are stopped and start the same task all over again. This should be performed n times.

My attempt is using Callable&lt;V&gt; and the method invokeAny() (that is why I use the set) but not sure how to accomplish the goal.

ExecutorService executor = Executors.newFixedThreadPool(10);
Callable&lt;String&gt; task = () -&gt; {
    someTask();
    return &quot;&quot;;
};
Set&lt;Callable&lt;String&gt;&gt; tasks = new HashSet&lt;&gt;();
IntStream.range(0, n).forEach(i -&gt; {
    tasks.add(task);
    executor.submit(task);
});

How to finish this? or any better solution?

答案1

得分: 0

这里有一个建议:

class Task implements Callable<Integer> {

    private final static Random RND = new Random();

    @Override
    public Integer call() throws Exception {
        try {
            // 随机持续时间处理任务
            Thread.sleep(RND.nextInt(5000));
        } catch (InterruptedException e) {
            System.err.println("我被中断了。"
                    + "可能是有其他人在我之前解决了这个任务。");
            return -1;
        }

        // 返回一些虚拟值
        return RND.nextInt();
    }
}

class Scratch {
    public static void main(String[] args) throws InterruptedException {

        final int numWorkers = 3; // 并行运行的任务数量

        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        // 解决任务5次。(如果需要,可以改成 while (true) { ...}。)
        for (int i = 0; i < 5; i++) {

            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executor);

            Future<?>[] futures = new Future<?>[numWorkers];
            for (int j = 0; j < numWorkers; j++) {
                futures[j] = completionService.submit(new Task());
            }

            Future<Integer> firstToComplete = completionService.take();

            try {
                Integer result = firstToComplete.get();
                System.err.println("我们得到了一个结果:" + result);
            } catch (ExecutionException e) {
                // 不应该发生。Future 已经完成。
            }

            // 取消所有的 futures(即使我们取消了已经完成的那个也没关系)。
            for (int j = 0; j < numWorkers; j++) {
                futures[j].cancel(true);
            }
        }

        executor.shutdown();
    }
}

如果你解决的任务对中断不做出响应,将 true 传递给 cancel(...) 是没有帮助的。在这种情况下,我建议你做出以下更改:

  1. 在外部的 for 循环中创建一个 AtomicBoolean done 变量。
  2. 将此变量传递给 Task 的构造函数,并在 Task 中保存它。
  3. 在解决任务的过程中,经常检查 done 标志,如果 donetrue,则取消尝试。
  4. 不要在第一个结果返回后调用 cancel,而是将 done 设置为 true,并等待其他线程返回。
英文:

Here's one suggestion:

class Task implements Callable&lt;Integer&gt; {
private final static Random RND = new Random();
@Override
public Integer call() throws Exception {
try {
// Work on task for a random duration
Thread.sleep(RND.nextInt(5000));
} catch (InterruptedException e) {
System.err.println(&quot;I was interrupted.&quot;
+ &quot;Someone else probably solved the task before me.&quot;);
return -1;
}
// Return some dummy value
return RND.nextInt();
}
}
class Scratch {
public static void main(String[] args) throws InterruptedException {
final int numWorkers = 3; // number of tasks to run in parallel
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
// Solve task 5 times. (Change it to while (true) { ...} if you like.)
for (int i = 0; i &lt; 5; i++) {
CompletionService&lt;Integer&gt; completionService =
new ExecutorCompletionService&lt;&gt;(executor);
Future&lt;?&gt;[] futures = new Future&lt;?&gt;[numWorkers];
for (int j = 0; j &lt; numWorkers; j++) {
futures[j] = completionService.submit(new Task());
}
Future&lt;Integer&gt; firstToComplete = completionService.take();
try {
Integer result = firstToComplete.get();
System.err.println(&quot;We got a result: &quot; + result);
} catch (ExecutionException e) {
// Should not happen. Future has completed.
}
// Cancel all futures (it doesn&#39;t matter that we&#39;re cancelling
// the one that has already completed).
for (int j = 0; j &lt; numWorkers; j++) {
futures[j].cancel(true);
}
}
executor.shutdown();
}
}

If the task you're solving does not respond to interrupts, passing true to cancel(...) won't help. In that case I'd suggest you do the following changes:

  1. Create an AtomicBoolean done variable in the outer for loop.
  2. Pass this to the constructor to Task and save it in a field in Task.
  3. In the task solving process, check done flag ever so often, and cancel the attempt if done is true.
  4. Instead of calling cancel on the tasks after the first result is in, set done to true and wait for the other threads to return.

huangapple
  • 本文由 发表于 2020年9月7日 23:14:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/63780271.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定