`SupplyAsync`等待所有`CompletableFutures`完成。

huangapple go评论69阅读模式
英文:

SupplyAsync wait for all CompletableFutures to finish

问题

我在下面运行了一些异步任务,并且需要等待它们全部完成。但奇怪的是,join() 似乎没有强制等待所有任务,代码继续执行而不等待。有没有什么原因导致 join 流没有按预期工作?

CompletableFuture 列表只是一个映射 supplyAsync 的流:

List<Integer> items = Arrays.asList(1, 2, 3);

List<CompletableFuture<Integer>> futures = items
                .stream()
                .map(item -> CompletableFuture.supplyAsync(() ->  {

                    System.out.println("processing");
                    // 在这里进行一些处理
                    return item;

                }))
                .collect(Collectors.toList());

然后我这样等待这些 futures:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

我使用 futures.forEach(CompletableFuture::join); 能够让等待工作正常进行,但我想知道为什么我的流方法没有生效。

英文:

I'm running some async tasks below and need to wait until they all finish. I'm not sure why but the join() isn't forcing a wait for all tasks and the code continues executing without waiting. Is there a reason why the stream of joins isn't working as expected?

The CompletableFutures list is just a stream that maps supplyAsync

List&lt;Integer&gt; items = Arrays.asList(1, 2, 3);

List&lt;CompletableFuture&lt;Integer&gt;&gt; futures = items
                .stream()
                .map(item -&gt; CompletableFuture.supplyAsync(() -&gt;  {

                    System.out.println(&quot;processing&quot;);
                    // do some processing here
                    return item;

                }))
                .collect(Collectors.toList());

And I wait for the futures like.

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -&gt; futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

I'm able to get the wait working with futures.forEach(CompletableFuture::join); but I wanted to know why my stream approach wasn't working.

答案1

得分: 5

这段代码:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

并不会等待futures中的所有future完成。它实际上创建了一个新的future,该future会等待所有在futures中的异步执行完成,然后它自身才会完成(但不会阻塞直到所有这些future完成)。当这个allOf future完成时,会执行你的thenApply代码。但是allOf()会立即返回,不会阻塞。

这意味着你代码中的futures.stream().map(CompletableFuture::join).collect(Collectors.toList())只会在所有异步执行完成之后才运行,这违反了你的目的。join()调用会立即返回。但这不是更大的问题。你面临的挑战是allOf().thenApply()不会等待异步执行完成。它只会创建另一个不会阻塞的future。

最简单的解决方案是使用第一个流水线并映射为整数列表:

List<Integer> results = items.stream()
    .map(item -> CompletableFuture.supplyAsync(() -> {

        System.out.println("processing " + item);
        // 在这里进行一些处理
        return item;

    }))
    .collect(Collectors.toList()) // 强制提交所有
    .stream()
    .map(CompletableFuture::join) // 等待每个完成
    .collect(Collectors.toList());

如果你想使用类似于你原来的代码,那么你的第二个代码片段需要更改为:

List<Integer> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

这是因为CompletableFuture.allOf不会等待,它只是将所有的futures组合成一个新的future,在所有futures完成时该新future完成:

返回一个新的CompletableFuture,在所有给定的CompletableFuture都完成时完成。

或者,你仍然可以使用allOf()join(),然后运行你当前的thenApply()代码:

// 包装的future在所有future都完成时完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .join();

// 下面的join()调用会立即返回
List<Integer> result = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

最后一条语句中的join()调用会立即返回,因为对包装(allOf())future的join()调用将等待传递给它的所有futures完成。这就是为什么我认为在可以使用第一种方法的情况下没有理由这样做。

英文:

This code:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -&gt; futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

Does not wait for all futures in futures to complete. What it does is create a new future that will wait for all async executions in futures to complete before it itself completes (but will not block until all those futures complete). When this allOf future completes, then your thenApply code runs. But allOf() will return immediately without blocking.

This means futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) in your code only runs after all async executions complete, which defeats your purpose. The join() calls will all return immediately. But this is not the bigger problem. Your challenge is that allOf().thenApply() will not wait for async executions to complete. It will just create another future that won't block.

The easiest solution is to use the first pipeline and map to an Integer list:

List&lt;Integer&gt; results = items.stream()
	.map(item -&gt; CompletableFuture.supplyAsync(() -&gt; {

		System.out.println(&quot;processing &quot; + item);
		// do some processing here
		return item;

	}))
	.collect(Collectors.toList()) //force-submit all
	.stream()
	.map(CompletableFuture::join) //wait for each
	.collect(Collectors.toList());

If you want to use something like your original code, then your second snippet would have to be changed to:

List&lt;Integer&gt; reuslts = futures.stream()
	.map(CompletableFuture::join)
	.collect(Collectors.toList());

That's because CompletableFuture.allOf does not wait, it just combines all futures into a new one that completes when all complete:

> Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

Alternatively, you could still use allOf() with join(), then run your current thenApply() code:

//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
		.join(); 

//join() calls below return immediately
List&lt;Integer&gt; result = futures.stream()
		.map(CompletableFuture::join) 
		.collect(Collectors.toList());

The join() calls in the last statement return immediately, becaues the join() call on the wrapper (allOf()) future will have waited for all futures passed to it to complete. This is why I don't see the reason to do this when you can use the first approach.

huangapple
  • 本文由 发表于 2020年10月13日 02:14:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/64323232.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定