英文:
Parallelize and monitor tasks with ExecutorService
问题
我有一个需要调用约4-6百万次的函数/任务void task()
。
我想要在线程池中并行化这个操作。
我不关心任务的返回值,所以可以避免处理Future<T>
。
我想要定期轮询线程的状态,即task()
的多少次调用成功和多少次抛出异常。
以下是我想出来的方法:
class Test {
AtomicInteger success = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1_000_000);
private void start() {
ExecutorService executorService = Executors.newFixedThreadPool();
for (int i = 0; i < 1_000_000; i++) {
executorService.execute(this::task);
}
while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
log("Success: %d Failed: %d", success.get(), failed.get());
}
log("===================== Final tally =====================");
log("Success: %d Failed: %d", success.get(), failed.get());
executorService.shutdown();
}
private void task() {
try {
doSomeStuff();
success.incrementAndGet()
} catch(Exception e) {
failed.incrementAndGet();
}
countDownLatch.countDown();
}
}
线程使用的两个AtomicInteger
记录成功或失败,CountDownLatch
由"监视"线程用于检查进度。
是否有更符合惯例的方法来做这个?也许不涉及向ExecutorService
提交数百万个Runnable
lambda 表达式的方法?
我可以将整个过程放在以下方式中:
IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
但我将无法监视进度。
英文:
I have a function/task void task()
that needs to be called about 4-6 million times.
I want to parallelize this operation over threads in a thread pool.
I do not care about the return value of the tasks so I can avoid messing around with Future<T>
.
I want to periodically poll the status of how the threads are coming along. The status is simply how many invocations of task()
returned cleanly and how many threw an exception.
Here's what I came up with:
class Test {
AtomicInteger success = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1_000_000);
private void start() {
ExecutorService executorService = Executors.newFixedThreadPool();
for (int i = 0; i < 1_000_000; i++) {
executorService.execute(this::task);
}
while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
log("Success: %d Failed: %d", success.get(), failed.get());
}
log("===================== Final tally =====================");
log("Success: %d Failed: %d", success.get(), failed.get());
executorService.shutdown();
}
private void task() {
try {
doSomeStuff();
success.incrementAndGet()
} catch(Exception e) {
failed.incrementAndGet();
}
countDownLatch.countDown();
}
}
Two AtomicInteger
that the threads use to record successes or failures and a CountDownLatch
that the "monitor" thread uses to check on the progress.
Is there a more idiomatic way to do this? Some thing that doesn't involve submitting millions of Runnable
lambdas to the ExecutorService
perhaps?
I could put the whole thing in an
IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
but I won't be able to monitor the progress.
答案1
得分: 1
如果您想按照您提出的方式使用流,请将监控部分移到另一个线程中,如下所示:
new Thread(() -> {
while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
log("Success: %d Failed: %d", success.get(), failed.get());
}
log("===================== 最终统计 =====================");
log("Success: %d Failed: %d", success.get(), failed.get());
}).start()
IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
英文:
If you want to use stream as you proposed, you can move your monitoring part into another thread like this:
new Thread(()->{
while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
log("Success: %d Failed: %d", success.get(), failed.get());
}
log("===================== Final tally =====================");
log("Success: %d Failed: %d", success.get(), failed.get());
}).start()
IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论