Is it possible to block other runnables while executing the first one, using ExecutorService in Java?


Question


I'm trying to process a fairly large Stream of Lists on multiple threads, using an ExecutorService. The method looks something like this:

public void initMigration() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
        streamOfLists.forEach(record4List -> {
            Runnable runnable = () -> {
                try {
                    final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                    LOGGER.info("Invoking POST with payload {}", attachments);
                    Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                    restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
                } catch (ExceptionA | ExceptionB e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        });
    }
    LOGGER.info("Shutting down the ExecutorService");
    executorService.shutdown();
}

Basically, for each List in the Stream, a Runnable is created and submitted to the ExecutorService. That seems to be working alright. What I really want now is a way to make the ExecutorService run the first Runnable (the one built from the first List in the Stream) while blocking the other Runnables until it has finished, and then run the rest in parallel. Could really use some help with this.

Answer 1

Score: 1


You can take the first Runnable, run it directly on the calling thread, and only then submit the remaining Runnables to the executor.

    try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
        Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
        // Run the first task synchronously; nothing else has been submitted yet.
        if (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
            Runnable runnable = new MyRunnable(record4List);
            runnable.run();
        }
        // Only now hand the remaining tasks to the executor, to run in parallel.
        while (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
            Runnable runnable = new MyRunnable(record4List);
            executorService.submit(runnable);
        }
    }

where

// Assumed to be an inner class, so that LOGGER and restClient from the
// enclosing class are in scope.
class MyRunnable implements Runnable {
    private final List<Record4<Integer, Integer, String, byte[]>> record4List;

    MyRunnable(List<Record4<Integer, Integer, String, byte[]>> record4List) {
        this.record4List = record4List;
    }

    @Override
    public void run() {
        try {
            final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
            LOGGER.info("Invoking POST with payload {}", attachments);
            Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
            restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
        } catch (ExceptionA | ExceptionB e) {
            e.printStackTrace();
        }
    }
}
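
To try the "run the first one, then submit the rest" pattern in isolation, here is a minimal self-contained sketch. It is not the asker's code: the class RunFirstThenRest and its methods process and demo are hypothetical names, plain strings stand in for the Record4 lists, and a fixed-size pool plus an awaitTermination call are assumptions added so the demo finishes cleanly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class; names are illustrative only.
class RunFirstThenRest {

    /**
     * Handles the first item on the calling thread, then submits the
     * remaining items to a pool and waits for them to finish.
     */
    static <T> void process(Iterator<T> items, Consumer<T> handler, int threads) {
        if (items.hasNext()) {
            // Blocks the caller until the first item is fully handled.
            handler.accept(items.next());
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            while (items.hasNext()) {
                T item = items.next();
                pool.submit(() -> handler.accept(item));
            }
        } finally {
            pool.shutdown(); // no new tasks; already submitted ones still run
        }
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the order in which four sample items were handled. */
    static List<String> demo() {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        Iterator<String> items = Arrays.asList("first", "second", "third", "fourth").iterator();
        process(items, handled::add, 3);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The first element of the printed list is always "first"; the order of the remaining three depends on thread scheduling.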

Answer 2

Score: 1


@Alexei's approach is (IMO) the correct way to solve this problem. Don't block the runnables. Instead, submit each runnable only once the preconditions for running it have been satisfied.

The problem with having one runnable block others is that you are liable to clog up the executor's thread pool with tasks that are blocked waiting for another task to finish. Indeed, if the thread pool is bounded, you could even get into a situation where all of the threads are in this state and the executor is unable to start the task that will unblock them all. Result: deadlock!
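
The starvation described above can be reproduced in a small sketch. Everything here is an assumption made for illustration: the class PoolStarvationDemo and the method waiterWasReleased are hypothetical names, the pool is deliberately limited to a single thread, the waiting task is deliberately submitted first, and the wait uses a timeout so that the demo terminates instead of hanging forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class PoolStarvationDemo {

    /**
     * Submits a task that waits on a latch, then the task that opens the
     * latch, to a pool with one thread. Returns true if the waiting task
     * saw the latch open before its timeout expired.
     */
    static boolean waiterWasReleased() {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Occupies the only thread while waiting for the latch.
            Callable<Boolean> waiter = () -> latch.await(200, TimeUnit.MILLISECONDS);
            // Would open the latch, but stays queued until the thread is free.
            Runnable opener = latch::countDown;

            Future<Boolean> waiterResult = pool.submit(waiter);
            pool.submit(opener);
            return waiterResult.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + waiterWasReleased());
    }
}
```

The waiter cannot be released here, because the task that opens the latch only gets a thread after the waiter has given up. With an untimed await() the same setup would never finish.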


If you still wanted to block the runnables (in spite of the above), then you could implement it using a CountDownLatch.

  1. Prior to instantiating the Runnables, create a CountDownLatch instance with an initial counter of 1. This instance must be shared by all of the Runnables.

  2. Code one Runnable so that it fetches a List, processes it, then calls latch.countDown(), ideally in a finally block so that a failure does not leave the other tasks waiting forever.

  3. Code a second Runnable to call latch.await() and then fetch and process a List.

  4. Submit one task using the first Runnable and the remainder using the second one.
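
The four steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: LatchGatedTasks and runGated are hypothetical names, strings stand in for the Lists being processed, and an unbounded cached pool (as in the question) is used so that the waiting tasks cannot starve the first one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class LatchGatedTasks {

    /** Handles items.get(0) first, then the rest in parallel. Returns the handling order. */
    static List<String> runGated(List<String> items) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        if (items.isEmpty()) {
            return handled;
        }
        // Step 1: one latch with a count of 1, shared by all tasks.
        CountDownLatch firstDone = new CountDownLatch(1);
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Steps 2 and 4: the first task does its work, then opens the latch.
            pool.submit(() -> {
                try {
                    handled.add(items.get(0));
                } finally {
                    firstDone.countDown(); // open the gate even if the work failed
                }
            });
            // Steps 3 and 4: every other task waits for the latch before working.
            for (String item : items.subList(1, items.size())) {
                pool.submit(() -> {
                    try {
                        firstDone.await();
                        handled.add(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runGated(Arrays.asList("first", "second", "third", "fourth")));
    }
}
```

Note that every gated task holds a thread while it waits, which is exactly the cost the answer warns about; with a bounded pool this design would need the first task to be guaranteed a thread.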

Posted by huangapple on 2020-09-03 13:31:26
Source: https://go.coder-hub.com/63717422.html