在Java 8中并行调用两个函数

huangapple go评论65阅读模式
英文:

Calling two functions in parallel in Java 8

问题

我在我的 Spring Boot 应用程序中有一个如下的用例:

我想从响应中获取 id 字段的值,使用以下函数:

String id = getIdFromResponse(response);

如果在响应中没有获取到任何 id,那么我会检查请求参数中是否存在 id 字段,使用以下函数:

String id = getIdFromRequest(request);

目前,我是按顺序调用它们的。但我想让这两个函数并行运行,我希望在从它们中的任何一个获取到 id 后停止。

我想知道是否有办法在 Java 8 中使用 streams 来实现这一点。

英文:

I have a use case in my Spring boot application as follows:

I would like to fetch the id field value from the response with the following function:

String id = getIdFromResponse(response);

If I don't get any id in the response, then I check if the id field is present in the request argument with the following function:

String id = getIdFromRequest(request);

As of now, I am invoking them sequentially. But I would like to make these two functions run parallelly, I would like to stop as soon as I get an id from either of them.

I am wondering if there is any way to implement this using streams in Java 8.

答案1

得分: 8

你可以使用类似这样的代码:

String id = Stream.<Supplier<String>>of(
        () -> getIdFromResponse(response), 
        () -> getIdFromRequest(request)
    )
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .findFirst()
    .orElseThrow():

这些供应商是必需的,因为当您不使用它们时,两个请求仍然按顺序执行。

我还假设您的方法在未找到内容时返回null,所以我需要使用.filter(Objects::nonNull)来过滤掉这些值。

根据您的用例,您可以将.orElseThrow()替换为不同的内容,比如.orElse(null)

英文:

You can use something like this:

String id = Stream.&lt;Supplier&lt;String&gt;&gt;of(
        () -&gt; getIdFromResponse(response), 
        () -&gt; getIdFromRequest(request)
    )
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .findFirst()
    .orElseThrow():

The suppliers are needed, because when you don't use them, then both requests are still executed sequentially.

I also assumed that your methods return null when nothing is found, so I had to filter these values out with .filter(Objects::nonNull).

Depending on your use case, you can replace .orElseThrow() with something different, like .orElse(null)

答案2

得分: 6

只要存在一种适用于此情况的方法,就不需要使用流(Stream) API。

ExecutorService::invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt;)

> 执行给定的任务,返回已成功完成的任务的结果(即未抛出异常的任务结果),如果有的话。在正常或异常返回时,未完成的任务将被取消。

List&lt;Callable&lt;String&gt;&gt; collection = Arrays.asList(
    () -&gt; 从响应中获取ID(response),
    () -&gt; 从请求中获取ID(request)
);

// 您希望有与集合大小相同的线程数
ExecutorService executorService = Executors.newFixedThreadPool(collection.size());
String id = executorService.invokeAny(collection);

三个注意事项:

英文:

There is no need to use Stream API as long as there exists a method exactly for this.

ExecutorService::invokeAny(Collection&lt;? extends Callable&lt;T&gt;&gt;)

> Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled.

List&lt;Callable&lt;String&gt;&gt; collection = Arrays.asList(
    () -&gt; getIdFromResponse(response),
    () -&gt; getIdFromRequest(request)
);

// you want the same number of threads as the size of the collection
ExecutorService executorService = Executors.newFixedThreadPool(collection.size());
String id = executorService.invokeAny(collection);

Three notes:

答案3

得分: 3

如果您想完全控制何时启用替代评估,您可以使用 CompletableFuture

CompletableFuture<String> job
    = CompletableFuture.supplyAsync(() -> getIdFromResponse(response));
String id;
try {
    id = job.get(300, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex) {
    // 在指定时间内未响应,设置替代方案
    id = job.applyToEither(
        CompletableFuture.supplyAsync(() -> getIdFromRequest(request)), s -> s).join();
}
catch(InterruptedException|ExecutionException ex) {
    // 处理错误
}

当第一个作业未在指定时间内完成时,才会提交第二个作业。然后,无论哪个作业先响应,都将提供结果值。

英文:

If you want to be in full control over when to enable the alternative evaluation, you may use CompletableFuture:

CompletableFuture&lt;String&gt; job
    = CompletableFuture.supplyAsync(() -&gt; getIdFromResponse(response));
String id;
try {
    id = job.get(300, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex) {
    // did not respond within the specified time, set up alternative
    id = job.applyToEither(
        CompletableFuture.supplyAsync(() -&gt; getIdFromRequest(request)), s -&gt; s).join();
}
catch(InterruptedException|ExecutionException ex) {
    // handle error
}

The second job is only submitted when the first did not complete within the specified time. Then, whichever job responds first will provide the result value.

huangapple
  • 本文由 发表于 2020年10月21日 14:55:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/64458149.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定