Collecting results from a list of Futures in Java

Question

I'm trying to use futures to make concurrent API calls. Code:

private void init() throws ExecutionException, InterruptedException {
    Long start = System.currentTimeMillis();
    List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
    log.info(responses.toString());
    Long finish = System.currentTimeMillis();
    log.info(MessageFormat.format("Process duration: {0} in ms", finish-start));
}

private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
    List<Future<List<ApiResponse>>> futures = new ArrayList<>();
    chunks.forEach(chunk -> {
        futures.add(wrapFetchInFuture(chunk));
    });
    Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        futures.forEach(future -> {
            try {
                responses.addAll(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        return responses;
    });
    executorService.shutdown();
    return resultFuture.get();
}

private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
    return new FutureTask<>(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    });
}

private ApiResponse fetchData(String id) {
    ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
    log.info(MessageFormat.format("Fetching from {0}", id));
    ApiResponse body = response.getBody();
    log.info(MessageFormat.format("Retrieved {0}", body));
    return body;
}

It doesn't execute: the app starts and then just hangs. The futures are never fulfilled. Any advice is appreciated.
P.S. I'm aware this is much easier to do with CompletableFuture; I was just wondering how to do it with plain Futures.

Answer 1

Score: 1

In the original version of the question, you create a list of FutureTasks but never hand them to the ExecutorService to run. A FutureTask only executes when something calls its run() method, so the tasks never complete and Future.get blocks forever.

In the updated version of the question, you have moved the code that does the waiting into the executor service as a task. The FutureTasks themselves still never run, so FutureTask.get still blocks forever.
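The behaviour described above can be checked in isolation. What follows is only a minimal sketch, not the asker's code: the class name FutureTaskDemo and the helper timesOut are made up for illustration, and a 200 ms timeout stands in for "blocks forever" so that the demo terminates.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original question.
class FutureTaskDemo {

    // Returns true if a bounded get() gave up, i.e. the task had no result yet.
    static boolean timesOut(FutureTask<String> task)
            throws InterruptedException, ExecutionException {
        try {
            task.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "done");

        // Nothing has called run() yet, so the bounded get() times out.
        System.out.println("timed out before running: " + timesOut(task));

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // FutureTask is a Runnable, so an executor can run it.
            executor.execute(task);
            System.out.println("result: " + task.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

With an unbounded get(), as in the question, the first call would simply never return.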

I would suggest you change the code in fetchAllUsingFuture to:

List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
chunks.forEach(chunk -> {
    tasks.add(wrapFetchInCallable(chunk));
});
List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);

where wrapFetchInCallable creates a Callable instead of FutureTask:

// Not static: fetchData is an instance method, so it cannot be called from a static context.
private Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
    return () -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    };
}
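Put together, the suggestion might look roughly like the sketch below. It makes several assumptions: Utils.splitToChunks from the question is not shown, so a hypothetical splitToChunks stands in for it; the REST call is replaced by a plain Function parameter named fetch; and threadCount is assumed to be positive. Since invokeAll only returns once every task has finished, the get() calls afterwards do not block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch class; names other than invokeAll/Callable/Future are assumptions.
class InvokeAllSketch {

    // Stand-in for the question's Utils.splitToChunks: consecutive slices of roughly equal size.
    static <T> List<List<T>> splitToChunks(List<T> items, int chunkCount) {
        List<List<T>> chunks = new ArrayList<>();
        int size = Math.max(1, (items.size() + chunkCount - 1) / chunkCount);
        for (int i = 0; i < items.size(); i += size) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(items.size(), i + size))));
        }
        return chunks;
    }

    // fetch stands in for fetchData(id); it is called once per id on a pool thread.
    static <R> List<R> fetchAll(List<String> ids, int threadCount, Function<String, R> fetch)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Callable<List<R>>> tasks = new ArrayList<>();
            for (List<String> chunk : splitToChunks(ids, threadCount)) {
                tasks.add(() -> {
                    List<R> responses = new ArrayList<>();
                    for (String id : chunk) {
                        responses.add(fetch.apply(id));
                    }
                    return responses;
                });
            }
            // invokeAll runs the tasks and returns their futures in the same order as the tasks.
            List<R> all = new ArrayList<>();
            for (Future<List<R>> future : executor.invokeAll(tasks)) {
                all.addAll(future.get()); // rethrows a task failure as ExecutionException
            }
            return all;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> ids = Arrays.asList("a", "b", "c", "d", "e");
        // prints [response-a, response-b, response-c, response-d, response-e]
        System.out.println(fetchAll(ids, 3, id -> "response-" + id));
    }
}
```

Unlike the question's code, a failed chunk here surfaces as an ExecutionException to the caller instead of being printed and silently skipped; whether that is preferable depends on the use case.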

Answer 2

Score: 1

It looks like you are creating a list of FutureTasks but never sending them to the ExecutorService to run.
I have implemented an ExecutorService with a Future object as shown below; I hope it helps:

Service layer:

public List<MovieDTO> searchMoviesParallel(String limit, String offset, String searchPhrase) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<List<MovieDTO>> digitoonResult = executor.submit(new DigitoonSearchTask(limit, offset, searchPhrase));
    List<MovieDTO> movieDTOList = digitoonResult.get();
    executor.shutdown();
    return movieDTOList;
}

And my search task (the DigitoonSearchTask class) is as follows:

public class DigitoonSearchTask implements Callable<List<MovieDTO>> {
    private String limit;
    private String offset;
    private String searchPhrase;
    private final static String digitoonSearchBaseUrl = "http://apitwo.xxx.com/partner/search/?q=";

    public DigitoonSearchTask(String limit, String offset, String searchPhrase) {
        this.limit = limit;
        this.offset = offset;
        this.searchPhrase = searchPhrase;
    }

    @Override
    public List<MovieDTO> call() throws Exception {
        List<MovieDTO> movieDTOList = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            String uri = digitoonSearchBaseUrl + URLEncoder.encode(searchPhrase, "utf-8") + "&limit=" + limit + "&offset=" + offset;
            URL url = new URL(uri);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Accept", "application/json");
            conn.setRequestProperty("authorization", "xxxxxxxxxx");
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
            }
            BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
            String output;
            // Each line of the response is parsed as a JSON array; the last line wins.
            while ((output = br.readLine()) != null) {
                movieDTOList = Arrays.asList(mapper.readValue(output, MovieDTO[].class));
            }
            br.close();
            conn.disconnect();
        } catch (UnknownHostException e) {
            // Retry and return the retried result (a bare call() would discard it).
            // Note that this retries without any limit.
            return call();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return movieDTOList;
    }
}

Note that for now I have just one API; once I have others, they can be added as further search tasks in the service layer, with the thread count increased accordingly.
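As a rough illustration of that extension, the sketch below submits several search tasks before waiting on any of them, so that they overlap. It is only a sketch under assumptions: ParallelSearchSketch and searchAll are hypothetical names, plain String results stand in for MovieDTO, and the tasks in main return canned data instead of calling a real API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch class; String results stand in for MovieDTO.
class ParallelSearchSketch {

    static List<String> searchAll(List<Callable<List<String>>> searchTasks) throws Exception {
        // One thread per task, with a minimum of one so an empty list is still valid.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, searchTasks.size()));
        try {
            // Submit every task first so they can run concurrently...
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Callable<List<String>> task : searchTasks) {
                futures.add(executor.submit(task));
            }
            // ...and only then wait for the results, in submission order.
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                merged.addAll(future.get());
            }
            return merged;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        tasks.add(() -> Arrays.asList("movie-1", "movie-2")); // canned data, first source
        tasks.add(() -> Arrays.asList("movie-3"));            // canned data, second source
        System.out.println(searchAll(tasks));
    }
}
```

Calling get() immediately after each submit, as in the single-task version, would serialize the tasks; collecting the futures first is what allows them to run side by side.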

huangapple
  • Posted on August 1, 2020 at 18:32:42
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63204221.html