使用CompletableFuture在循环中,每次迭代使用两个Future进行合并。

huangapple go评论77阅读模式
英文:

Using CompletableFuture in a Loop with Two Futures to Merge per Loop Iteration

问题

I have code like the following:

testMethod(List<String> ids)  {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    
    for(String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        
        CompletableFuture<ResultThree> resultThree =  resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)); 
        
        resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,  ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

I need to be able to AND the results resultOne and resultTwo together, such that for each iteration, on the completion of the entire synchronous execution, I have an (I guess) array or map that I can subsequently process, where one object in the array has the corresponding id and a true or false for that id (that represents the AND-ing of the two booleans from the separate objects.

Based on feedback from readers, I have completed the code to merge the two original futures and combine all the results from each iteration to get the entire loop of futures. At this point, I just need to process the results.

I think maybe I need another CompletableFuture? This one would maybe be something like this (put above where I have "// PROCESS RESULTS HERE"):

CompletableFuture<Void> future = resultThreeList
    .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses() would iterate through resultThreeList forwarding the successful ids to another process, but I'm not sure if that's the correct approach. Grateful for any ideas. Thanks.

英文:

I have code like the following:

testMethod(List&lt;String&gt; ids)  {
    List&lt;CompletableFuture&lt;ResultThree&gt;&gt; resultThreeList = new ArrayList&lt;&gt;();
    
    for(String id : ids) {
        CompletableFuture&lt;ResultOne&gt; resultOne = AynchOne(id);
        CompletableFuture&lt;ResultTwo&gt; resultTwo = AynchTwo(id);
        
    CompletableFuture&lt;ResultThree&gt; resultThree =  resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -&gt; computeCombinedResultThree(a, b)); 
    
    resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,  ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult &amp;&amp; r2.goodResult;

    return resultThree;
}

, I need to be able to AND the results resultOne and resultTwo together, such that for each iteration, on the completion of the entire synchronous execution, I have an (I guess) array or map that I can subsequently process, where one object in the array has the corresponding id and a true or false for that id (that represents the AND-ing of the two booleans from the separate objects.

Based on feedback from readers, I have gotten the code completed to the point where I can merge the two original futures, and combine all the results from each iteration to get the entire loop of futures. At this point I just need to process the results.

I think maybe I need another CompletableFuture? This one would maybe be something like this (put above where I have "// PROCESS RESULTS HERE"):

CompletableFuture&lt;Void&gt; future = resultThreeList
  .thenRun(() -&gt; forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses() would iterate through resultThreeList forwarding the successful ids to another process, but not sue that is how to do it.
Grateful for any ideas. Thanks.

答案1

得分: 0

以下是翻译好的内容:

现在你已经完成了这么远:

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

现在你需要做的是将这个 List<CompletableFuture<ResultThree>> 转换成一个在所有结果计算完成后完成的 CompletableFuture<List<ResultThree>>

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

或者使用类似下面的方式:

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

其中 safeGet 是一个方法,只调用了 future.get() 并捕获可能发生的异常 - 你不能在 lambda 中直接调用 get(),因为会抛出异常。

现在你可以使用 thenAccept() 处理这个列表:

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

再次说明,捕获的异常是由于调用了 get()。顺便说一下,我真的看不出为什么会有三个结果类,因为在代码的这一部分,你只需要 id 和结果状态。我会为此引入一个接口(Result?),然后只在该接口上进行操作。

英文:

So this is how far you got until now:

List&lt;CompletableFuture&lt;ResultThree&gt;&gt; resultThreeList = new ArrayList&lt;&gt;(ids.size());
for (String id : ids) {
    CompletableFuture&lt;ResultOne&gt; resultOne = aynchOne(id);
    CompletableFuture&lt;ResultTwo&gt; resultTwo = aynchTwo(id);

    CompletableFuture&lt;ResultThree&gt; resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

Now all you need to do is convert this List&lt;CompletableFuture&lt;ResultThree&gt;&gt; to a CompletableFuture&lt;List&lt;ResultThree&gt;&gt; that will get completed once all the results are finished calculating.

CompletableFuture&lt;List&lt;ResultThree&gt;&gt; combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture&lt;?&gt;[0]))
                .thenApply(v -&gt; resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

Or with something like

CompletableFuture&lt;List&lt;ResultThree&gt;&gt; combinedCompletables =
        CompletableFuture.supplyAsync(() -&gt; resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

where safeGet is a method that just calls future.get() and catches the exceptions that may occur - you can't just call get() in a lambda because of those exceptions.

Now you can process this list with thenAccept():

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

Again, the exceptions being caught are due to the call to get().

Side note, I don't really see why there are three result classes since all you need - for this part of the code at least - is the id and the result status. I'd introduce an interface (Result?) for that and only work on that.

答案2

得分: 0

在我看来,您似乎不需要三个不同的ResultOneResultTwoResultThree类,因为它们定义了相同的类型,所以我将把它们替换为Result

假设您只想转发成功的结果,我在Result类中添加了一个简短的isGoodResult()方法,可用作流中的谓词:

class Result {
    public boolean goodResult;
    public String id;
    // ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

我还建议摆脱循环,使用流来使您的代码更加流畅。

如果forwardSuccess严格的,接受List<Result>,那么我会这样实现testMethod

void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // 在这里处理结果
    forwardSuccesses(results);
}

如果forwardSuccess惰性的,接受CompletableFuture<List<Result>>

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // 在这里处理结果
    forwardSuccessesAsync(asyncResults);
}
英文:

It seems to me you don't need 3 different ResultOne ResultTwo ResultThree classes as they define the same type, so I shall replace them for Result.

Assuming you want to forward only successes, I added a short isGoodResult() method to the Result class to be used as predicate with the streams:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

I'd also recommend getting rid of the loop, replacing it for a stream to make your code more fluid.

Should forwardSuccess be strict, accepting List&lt;Result&gt;, this is how I'd implement testMethod:

void testMethod(List&lt;String&gt; ids)  {
    final List&lt;Result&gt; results = ids.stream()
        .parallel()
        .map(id -&gt; asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -&gt; computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

Should forwardSuccess be lazy, accepting CompletableFuture&lt;List&lt;Result&gt;&gt;:

void testMethod(List&lt;String&gt; ids)  {
    final List&lt;CompletableFuture&lt;Result&gt;&gt; futures = ids.stream()
        .parallel()
        .map(id -&gt; asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -&gt; computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture&lt;List&lt;Result&gt;&gt; asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -&gt; futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}

答案3

得分: -2

在循环内部,你立即获得了CompletableFuture的返回值。在后台会发生一些神奇的事情,你希望等待直到两者都完成。

因此,在两个CompletableFuture都返回之后,通过使用带有长整型值和时间单位的CompletableFuture.get来引发阻塞等待。如果你只调用get而没有任何参数,你将会永远等待。

明智地选择你的超时时间和JDK版本。可能会出现JDK 8没有提供带有超时的get方法的情况。而且,JDK 8已经不再受到支持。JDK 11现在是长期支持版本,最近的编译器不再将JDK 8作为目标。

我强烈建议你阅读关于CompletableFuture的详细信息,以及它与Future的区别,尤其是关于线程控制和取消操作方面的内容。由于我不知道CompletableFuture的底层提供者是谁,我还假设查询单个ID会浪费资源,并且吞吐量非常有限。但这是一个单独的问题。

英文:

Within the for loop you get immediately the CompletableFutures in return. In the background some magic happens and you want to wait until both are complete.

So after both CompletableFutures were returned, cause a blocking wait by invoking CompletableFuture.get with a long value and a time unit. If you only invoke get without any parameters you'll wait forever.

Choose your timeout and JDK wisely. It might happen, that JDK 8 doesn't provide a get with timeout. Also JDK 8 isn't supported anymore. JDK 11 is now long term support and recent compilers don't offer JDK 8 as a target anymore.

I really urge you to read the dirty details about CompletableFuture and how it differs from Future, esp. regarding thread control like cancellation. Not knowing the underlying provider of CompletableFuture I also assume querying one ID is waste of ressources and throughput is very limited. But this is a separate question.

huangapple
  • 本文由 发表于 2020年10月1日 01:51:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/64143157.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定