英文:
Calling two functions in parallel in Java 8
问题
我在我的 Spring Boot 应用程序中有一个如下的用例:
我想从响应中获取 id
字段的值,使用以下函数:
String id = getIdFromResponse(response);
如果在响应中没有获取到任何 id,那么我会检查请求参数中是否存在 id
字段,使用以下函数:
String id = getIdFromRequest(request);
目前,我是按顺序调用它们的。但我想让这两个函数并行运行,我希望在从它们中的任何一个获取到 id 后停止。
我想知道是否有办法在 Java 8 中使用 streams
来实现这一点。
英文:
I have a use case in my Spring boot application as follows:
I would like to fetch the id
field value from the response with the following function:
String id = getIdFromResponse(response);
If I don't get any id in the response, then I check if the id
field is present in the request argument with the following function:
String id = getIdFromRequest(request);
As of now, I am invoking them sequentially. But I would like to make these two functions run parallelly, I would like to stop as soon as I get an id from either of them.
I am wondering if there is any way to implement this using streams
in Java 8.
答案1
得分: 8
你可以使用类似这样的代码:
String id = Stream.<Supplier<String>>of(
() -> getIdFromResponse(response),
() -> getIdFromRequest(request)
)
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.findFirst()
.orElseThrow():
这些供应商是必需的,因为当您不使用它们时,两个请求仍然按顺序执行。
我还假设您的方法在未找到内容时返回null
,所以我需要使用.filter(Objects::nonNull)
来过滤掉这些值。
根据您的用例,您可以将.orElseThrow()
替换为不同的内容,比如.orElse(null)
。
英文:
You can use something like this:
String id = Stream.<Supplier<String>>of(
() -> getIdFromResponse(response),
() -> getIdFromRequest(request)
)
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.findFirst()
.orElseThrow():
The suppliers are needed, because when you don't use them, then both requests are still executed sequentially.
I also assumed that your methods return null
when nothing is found, so I had to filter these values out with .filter(Objects::nonNull)
.
Depending on your use case, you can replace .orElseThrow()
with something different, like .orElse(null)
答案2
得分: 6
只要存在一种适用于此情况的方法,就不需要使用流(Stream) API。
ExecutorService::invokeAny(Collection<? extends Callable<T>>)
> 执行给定的任务,返回已成功完成的任务的结果(即未抛出异常的任务结果),如果有的话。在正常或异常返回时,未完成的任务将被取消。
List<Callable<String>> collection = Arrays.asList(
() -> 从响应中获取ID(response),
() -> 从请求中获取ID(request)
);
// 您希望有与集合大小相同的线程数
ExecutorService executorService = Executors.newFixedThreadPool(collection.size());
String id = executorService.invokeAny(collection);
三个注意事项:
- 也有一个带有超时的重载方法,如果在超时时间内没有结果可用,则会抛出
TimeoutException
:invokeAny(Collection<? extends Callable<T>>, long, TimeUnit)
- 您需要处理
invokeAny
方法引发的ExecutionException
和InterruptedException
。 - 完成后不要忘记关闭服务。
英文:
There is no need to use Stream API as long as there exists a method exactly for this.
ExecutorService::invokeAny(Collection<? extends Callable<T>>)
> Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled.
List<Callable<String>> collection = Arrays.asList(
() -> getIdFromResponse(response),
() -> getIdFromRequest(request)
);
// you want the same number of threads as the size of the collection
ExecutorService executorService = Executors.newFixedThreadPool(collection.size());
String id = executorService.invokeAny(collection);
Three notes:
- There is also an overloaded method with timeout throwing
TimeoutException
if no result is available in time:invokeAny(Collection<? extends Callable<T>>, long, TimeUnit)
- You need to handle
ExecutionException
andInterruptedException
from theinvokeAny
method. - Don't forget to close the service once you are done
答案3
得分: 3
如果您想完全控制何时启用替代评估,您可以使用 CompletableFuture
:
CompletableFuture<String> job
= CompletableFuture.supplyAsync(() -> getIdFromResponse(response));
String id;
try {
id = job.get(300, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex) {
// 在指定时间内未响应,设置替代方案
id = job.applyToEither(
CompletableFuture.supplyAsync(() -> getIdFromRequest(request)), s -> s).join();
}
catch(InterruptedException|ExecutionException ex) {
// 处理错误
}
当第一个作业未在指定时间内完成时,才会提交第二个作业。然后,无论哪个作业先响应,都将提供结果值。
英文:
If you want to be in full control over when to enable the alternative evaluation, you may use CompletableFuture
:
CompletableFuture<String> job
= CompletableFuture.supplyAsync(() -> getIdFromResponse(response));
String id;
try {
id = job.get(300, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex) {
// did not respond within the specified time, set up alternative
id = job.applyToEither(
CompletableFuture.supplyAsync(() -> getIdFromRequest(request)), s -> s).join();
}
catch(InterruptedException|ExecutionException ex) {
// handle error
}
The second job is only submitted when the first did not complete within the specified time. Then, whichever job responds first will provide the result value.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论