Collecting results from a list of Futures in java.

huangapple go评论98阅读模式

Collecting results from a list of Futures in java



  1. private void init() throws ExecutionException, InterruptedException {
  2. Long start = System.currentTimeMillis();
  3. List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
  5. Long finish = System.currentTimeMillis();
  6."Process duration: {0} in ms", finish-start));
  7. }
  8. private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
  9. ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
  10. List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
  11. List<Future<List<ApiResponse>>> futures = new ArrayList<>();
  12. chunks.forEach(chunk -> {
  13. futures.add(wrapFetchInFuture(chunk));
  14. });
  15. Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
  16. List<ApiResponse> responses = new ArrayList<>();
  17. futures.forEach(future -> {
  18. try {
  19. responses.addAll(future.get());
  20. } catch (InterruptedException | ExecutionException e) {
  21. e.printStackTrace();
  22. }
  23. });
  24. return responses;
  25. });
  26. executorService.shutdown();
  27. return resultFuture.get();
  28. }
  29. private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
  30. return new FutureTask<>(() -> {
  31. List<ApiResponse> responses = new ArrayList<>();
  32. ids.forEach(id -> {
  33. responses.add(fetchData(id));
  34. });
  35. return responses;
  36. });
  37. }
  38. private ApiResponse fetchData(String id) {
  39. ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
  40."Fetching from {0}", id));
  41. ApiResponse body = response.getBody();
  42."Retrieved {0}", body));
  43. return body;
  44. }



I'm trying to use futures to make concurrent api calls. Code:

  1. private void init() throws ExecutionException, InterruptedException {
  2. Long start = System.currentTimeMillis();
  3. List&lt;ApiResponse&gt; responses = fetchAllUsingFuture(ids, 3);
  5. Long finish = System.currentTimeMillis();
  6.;Process duration: {0} in ms&quot;, finish-start));
  7. }
  8. private List&lt;ApiResponse&gt; fetchAllUsingFuture(List&lt;String&gt; ids, int threadCount) throws ExecutionException, InterruptedException {
  9. ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
  10. List&lt;List&lt;String&gt;&gt; chunks = Utils.splitToChunks(ids, threadCount);
  11. List&lt;Future&lt;List&lt;ApiResponse&gt;&gt;&gt; futures = new ArrayList&lt;&gt;();
  12. chunks.forEach(chunk -&gt; {
  13. futures.add(wrapFetchInFuture(chunk));
  14. });
  15. Future&lt;List&lt;ApiResponse&gt;&gt; resultFuture = executorService.submit(() -&gt; {
  16. List&lt;ApiResponse&gt; responses = new ArrayList&lt;&gt;();
  17. futures.forEach(future -&gt; {
  18. try {
  19. responses.addAll(future.get());
  20. } catch (InterruptedException | ExecutionException e) {
  21. e.printStackTrace();
  22. }
  23. });
  24. return responses;
  25. });
  26. executorService.shutdown();
  27. return resultFuture.get();
  28. }
  29. private Future&lt;List&lt;ApiResponse&gt;&gt; wrapFetchInFuture(List&lt;String&gt; ids) {
  30. return new FutureTask&lt;&gt;(() -&gt; {
  31. List&lt;ApiResponse&gt; responses = new ArrayList&lt;&gt;();
  32. ids.forEach(id -&gt; {
  33. responses.add(fetchData(id));
  34. });
  35. return responses;
  36. });
  37. }
  38. private ApiResponse fetchData(String id) {
  39. ResponseEntity&lt;ApiResponse&gt; response = restTemplate.getForEntity(id, ApiResponse.class);
  40.;Fetching from {0}&quot;, id));
  41. ApiResponse body = response.getBody();
  42.;Retrieved {0}&quot;, body));
  43. return body;
  44. }

It doesn't execute, the app starts and then just pends. Futures don't get fulfilled. All advices are appreciated.
P.S. I'm aware this is much more easily done using CompletableFuture, I was just wondering how to do this with Futures


得分: 1




  1. List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
  2. chunks.forEach(chunk -> {
  3. tasks.add(wrapFetchInCallable(chunk));
  4. });
  5. List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);


  1. private static Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
  2. return () -> {
  3. List<ApiResponse> responses = new ArrayList<>();
  4. ids.forEach(id -> {
  5. responses.add(fetchData(id));
  6. });
  7. return responses;
  8. };
  9. }

In the original version of the question, you are creating a list of FutureTasks but never send them to the ExecutorService to run them. The tasks never complete, so Future.get blocks forever.

In the updated version of the question, you have put the code that does the waiting into the executor service as a task. The FutureTasks never run, so FutureTask.get will still block forever.

I would suggest you change the code in fetchAllUsingFuture to:

  1. List&lt;Callable&lt;List&lt;ApiResponse&gt;&gt;&gt; tasks = new ArrayList&lt;&gt;();
  2. chunks.forEach(chunk -&gt; {
  3. tasks.add(wrapFetchInCallable(chunk));
  4. });
  5. List&lt;Future&lt;List&lt;ApiResponse&gt;&gt;&gt; futures = executorService.invokeAll(tasks);

