英文:
ExecutorService to execute a single task n times by multiple threads (n thread "races")
问题
我需要使用多个线程来执行单个任务,以便当第一个线程完成并且在任何其他线程完成之前,所有线程都会停止,然后重新开始相同的任务。这应该重复进行 n 次。
我的尝试是使用 Callable<V>
和方法 invokeAny()
(这就是我使用集合的原因),但不确定如何实现这个目标。
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
someTask();
return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
tasks.add(task);
executor.submit(task);
});
如何完成这个?或者有更好的解决方案吗?
英文:
I need to execute a single task by multiple threads, such that when the first thread finishes and before any other thread finishes, all the threads are stopped and start the same task all over again. This should be performed n times.
My attempt is using Callable<V>
and the method invokeAny()
(that is why I use the set) but not sure how to accomplish the goal.
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
someTask();
return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
tasks.add(task);
executor.submit(task);
});
How to finish this? or any better solution?
答案1
得分: 0
这里有一个建议:
class Task implements Callable<Integer> {
private final static Random RND = new Random();
@Override
public Integer call() throws Exception {
try {
// 随机持续时间处理任务
Thread.sleep(RND.nextInt(5000));
} catch (InterruptedException e) {
System.err.println("我被中断了。"
+ "可能是有其他人在我之前解决了这个任务。");
return -1;
}
// 返回一些虚拟值
return RND.nextInt();
}
}
class Scratch {
public static void main(String[] args) throws InterruptedException {
final int numWorkers = 3; // 并行运行的任务数量
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
// 解决任务5次。(如果需要,可以改成 while (true) { ...}。)
for (int i = 0; i < 5; i++) {
CompletionService<Integer> completionService =
new ExecutorCompletionService<>(executor);
Future<?>[] futures = new Future<?>[numWorkers];
for (int j = 0; j < numWorkers; j++) {
futures[j] = completionService.submit(new Task());
}
Future<Integer> firstToComplete = completionService.take();
try {
Integer result = firstToComplete.get();
System.err.println("我们得到了一个结果:" + result);
} catch (ExecutionException e) {
// 不应该发生。Future 已经完成。
}
// 取消所有的 futures(即使我们取消了已经完成的那个也没关系)。
for (int j = 0; j < numWorkers; j++) {
futures[j].cancel(true);
}
}
executor.shutdown();
}
}
如果你解决的任务对中断不做出响应,将 true
传递给 cancel(...)
是没有帮助的。在这种情况下,我建议你做出以下更改:
- 在外部的
for
循环中创建一个AtomicBoolean done
变量。 - 将此变量传递给
Task
的构造函数,并在Task
中保存它。 - 在解决任务的过程中,经常检查
done
标志,如果done
是true
,则取消尝试。 - 不要在第一个结果返回后调用
cancel
,而是将done
设置为true
,并等待其他线程返回。
英文:
Here's one suggestion:
class Task implements Callable<Integer> {
private final static Random RND = new Random();
@Override
public Integer call() throws Exception {
try {
// Work on task for a random duration
Thread.sleep(RND.nextInt(5000));
} catch (InterruptedException e) {
System.err.println("I was interrupted."
+ "Someone else probably solved the task before me.");
return -1;
}
// Return some dummy value
return RND.nextInt();
}
}
class Scratch {
public static void main(String[] args) throws InterruptedException {
final int numWorkers = 3; // number of tasks to run in parallel
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
// Solve task 5 times. (Change it to while (true) { ...} if you like.)
for (int i = 0; i < 5; i++) {
CompletionService<Integer> completionService =
new ExecutorCompletionService<>(executor);
Future<?>[] futures = new Future<?>[numWorkers];
for (int j = 0; j < numWorkers; j++) {
futures[j] = completionService.submit(new Task());
}
Future<Integer> firstToComplete = completionService.take();
try {
Integer result = firstToComplete.get();
System.err.println("We got a result: " + result);
} catch (ExecutionException e) {
// Should not happen. Future has completed.
}
// Cancel all futures (it doesn't matter that we're cancelling
// the one that has already completed).
for (int j = 0; j < numWorkers; j++) {
futures[j].cancel(true);
}
}
executor.shutdown();
}
}
If the task you're solving does not respond to interrupts, passing true
to cancel(...)
won't help. In that case I'd suggest you do the following changes:
- Create an
AtomicBoolean done
variable in the outerfor
loop. - Pass this to the constructor to
Task
and save it in a field inTask
. - In the task solving process, check
done
flag ever so often, and cancel the attempt ifdone
istrue
. - Instead of calling
cancel
on the tasks after the first result is in, setdone
totrue
and wait for the other threads to return.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论