Is it possible to block other runnables while executing the first one, using ExecutorService in Java?
Question
I'm trying to process a relatively huge Stream of Lists in multiple threads, using an ExecutorService. The method looks something like this.
    public void initMigration() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
            streamOfLists.forEach(record4List -> {
                Runnable runnable = () -> {
                    try {
                        final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                        LOGGER.info("Invoking POST with payload {}", attachments);
                        Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                        restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
                    } catch (ExceptionA | ExceptionB e) {
                        e.printStackTrace();
                    }
                };
                executorService.submit(runnable);
            });
        }
        LOGGER.info("Shutting down the ExecutorService");
        executorService.shutdown();
    }
Basically, for each List in the Stream, a Runnable is created and submitted to the ExecutorService. It seems to be working alright. What I really want to do now is find out whether there is a way to make the ExecutorService run the first Runnable (the one obtained from the first List in the Stream) while blocking the other Runnables until it has finished, and then continue running the other Runnables in parallel. Could really use some help with this.
Answer 1
Score: 1
You can take the first Runnable, execute it directly, and only then submit the other Runnables.
    try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
        Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
        // Run the first task synchronously on the calling thread.
        if (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
            Runnable runnable = new MyRunnable(record4List);
            runnable.run();
        }
        // Only after it has returned are the remaining tasks handed to the executor.
        while (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> record4List = it.next();
            Runnable runnable = new MyRunnable(record4List);
            executorService.submit(runnable);
        }
    }
where
    class MyRunnable implements Runnable {
        // Each task handles one List from the Stream, so the field is a List of records.
        private final List<Record4<Integer, Integer, String, byte[]>> record4List;

        MyRunnable(List<Record4<Integer, Integer, String, byte[]>> record4List) {
            this.record4List = record4List;
        }

        @Override
        public void run() {
            try {
                final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                LOGGER.info("Invoking POST with payload {}", attachments);
                Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
            } catch (ExceptionA | ExceptionB e) {
                e.printStackTrace();
            }
        }
    }
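For readers who want to try the idea without the jOOQ and REST dependencies, here is a minimal self-contained sketch of the same pattern. It is only an illustration under stated assumptions: the class name FirstThenParallel, the handle method, the List<String> batches and the pool size of 4 are all hypothetical stand-ins for the real record lists and upload logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical stand-in for the migration class; not part of the original code.
class FirstThenParallel {

    // Handles the first batch on the calling thread, then submits the rest to a pool.
    // Returns the batches in the order in which their processing completed.
    static List<String> process(Stream<List<String>> batches) {
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        try (Stream<List<String>> stream = batches) {
            Iterator<List<String>> it = stream.iterator();
            if (it.hasNext()) {
                List<String> first = it.next();
                handle(first, completed); // synchronous: nothing else has been submitted yet
            }
            while (it.hasNext()) {
                List<String> batch = it.next();
                pool.submit(() -> handle(batch, completed));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; already submitted ones still run
        }
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    // Placeholder for the real work (preparing attachments, posting, uploading).
    static void handle(List<String> batch, List<String> completed) {
        completed.add(String.join(",", batch));
    }

    public static void main(String[] args) {
        System.out.println(process(Stream.of(List.of("a", "b"), List.of("c"), List.of("d"))));
    }
}
```

Because the first batch is handled before any other task is submitted, it is always the first entry in the returned list; the order of the remaining entries depends on thread scheduling.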
Answer 2
Score: 1
@Alexei's approach is (IMO) the correct way to solve this problem. Don't block the runnables. Instead, submit each runnable only once the preconditions for running it have been satisfied.
The problem with having one runnable block others is that you are liable to clog up the executor's thread pool with tasks that are blocked waiting for another task to finish. Indeed, if the thread pool is bounded, you could even get into a situation where all of the threads are in this state and the executor is unable to start the task that will unblock them all. Result: deadlock!
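The starvation effect described above can be reproduced in a small sketch. Everything here is an illustrative assumption (the class name BoundedPoolStall, the 500 ms timeout, the pool sizes), and the waiting task uses a timed await so the demonstration terminates instead of hanging forever as a real deadlock would.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original question or answers.
class BoundedPoolStall {

    // Submits a task that waits on a latch, then a task that releases the latch.
    // Returns true if the waiting task was released before its timeout expired.
    static boolean waiterReleased(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // The waiter occupies a pool thread while it blocks on the latch.
            Callable<Boolean> waiter = () -> gate.await(500, TimeUnit.MILLISECONDS);
            // The releaser can only open the latch once it gets a thread of its own.
            Runnable releaser = gate::countDown;

            Future<Boolean> result = pool.submit(waiter);
            pool.submit(releaser);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: released = " + waiterReleased(1));
        System.out.println("pool of 2: released = " + waiterReleased(2));
    }
}
```

With a single pool thread the releasing task sits in the queue behind the waiter, so the waiter times out. With an untimed await it would never return, which is the deadlock the answer warns about. With two threads the releaser can run alongside the waiter and open the latch.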
If you still wanted to block the runnables (in spite of the above), then you could implement it using a CountDownLatch.
- Prior to instantiating the Runnables, create a CountDownLatch instance with an initial count of 1. This instance must be shared by all of the Runnables.
- Code one Runnable so that it fetches a List, processes it, then calls latch.countDown().
- Code a second Runnable so that it calls latch.await() and then fetches and processes a List.
- Submit one task using the first Runnable and the remainder using the second one.
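The steps above can be sketched as follows. This is a minimal illustration, not the answerer's code: the class name LatchGate, the handle method and the List<String> batches are hypothetical placeholders, and each list is passed to its task directly instead of being fetched inside it. A cached (unbounded) thread pool is assumed so that waiting tasks cannot keep the first task from getting a thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original question or answers.
class LatchGate {

    // Submits every batch at once; all but the first wait on a shared latch.
    // Returns the batches in the order in which their processing completed.
    static List<String> process(List<List<String>> batches) {
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstDone = new CountDownLatch(1); // shared by all tasks
        ExecutorService pool = Executors.newCachedThreadPool(); // unbounded, so waiters cannot starve the first task

        for (int i = 0; i < batches.size(); i++) {
            List<String> batch = batches.get(i);
            if (i == 0) {
                // First kind of Runnable: process, then open the gate.
                pool.submit(() -> {
                    try {
                        handle(batch, completed);
                    } finally {
                        firstDone.countDown(); // release the waiters even if processing fails
                    }
                });
            } else {
                // Second kind of Runnable: wait for the gate, then process.
                pool.submit(() -> {
                    try {
                        firstDone.await();
                        handle(batch, completed);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    // Placeholder for the real work (preparing attachments, posting, uploading).
    static void handle(List<String> batch, List<String> completed) {
        completed.add(String.join(",", batch));
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(List.of("first"), List.of("x"), List.of("y"))));
    }
}
```

Calling countDown() in a finally block is a deliberate choice in this sketch: if the first task throws, the remaining tasks would otherwise wait on the latch indefinitely.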