where wrapFetchInCallable creates a Callable instead of FutureTask:

  1. private static Callable&lt;List&lt;ApiResponse&gt;&gt; wrapFetchInCallable(List&lt;String&gt; ids) {
  2. return () -&gt; {
  3. List&lt;ApiResponse&gt; responses = new ArrayList&lt;&gt;();
  4. ids.forEach(id -&gt; {
  5. responses.add(fetchData(id));
  6. });
  7. return responses;
  8. };
  9. }


得分: 1




  1. public List<MovieDTO> searchMoviesParallel(String limit, String offset, String searchPhrase) throws Exception {
  2. ExecutorService executor = Executors.newFixedThreadPool(1);
  3. Future<List<MovieDTO>> digitoonResult = executor.submit(new DigitoonSearchTask(limit, offset, searchPhrase));
  4. List<MovieDTO> movieDTOList = digitoonResult.get();
  5. executor.shutdown();
  6. return movieDTOList;
  7. }


  1. public class DigitoonSearchTask implements Callable<List<MovieDTO>> {
  2. private String limit;
  3. private String offset;
  4. private String searchPhrase;
  5. private final static String digitoonSearchBaseUrl = "";
  6. public DigitoonSearchTask(String limit, String offset, String searchPhrase) {
  7. this.limit = limit;
  8. this.offset = offset;
  9. this.searchPhrase = searchPhrase;
  10. }
  11. @Override
  12. public List<MovieDTO> call() throws Exception {
  13. List<MovieDTO> movieDTOList = new ArrayList<>();
  14. ObjectMapper mapper = new ObjectMapper();
  15. try {
  16. String uri = digitoonSearchBaseUrl + URLEncoder.encode(searchPhrase, "utf-8") + "&limit=" + limit + "&offset=" + offset;
  17. URL url = new URL(uri);
  18. HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  19. conn.setRequestMethod("GET");
  20. conn.setRequestProperty("Accept", "application/json");
  21. conn.setRequestProperty("authorization", "xxxxxxxxxx");
  22. if (conn.getResponseCode() != 200) {
  23. throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
  24. }
  25. BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
  26. String output;
  27. while ((output = br.readLine()) != null) {
  28. movieDTOList = Arrays.asList(mapper.readValue(output, MovieDTO[].class));
  29. }
  30. br.close();
  31. conn.disconnect();
  32. } catch (UnknownHostException e) {
  33. call();
  34. } catch (MalformedURLException e) {
  35. e.printStackTrace();
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. return movieDTOList;
  40. }
  41. }



It looks like you are creating a list of FutureTasks but never send them to the ExecutorService to run them.
I have implemented ExecutorService with Future Object as below, i hope it helps you:

Service layer:

  1. public List&lt;MovieDTO&gt; searchMoviesParallel(String limit, String offset, String searchPhrase) throws Exception {
  2. ExecutorService executor = Executors.newFixedThreadPool(1);
  3. Future&lt;List&lt;MovieDTO&gt;&gt; digitoonResult = executor.submit(new DigitoonSearchTask(limit, offset, searchPhrase));
  4. List&lt;MovieDTO&gt; movieDTOList = digitoonResult.get();
  5. executor.shutdown();
  6. return movieDTOList;
  7. }

And my Search task(DigitoonSearchTask class) is as below:

  1. public class DigitoonSearchTask implements Callable&lt;List&lt;MovieDTO&gt;&gt; {
  2. private String limit;
  3. private String offset;
  4. private String searchPhrase;
  5. private final static String digitoonSearchBaseUrl = &quot;;;
  6. public DigitoonSearchTask(String limit, String offset, String searchPhrase) {
  7. this.limit = limit;
  8. this.offset = offset;
  9. this.searchPhrase = searchPhrase;
  10. }
  11. @Override
  12. public List&lt;MovieDTO&gt; call() throws Exception {
  13. List&lt;MovieDTO&gt; movieDTOList = new ArrayList&lt;&gt;();
  14. ObjectMapper mapper = new ObjectMapper();
  15. try {
  16. String uri = digitoonSearchBaseUrl + URLEncoder.encode(searchPhrase, &quot;utf-8&quot;) + &quot;&amp;limit=&quot; + limit + &quot;&amp;offset=&quot; + offset;
  17. URL url = new URL(uri);
  18. HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  19. conn.setRequestMethod(&quot;GET&quot;);
  20. conn.setRequestProperty(&quot;Accept&quot;, &quot;application/json&quot;);
  21. conn.setRequestProperty(&quot;authorization&quot;, &quot;xxxxxxxxxx&quot;);
  22. if (conn.getResponseCode() != 200) {
  23. throw new RuntimeException(&quot;Failed : HTTP error code : &quot;
  24. + conn.getResponseCode());
  25. }
  26. BufferedReader br = new BufferedReader(new InputStreamReader(
  27. (conn.getInputStream())));
  28. String output;
  29. while ((output = br.readLine()) != null) {
  30. movieDTOList = Arrays.asList(mapper.readValue(output, MovieDTO[].class));
  31. }
  32. br.close();
  33. conn.disconnect();
  34. } catch (UnknownHostException e) {
  35. call();
  36. } catch (MalformedURLException e) {
  37. e.printStackTrace();
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. }
  41. return movieDTOList;
  42. }}

consider that now I have just one API and after getting others they can be added as another Search task in service layer by increasing the thread number.

  • 本文由 发表于 2020年8月1日 18:32:42
  • 转载请务必保留本文链接:



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
