Using CompletableFuture in a Loop with Two Futures to Merge per Loop Iteration
Question
I have code like the following:
void testMethod(List<String> ids) {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    for (String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b));
        resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}
class ResultOne {
    boolean goodResult;
    String id;
    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}
class ResultTwo {
    boolean goodResult;
    String id;
    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}
class ResultThree {
    boolean goodResult;
    String id;
}
private ResultThree computeCombinedResultThree(ResultOne r1, ResultTwo r2) {
    ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;
    return resultThree;
}
I need to AND the results of resultOne and resultTwo for each iteration so that, once the entire execution has completed, I have (I guess) an array or map that I can subsequently process. Each object in it should hold an id and a true or false for that id, representing the AND of the two booleans from the separate objects.
Based on feedback from readers, I have completed the code to merge the two original futures and combine all the results from each iteration to get the entire loop of futures. At this point, I just need to process the results.
I think maybe I need another CompletableFuture? This one would maybe be something like this (put above where I have "// PROCESS RESULTS HERE"):
CompletableFuture<Void> future = resultThreeList
.thenRun(() -> forwardSuccesses(resultThreeList));
future.get();
forwardSuccesses() would iterate through resultThreeList, forwarding the successful ids to another process, but I'm not sure if that's the correct approach. Grateful for any ideas. Thanks.
Answer 1
Score: 0
This is how far you've gotten:
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);
    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}
Now all you need to do is convert this List<CompletableFuture<ResultThree>> to a CompletableFuture<List<ResultThree>> that completes once all the results have finished calculating:
CompletableFuture<List<ResultThree>> combinedCompletables =
    CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> resultThreeList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList())
        );
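To make the allOf step concrete, here is a minimal, self-contained sketch of the same idea as a generic helper. The class name AllOfSketch and the method name sequence are made up for illustration; only CompletableFuture.allOf, thenApply and join come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {
    // Hypothetical helper: turns a list of futures into one future holding
    // all results, in the same order as the input list.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // allOf has completed, so every future is done and join() does not block.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"));
        System.out.println(sequence(futures).join());
    }
}
```

Note that if any of the input futures completes exceptionally, the future returned by allOf does too, so the combined future fails as a whole.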
Or with something like:
CompletableFuture<List<ResultThree>> combinedCompletables =
    CompletableFuture.supplyAsync(() -> resultThreeList.stream()
        .map(this::safeGet)
        .collect(Collectors.toList()));
where safeGet is a method that just calls future.get() and catches the exceptions that may occur. You can't call get() directly in a lambda here because it throws checked exceptions.
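The answer does not show safeGet itself, so the following is only one possible shape for it. The class name SafeGetSketch and the choice to rethrow failures as CompletionException are assumptions of this sketch, not something the answer specifies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SafeGetSketch {
    // Hypothetical helper: calls get() and converts its checked exceptions
    // into an unchecked one so it can be used inside a stream lambda.
    static <T> T safeGet(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new CompletionException(e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure as the cause.
            throw new CompletionException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(safeGet(CompletableFuture.completedFuture("ok")));
    }
}
```

Since CompletableFuture.join() already behaves much like this (it throws an unchecked CompletionException), the allOf variant with join is usually the shorter option.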
Now you can process this list with thenAccept():
try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
Again, the exceptions being caught are due to the call to get().
Side note: I don't really see why there are three result classes, since all you need (for this part of the code at least) is the id and the result status. I'd introduce an interface (Result?) for that and only work on that.
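As a rough illustration of the interface idea, the sketch below assumes an interface with just the two accessors this part of the code needs. All names here (ResultInterfaceSketch, SimpleResult, and, successfulIds) are hypothetical and not part of the original code.

```java
import java.util.List;
import java.util.stream.Collectors;

class ResultInterfaceSketch {
    // Hypothetical common view of a result: only the id and the status.
    interface Result {
        String id();
        boolean goodResult();
    }

    // Minimal implementation, used here in place of ResultOne/Two/Three.
    static class SimpleResult implements Result {
        private final String id;
        private final boolean goodResult;

        SimpleResult(String id, boolean goodResult) {
            this.id = id;
            this.goodResult = goodResult;
        }

        public String id() { return id; }
        public boolean goodResult() { return goodResult; }
    }

    // ANDs two results that belong to the same id.
    static Result and(Result a, Result b) {
        return new SimpleResult(a.id(), a.goodResult() && b.goodResult());
    }

    // Returns the ids of the successful results, in input order.
    static List<String> successfulIds(List<? extends Result> results) {
        return results.stream()
                .filter(Result::goodResult)
                .map(Result::id)
                .collect(Collectors.toList());
    }
}
```

With something like this, thenCombine could take ResultInterfaceSketch::and directly, and the forwarding step would only depend on the interface.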
Answer 2
Score: 0
It seems to me you don't need three different classes (ResultOne, ResultTwo, ResultThree), as they define the same type, so I'll replace them with Result.
Assuming you want to forward only successes, I added a short isGoodResult() method to the Result class, to be used as a predicate with the streams:
class Result {
    public boolean goodResult;
    public String id;
    // ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}
I'd also recommend getting rid of the loop and replacing it with a stream to make your code more fluent.
If forwardSuccesses is strict, accepting a List<Result>, this is how I'd implement testMethod:
void testMethod(List<String> ids) {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id),
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());
    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}
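One point worth keeping in mind with the strict variant: stream stages are lazy and run element by element, so in a sequential stream a join placed directly after the map would wait for each id before the next pair of calls is started. The sketch below is an alternative that starts all futures first and joins afterwards, which does not rely on .parallel(). The bodies of asynchOne and asynchTwo are stand-ins invented for this example (asynchTwo reports failure for an empty id), and returning ids instead of calling forwardSuccesses is a simplification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class StrictSketch {
    static class Result {
        final String id;
        final boolean goodResult;

        Result(String id, boolean goodResult) {
            this.id = id;
            this.goodResult = goodResult;
        }

        boolean isGoodResult() { return goodResult; }
    }

    // Stand-ins for the real asynchronous calls (assumptions of this sketch).
    static CompletableFuture<Result> asynchOne(String id) {
        return CompletableFuture.supplyAsync(() -> new Result(id, true));
    }

    static CompletableFuture<Result> asynchTwo(String id) {
        return CompletableFuture.supplyAsync(() -> new Result(id, !id.isEmpty()));
    }

    static Result computeCombinedResult(Result r1, Result r2) {
        return new Result(r1.id, r1.goodResult && r2.goodResult);
    }

    static List<String> successfulIds(List<String> ids) {
        // Phase 1: start every pair of calls; nothing blocks here.
        List<CompletableFuture<Result>> futures = ids.stream()
                .map(id -> asynchOne(id).thenCombine(asynchTwo(id), StrictSketch::computeCombinedResult))
                .collect(Collectors.toList());
        // Phase 2: wait for each result and keep only the successes.
        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Result::isGoodResult)
                .map(r -> r.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(successfulIds(Arrays.asList("a", "", "c")));
    }
}
```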
If forwardSuccesses is lazy instead, accepting a CompletableFuture<List<Result>>:
void testMethod(List<String> ids) {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id),
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());
    final CompletableFuture<List<Result>> asyncResults =
        CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures
                .stream()
                .map(CompletableFuture::join)
                .filter(Result::isGoodResult)
                .collect(Collectors.toList()));
    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}
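The answer leaves forwardSuccessesAsync undefined. As a hedged sketch of what a lazy consumer might look like, the version below registers a callback with thenAccept and returns immediately. To stay short it works on a list of ids rather than Result objects, and the sink list stands in for "another process"; both are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LazyForwardSketch {
    // Hypothetical lazy consumer: attaches the forwarding step to the future
    // and returns without waiting for the ids to be available.
    static CompletableFuture<Void> forwardSuccessesAsync(
            CompletableFuture<List<String>> asyncIds, List<String> sink) {
        return asyncIds.thenAccept(sink::addAll);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        CompletableFuture<List<String>> asyncIds = new CompletableFuture<>();

        CompletableFuture<Void> done = forwardSuccessesAsync(asyncIds, sink);
        System.out.println(done.isDone()); // the ids are not available yet

        asyncIds.complete(Arrays.asList("a", "c")); // triggers the callback
        done.join();
        System.out.println(sink);
    }
}
```

The returned CompletableFuture<Void> lets the caller decide whether and when to wait, for example with join() or a timed get().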
Answer 3
Score: -2
Within the for loop, you get the CompletableFutures back immediately. Some magic happens in the background, and you want to wait until both are complete.
So after both CompletableFutures have been returned, block and wait by invoking CompletableFuture.get with a long value and a time unit. If you invoke get without any parameters, you'll wait indefinitely if a future never completes.
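A small sketch of the timed wait described above, assuming a fallback value is acceptable when the timeout expires. The helper name getOrFallback and the class name TimeoutSketch are made up for this example; get(long, TimeUnit) itself is the standard method declared on Future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // Hypothetical helper: waits at most timeoutMillis for the future and
    // returns the fallback if it has not completed by then.
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        System.out.println(getOrFallback(never, 50, "timed out"));
        System.out.println(getOrFallback(CompletableFuture.completedFuture("done"), 50, "timed out"));
    }
}
```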
Choose your timeout and JDK wisely. Note that get with a timeout is declared on the Future interface, so CompletableFuture has offered it since it was introduced in JDK 8. Also, JDK 8 isn't supported anymore. JDK 11 is now a long-term support release, and recent compilers don't offer JDK 8 as a target anymore.
I really urge you to read the dirty details about CompletableFuture and how it differs from Future, especially regarding thread control such as cancellation. Not knowing the underlying provider of the CompletableFutures, I also assume that querying one ID at a time is a waste of resources and that throughput is very limited. But that is a separate question.