使用ExecutorService并行执行和监控任务。

huangapple go评论68阅读模式
英文:

Parallelize and monitor tasks with ExecutorService

问题

我有一个需要调用约4-6百万次的函数/任务void task()
我想要在线程池中并行化这个操作。
我不关心任务的返回值,所以可以避免处理Future<T>
我想要定期轮询线程的状态,即task()的多少次调用成功和多少次抛出异常。

以下是我想出来的方法:

class Test {
    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(1_000_000);

    private void start() {
        ExecutorService executorService = Executors.newFixedThreadPool();
        for (int i = 0; i < 1_000_000; i++) {
          executorService.execute(this::task);
        }
        while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== Final tally =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
        executorService.shutdown();
    }

    private void task() {
        try {
           doSomeStuff();
           success.incrementAndGet()
        } catch(Exception e) {
           failed.incrementAndGet();
        }
        countDownLatch.countDown();
    }
}

线程使用的两个AtomicInteger记录成功或失败,CountDownLatch由"监视"线程用于检查进度。

是否有更符合惯例的方法来做这个?也许不涉及向ExecutorService提交数百万个Runnable lambda 表达式的方法?

我可以将整个过程放在以下方式中:

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)

但我将无法监视进度。

英文:

I have a function/task void task() that needs to be called about 4-6 million times.
I want to parallelize this operation over threads in a thread pool.
I do not care about the return value of the tasks so I can avoid messing around with Future<T>.
I want to periodically poll the status of how the threads are coming along. The status is simply how many invocations of task() returned cleanly and how many threw an exception.

Here's what I came up with:

class Test {
    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(1_000_000);

    private void start() {
        ExecutorService executorService = Executors.newFixedThreadPool();
        for (int i = 0; i < 1_000_000; i++) {
          executorService.execute(this::task);
        }
        while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== Final tally =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
        executorService.shutdown();
    }

    private void task() {
        try {
           doSomeStuff();
           success.incrementAndGet()
        } catch(Exception e) {
           failed.incrementAndGet();
        }
        countDownLatch.countDown();
    }
}

Two AtomicInteger that the threads use to record successes or failures and a CountDownLatch that the "monitor" thread uses to check on the progress.

Is there a more idiomatic way to do this? Some thing that doesn't involve submitting millions of Runnable lambdas to the ExecutorService perhaps?

I could put the whole thing in an

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)

but I won't be able to monitor the progress.

答案1

得分: 1

如果您想按照您提出的方式使用流,请将监控部分移到另一个线程中,如下所示:

new Thread(() -> {  
 while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== 最终统计 =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
}).start()

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)
英文:

If you want to use stream as you proposed, you can move your monitoring part into another thread like this:

new Thread(()->{  
 while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
          log("Success: %d Failed: %d", success.get(), failed.get());
        }
        log("===================== Final tally =====================");
        log("Success: %d Failed: %d", success.get(), failed.get());
}).start()

IntStream.range(0, 1_000_000).parallelStream().map(...).groupBy(...)

huangapple
  • 本文由 发表于 2020年8月4日 10:33:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/63239432.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定