英文:
How to make threads in ExecutorService to wait stage
问题
在我的 Web 应用程序中,我需要在一个 API 调用中调用超过 10 个方法。为了提高效率,我使用 ExecutorService
同时创建多个线程。每个方法返回不同的对象,分别是 fav_a()
、fav_b()
和 fav_c()
。(为了方便起见,下面提供了示例代码)
@GetMapping(RequestUrl.INIT)
public ResponseEntity<Map<String, List<?>>> init() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
Future<List<Object>> method_a = service.submit(() -> someService.method_a());
Future<List<Object>> method_b = service.submit(() -> someService.method_b());
Future<List<Object>> method_c = service.submit(() -> someService.method_c());
Future<List<FavouriteConverter>> fav_a = service.submit(() -> someService.fav_a());
Future<List<FavouriteConverter>> fav_b = service.submit(() -> someService.fav_b());
Future<List<FavouriteConverter>> fav_c = service.submit(() -> someService.fav_c());
service.shutdown();
List<FavouriteConverter> combinedFavourite = Stream.of(fav_a.get(), fav_b.get(), fav_c.get())
.flatMap(f -> f.stream())
.collect(Collectors.toList());
combinedFavourite = combinedFavourite.stream()
.sorted(Comparator.comparing(FavouriteConverter::get_id, Comparator.reverseOrder()))
.limit(25)
.collect(Collectors.toList());
Map<String, List<?>> map = new HashMap<>();
map.put("method_a", method_a.get());
map.put("method_b", method_b.get());
map.put("method_c", method_c.get());
map.put("favourite", combinedFavourite);
return new ResponseEntity<>(map, HttpStatus.OK);
}
首先,我需要获取 fav_a.get()
、fav_b.get()
和 fav_c.get()
来创建 combinedFavourite
。如果其中一个延迟,逻辑就会出错。创建线程是昂贵的操作。
Stream
是否会自动处理这种情况?- 如果
fav_a()
、fav_b()
和fav_c()
比其他方法更早完成,我如何将combinedFavourite
放入另一个线程?这意味着如何使Future<List<FavouriteConverter>> combinedFavourite
处于等待状态,直到fav_a.get()
、fav_b.get()
和fav_c.get()
完成。(假设method_a()
、method_b()
和method_c()
仍在运行。)
英文:
In my web application, I need to call around more than 10 methods in one API call. To make that efficient I use ExecutorService
to create multiple threads at a same time. Each methods returning different Objects expect fav_a(), fav_b(), fav_c()
. (Sample code is given below for easiness)
@GetMapping(RequestUrl.INIT)
public ResponseEntity<Map<String, List<?>>> init() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
Future<List<Object>> method_a = service.submit(() -> someService.method_a());
Future<List<Object>> method_b = service.submit(() -> someService.method_b());
Future<List<Object>> method_c = service.submit(() -> someService.method_c());
Future<List<FavouriteConverter>> fav_a = service.submit(() -> someService.fav_a());
Future<List<FavouriteConverter>> fav_b = service.submit(() -> someService.fav_b());
Future<List<FavouriteConverter>> fav_c = service.submit(() -> someService.fav_c());
service.shutdown();
List<FavouriteConverter> combinedFavourite = Stream.of(fav_a.get(), fav_b.get(), fav_c.get()).flatMap(f -> f.stream()).collect(Collectors.toList());
combinedFavourite=combinedFavourite.stream()
.sorted(Comparator.comparing(FavouriteConverter::get_id, Comparator.reverseOrder()))
.limit(25)
.collect(Collectors.toList());
Map<String, List<?>> map = new HashMap<>();
map.put("method_a", method_a.get());
map.put("method_b", method_b.get());
map.put("method_c", method_c.get());
map.put("favourite", combinedFavourite);
return new ResponseEntity<>(map, HttpStatus.OK);
}
First I need to get fav_a.get(), fav_b.get(), fav_c.get()
to make combinedFavourite
. If any of one delays, the logic will be wrong. Creating threads are expensive.
- Does
Stream
automatically handle this kind of situation? - If
fav_a(), fav_b(), fav_c()
do it jobs earlier than other methods, How can I putcombinedFavourite
into another thread? This means how to makeFuture<List<FavouriteConverter>> combinedFavourite
in waiting stage untilfav_a.get(), fav_b.get(), fav_c.get()
finishes. (Assumemethod_a(),method_b(),method_c()
still running.)
答案1
得分: 2
-
不,流(Streams)不负责加入这些线程。
-
由于您等待这 3 个线程的结果,并将它们放入一个映射中进行返回,将这样的逻辑包装在单独的线程中并不会帮助您,因为只要您必须等待并返回结果。
使用
ExecutorService::invokeAll
来执行所有任务,并在所有任务完成时(当Future::done
为true
时)返回一组 Futures 的列表。List<Future<List<Object>>> list = service.invokeAll( Arrays.asList( () -> someService.method_a(), () -> someService.method_b(), () -> someService.method_c() ));
注意这些是有保证的:
- 结果的
List<Future>
与给定任务的集合的顺序相同(根据其Iterator
)。 - 如果池化的线程数大于或等于执行的任务数,则所有任务将在单独的线程中运行(假设没有其他任务使用同一线程池的线程)。
- 结果的
这些逻辑帮助您处理完整的结果。
英文:
-
No, Streams are not responsible for joining these threads.
-
Since you wait for the results of these 3 threads and putting them into a map which you return, wrapping such logic in a separate thread doesn't help you as long as you have to wait and return the result.
Use
ExecutorService::invokeAll
to execute all the tasks and returning a list of Futures when all are complete (whenFuture::done
istrue
).List<Future<List<Object>>> list = service.invokeAll( Arrays.asList( () -> someService.method_a(), () -> someService.method_b(), () -> someService.method_c() ));
Note these are guaranteed:
- The result
List<Future>
is in the same order as the collection of tasks given (according to itsIterator
). - All the tasks will run in a separate thread if the pooled number of threads are higher or equal than executed tasks (assuming there are no other tasks using a thread from the same thread pool).
- The result
This logics helps you to work with complete results.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论