How to make threads in an ExecutorService wait

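Question

In my web application, I need to call more than 10 methods in one API call. To make that efficient, I use an ExecutorService to run them on multiple threads at the same time. Each method returns a different object, except fav_a(), fav_b(), and fav_c(), which all return List<FavouriteConverter>. (Simplified sample code is given below.)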

@GetMapping(RequestUrl.INIT)
public ResponseEntity<Map<String, List<?>>> init() throws ExecutionException, InterruptedException {

    ExecutorService service = Executors.newFixedThreadPool(6);

    Future<List<Object>> method_a = service.submit(() -> someService.method_a());
    Future<List<Object>> method_b = service.submit(() -> someService.method_b());
    Future<List<Object>> method_c = service.submit(() -> someService.method_c());

    Future<List<FavouriteConverter>> fav_a = service.submit(() -> someService.fav_a());
    Future<List<FavouriteConverter>> fav_b = service.submit(() -> someService.fav_b());
    Future<List<FavouriteConverter>> fav_c = service.submit(() -> someService.fav_c());

    service.shutdown();

    List<FavouriteConverter> combinedFavourite = Stream.of(fav_a.get(), fav_b.get(), fav_c.get())
            .flatMap(f -> f.stream())
            .collect(Collectors.toList());

    combinedFavourite = combinedFavourite.stream()
            .sorted(Comparator.comparing(FavouriteConverter::get_id, Comparator.reverseOrder()))
            .limit(25)
            .collect(Collectors.toList());

    Map<String, List<?>> map = new HashMap<>();

    map.put("method_a", method_a.get());
    map.put("method_b", method_b.get());
    map.put("method_c", method_c.get());
    map.put("favourite", combinedFavourite);

    return new ResponseEntity<>(map, HttpStatus.OK);
}

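First I need the results of fav_a.get(), fav_b.get(), and fav_c.get() to build combinedFavourite. If any one of them is delayed, the logic will be wrong. Creating threads is expensive.

  1. Does Stream automatically handle this kind of situation?
  2. If fav_a(), fav_b(), and fav_c() finish their jobs earlier than the other methods, how can I put combinedFavourite into another thread? That is, how do I make a Future<List<FavouriteConverter>> combinedFavourite wait until fav_a.get(), fav_b.get(), and fav_c.get() finish? (Assume method_a(), method_b(), and method_c() are still running.)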

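Answer 1

Score: 2

  1. No, Streams are not responsible for joining these threads. The waiting happens in Future::get, which blocks until the task's result is available, so the arguments passed to Stream.of are already complete lists.

  2. Since you wait for the results of these 3 threads and put them into a map that you return, wrapping this logic in a separate thread doesn't help: you still have to wait for the result before returning it.

    Use ExecutorService::invokeAll to execute all the tasks; it returns a list of Futures once all of them are complete (Future::isDone is true for each).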

    List<Future<List<Object>>> list = service.invokeAll(
    Arrays.asList(
    () -> someService.method_a(),
    () -> someService.method_b(),
    () -> someService.method_c()
    ));
    

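    Note that the following is guaranteed:

    • The resulting List<Future> is in the same order as the given collection of tasks (as produced by its Iterator).
    • Each task runs on its own thread if the pool has at least as many threads as there are submitted tasks (assuming no other tasks are using threads from the same pool).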
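As a rough illustration of the ordering and completion behaviour of invokeAll, here is a minimal sketch. The class name InvokeAllSketch and the slowTask helper are hypothetical stand-ins for someService.method_a/b/c; the sleep durations are chosen so that the first task finishes last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Hypothetical stand-in for someService.method_x(): sleeps, then returns its name.
    static List<Object> slowTask(String name, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return Arrays.asList((Object) name);
    }

    static List<String> runAll() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            List<Callable<List<Object>>> tasks = Arrays.asList(
                    () -> slowTask("method_a", 150),  // slowest task is first in the list
                    () -> slowTask("method_b", 50),
                    () -> slowTask("method_c", 10));

            // Blocks until every task has completed, normally or with an exception.
            List<Future<List<Object>>> futures = service.invokeAll(tasks);

            List<String> names = new ArrayList<>();
            for (Future<List<Object>> future : futures) {
                // Every future is already done here, so get() returns without waiting.
                if (!future.isDone()) {
                    throw new IllegalStateException("invokeAll returned an unfinished future");
                }
                names.add(String.valueOf(future.get().get(0)));
            }
            return names;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Results follow the order of the task list, not the order of completion.
        System.out.println(runAll()); // prints [method_a, method_b, method_c]
    }
}
```

Even though method_c finishes first, its Future is still the third element of the returned list.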

This approach lets you work with completed results.
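If the combining step really should start as soon as fav_a, fav_b, and fav_c are done, independently of the other tasks, one option that the answer above does not mention is CompletableFuture. The following is only a sketch under assumptions: the class CombinedFavouriteSketch and the methods favA, favB, and favC are hypothetical, and plain Integer ids stand in for FavouriteConverter.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CombinedFavouriteSketch {

    // Hypothetical stand-ins for someService.fav_a/b/c; Integer ids replace FavouriteConverter.
    static List<Integer> favA() { return Arrays.asList(1, 7, 4); }
    static List<Integer> favB() { return Arrays.asList(9, 2); }
    static List<Integer> favC() { return Arrays.asList(5, 8); }

    static List<Integer> combine(int limit) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<List<Integer>> a =
                    CompletableFuture.supplyAsync(CombinedFavouriteSketch::favA, service);
            CompletableFuture<List<Integer>> b =
                    CompletableFuture.supplyAsync(CombinedFavouriteSketch::favB, service);
            CompletableFuture<List<Integer>> c =
                    CompletableFuture.supplyAsync(CombinedFavouriteSketch::favC, service);

            // The callback runs once a, b and c have all completed, so join() inside it does not block.
            CompletableFuture<List<Integer>> combined = CompletableFuture.allOf(a, b, c)
                    .thenApply(ignored -> Stream.of(a, b, c)
                            .flatMap(f -> f.join().stream())
                            .sorted(Comparator.reverseOrder())
                            .limit(limit)
                            .collect(Collectors.toList()));

            // The request thread still has to wait once, here, before it can respond.
            return combined.join();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combine(5)); // prints [9, 8, 7, 5, 4]
    }
}
```

In the original handler, the other tasks (method_a and so on) would keep running while this callback executes; the final wait for everything is still unavoidable because the response needs all results.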


huangapple
  • Posted on 2020-07-24 12:59:29
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63067126.html