Is the writer's reason correct for using thenCompose and not thenComposeAsync

Question

This question is different from https://stackoverflow.com/questions/46130969/difference-between-java8-thencompose-and-thencomposeasync because I am asking about the writer's reason for using thenCompose and not thenComposeAsync.

I was reading Modern Java in Action and came across this piece of code on page 405:

public static List<String> findPrices(String product) {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Shop> shops = Arrays.asList(new Shop(), new Shop());
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            .collect(toList());
    return priceFutures.stream()
            .map(CompletableFuture::join).collect(toList());
}

Everything is OK and I can understand this code, but on page 408 the writer gives this reason for not using thenComposeAsync, which I can't understand:

> In general, a method without the Async suffix in its name executes
> its task in the same threads the previous task, whereas a method
> terminating with Async always submits the succeeding task to the
> thread pool, so each of the tasks can be handled by a
> different thread. In this case, the result of the second
> CompletableFuture depends on the first,so it makes no difference to
> the final result or to its broad-brush timing whether you compose the
> two CompletableFutures with one or the other variant of this method

As I understand it, the thenCompose and thenComposeAsync signatures are as follows:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}

The result of the second CompletableFuture can depend on the previous CompletableFuture in many situations (or rather, almost always). Should we use thenCompose and not thenComposeAsync in those cases?

What if we have blocking code in the second CompletableFuture?

This is a similar example given by the person who answered a similar question here: https://stackoverflow.com/questions/46130969/difference-between-java8-thencompose-and-thencomposeasync

public CompletableFuture<String> requestData(Quote quote) {
    Request request = blockingRequestForQuote(quote);
    return CompletableFuture.supplyAsync(() -> sendRequest(request));
}

To my mind, using thenComposeAsync in this situation can make our program faster, because blockingRequestForQuote can then run on a different thread. But according to the writer's opinion we should not use thenComposeAsync, because the second future depends on the first CompletableFuture's result (that is, the Quote).

My question is:

Is the writer's idea correct when he says:

> In this case, the result of the second
> CompletableFuture depends on the first,so it makes no difference to
> the final result or to its broad-brush timing whether you compose the
> two CompletableFutures with one or the other variant of this method

Answer 1

Score: 15

TL;DR It is correct to use thenCompose instead of thenComposeAsync here, but not for the cited reasons. Generally, the code example should not be used as a template for your own code.


This chapter is a recurring topic on Stack Overflow for reasons we can best describe as “insufficient quality”, to stay polite.

> In general, a method without the Async suffix in its name executes its task in the same threads the previous task, …

There is no such guarantee about the executing thread in the specification. The documentation says:

> - Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

So there’s also the possibility that the task is performed “by any other caller of a completion method”. An intuitive example is

CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
    .thenApply(r -> r.bar());

There are two threads involved. One that invokes supplyAsync and thenApply and the other which will invoke foo(). If the second completes the invocation of foo() before the first thread enters the execution of thenApply, it is possible that the future is already completed.

A future does not remember which thread completed it. Nor does it have some magic ability to tell that thread to perform an action even though it might be busy with something else or might even have terminated since then. So it should be obvious that calling thenApply on an already completed future can’t promise to use the thread that completed it. In most cases, it will perform the action immediately in the thread that calls thenApply. This is covered by the specification’s wording “any other caller of a completion method”.
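The point about already completed futures can be checked with a minimal sketch. The class and method names here are made up for illustration, and the behavior shown is what the reference implementation does in this uncontended case, not something the specification promises:

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureDemo {

    // Returns the name of the thread that ran a thenApply action registered
    // on a future that was already complete at registration time.
    static String threadRunningAction() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("value");
        // The dependent stage is complete as soon as thenApply returns here,
        // so join() does not block.
        return done.thenApply(v -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("action: " + threadRunningAction());
    }
}
```

Both lines print the same thread name when run on the reference implementation, i.e. the action ran synchronously in the caller.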

But that’s not the end of the story. As this answer explains, when there are more than two threads involved, the action can also get performed by another thread calling an unrelated completion method on the future at the same time. This may happen rarely, but it’s possible in the reference implementation and permitted by the specification.

We can summarize it as: methods without Async provide the least control over the thread that will perform the action and may even perform it right in the calling thread, leading to synchronous behavior.

So they are best when the executing thread doesn’t matter and you’re not hoping for background thread execution, i.e. for short, non-blocking operations.

> whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first, …

When you do

future.thenCompose(quote ->
    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))

there are three futures involved, so it’s not exactly clear, which future is meant by “second”. supplyAsync is submitting an action and returning a future. The submission is contained in a function passed to thenCompose, which will return another future.

If you used thenComposeAsync here, you would only mandate that the call to supplyAsync be submitted to the thread pool, instead of being performed directly in the completing thread or by “any other caller of a completion method”, e.g. directly in the thread calling thenCompose.
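To see which part actually moves, here is a small sketch under assumed names (ComposeThreadDemo, a single-thread pool whose thread is called "pool"). It records which thread ran the composing function and which thread ran the task submitted by supplyAsync. The thread observed for the non-async variant is reference-implementation behavior for an already completed future, not a guarantee:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class ComposeThreadDemo {

    // Composes a supplyAsync task onto an already completed future and reports
    // which thread ran the composing function and which ran the submitted task.
    static String compose(boolean async) {
        // Hypothetical single-thread pool with a recognizable thread name.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "pool");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> future = CompletableFuture.completedFuture("quote");
            Function<String, CompletionStage<String>> fn = quote -> {
                String fnThread = Thread.currentThread().getName();
                return CompletableFuture.supplyAsync(
                        () -> "fn@" + fnThread + ", task@" + Thread.currentThread().getName(),
                        pool);
            };
            CompletableFuture<String> result =
                    async ? future.thenComposeAsync(fn, pool) : future.thenCompose(fn);
            return result.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenCompose:      " + compose(false));
        System.out.println("thenComposeAsync: " + compose(true));
    }
}
```

In both variants the submitted task runs on the pool; the only difference is where the cheap call to supplyAsync itself is made.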

The reasoning about dependencies makes no sense here. “then” always implies a dependency. If you use thenComposeAsync here, you enforce the submission of the action to the thread pool, but this submission still won’t happen before the completion of future. And if future completes exceptionally, the submission won’t happen at all.

So, is using thenCompose reasonable here? Yes, it is, but not for the reasons given in the quote. As said, using the non-async method implies giving up control over the executing thread and should only be used when the thread doesn’t matter, most notably for short, non-blocking actions. Calling supplyAsync is a cheap action that will submit the actual action to the thread pool on its own, so it’s ok to perform it in whatever thread is free to do it.
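A short sketch of the exceptional case, with a made-up class name and a counter added purely for illustration: the composing function is never invoked, whichever variant is used, when the first future has failed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ExceptionalComposeDemo {

    // Waits for the future and reports whether it completed exceptionally.
    static boolean failed(CompletableFuture<?> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException expected) {
            return true;
        }
    }

    // Returns how often the composing function ran after the first stage failed.
    static int invocationsAfterFailure() {
        AtomicInteger invocations = new AtomicInteger();
        CompletableFuture<String> first = new CompletableFuture<>();
        first.completeExceptionally(new IllegalStateException("first stage failed"));

        Function<String, CompletionStage<String>> fn = value -> {
            invocations.incrementAndGet();
            return CompletableFuture.supplyAsync(() -> value + " discounted");
        };

        // Both dependents fail with the original exception as the cause.
        if (!failed(first.thenCompose(fn)) || !failed(first.thenComposeAsync(fn))) {
            throw new AssertionError("dependent stage should have failed");
        }
        return invocations.get();
    }

    public static void main(String[] args) {
        System.out.println("invocations: " + invocationsAfterFailure());
    }
}
```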

However, it’s an unnecessary complication. You can achieve the same using

future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)

which will do exactly the same, submit applyDiscount to executor when future has been completed and produce a new future representing the result. Using a combination of thenCompose and supplyAsync is unnecessary here.
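The two formulations can be compared side by side in a small sketch. The applyDiscount method below is a hypothetical stand-in for Discount.applyDiscount that also records the executing thread, and the executor is a single named thread so the output is easy to read:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ApplyAsyncVsComposeDemo {

    // Hypothetical stand-in for Discount.applyDiscount(quote);
    // tags the input with the name of the executing thread.
    static String applyDiscount(String quote) {
        return quote + " discounted on " + Thread.currentThread().getName();
    }

    // Runs both formulations against the same executor and returns both results.
    static String[] runBothVariants() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "discount-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> future = CompletableFuture.completedFuture("quote");
            String composed = future
                    .thenCompose(q -> CompletableFuture.supplyAsync(() -> applyDiscount(q), executor))
                    .join();
            String applied = future
                    .thenApplyAsync(q -> applyDiscount(q), executor)
                    .join();
            return new String[] { composed, applied };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String result : runBothVariants()) {
            System.out.println(result);
        }
    }
}
```

Both variants run applyDiscount on the executor's thread and yield the same value; thenApplyAsync simply gets there with one future fewer.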

Note that this very example has been discussed in this Q&A already, which also addresses the unnecessary segregation of the future operations over multiple Stream operations as well as the wrong sequence diagram.

Answer 2

Score: 1

What a polite answer from Holger! I am really impressed that he could provide such a great explanation while staying within the bounds of not calling the author plain wrong. I want to add my $0.02 here too, after reading the same book and having to scratch my head twice.

First of all, there is no "remembering" of which thread executed which stage, nor does the specification make such a statement (as already answered above). The interesting part is in the documentation cited above:

> Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

Even the "...completes the current CompletableFuture" part is tricky. What if two threads try to call complete on the same CompletableFuture? Which thread will run all the dependent actions? The one that actually completed it? Or any other? I wrote a jcstress test whose results are very non-intuitive:

import java.util.concurrent.CompletableFuture;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.II_Result;

@JCStressTest
@State
@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE, desc = "executed in completion thread")
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "executed in the other thread")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN)
@Outcome(id = "1, 1", expect = Expect.FORBIDDEN)
public class CompletableFutureWhichThread1 {

    private final CompletableFuture<String> future = new CompletableFuture<>();

    public CompletableFutureWhichThread1() {
        future.thenApply(x -> action(Thread.currentThread().getName()));
    }

    volatile int x = -1; // different default to not mess with the expected result
    volatile int y = -1; // different default to not mess with the expected result
    volatile int actor1 = 0;
    volatile int actor2 = 0;

    private String action(String threadName) {
        System.out.println(Thread.currentThread().getName());
        // same thread that completed future, executed action
        if ("actor1".equals(threadName) && actor1 == 1) {
            x = 1;
            return "action";
        }

        // same thread that completed future, executed action
        if ("actor2".equals(threadName) && actor2 == 1) {
            x = 1;
            return "action";
        }

        y = 1;
        return "action";

    }

    @Actor
    public void actor1() {
        Thread.currentThread().setName("actor1");
        boolean completed = future.complete("done-actor1");
        if (completed) {
            actor1 = 1;
        } else {
            actor2 = 1;
        }
    }

    @Actor
    public void actor2() {
        Thread.currentThread().setName("actor2");
        boolean completed = future.complete("done-actor2");
        if (completed) {
            actor2 = 1;
        }
    }

    @Arbiter
    public void arbiter(II_Result result) {
        if (x == 1) {
            result.r1 = 1;
        }

        if (y == 1) {
            result.r2 = 1;
        }

    }

}

After running this, both 0, 1 and 1, 0 are seen. You do not need to understand very much about the test itself, but it proves a rather interesting point.

You have a CompletableFuture future that has a future.thenApply(x -> action(...)); attached to it. Two threads (actor1 and actor2) compete at the same time to complete it (the specification says that only one will succeed). The results show that if actor1 called complete but did not actually complete the CompletableFuture (actor2 did), actor1 can still do the actual work in action. In other words, the thread that completed a CompletableFuture is not necessarily the thread that executes the dependent actions (those thenApply ones, for example). This was rather interesting for me to find out, though it makes sense.


Your reasoning about speed is a bit off. When you dispatch your work to a different thread, you usually pay a penalty for that. thenCompose vs thenComposeAsync is about being able to predict where exactly your work is going to happen. As you have seen above, you cannot do that unless you use the ...Async methods that take a thread pool. Your natural question should be: "Why do I care where it is executed?".

There is an internal class in the JDK's HttpClient called SelectorManager. It has (from a high level) a rather simple task: it reads from a socket and gives "responses" back to the threads that wait for an HTTP result. In essence, this is a thread that wakes up all interested parties that wait for some HTTP packets. Now imagine that this particular thread internally does thenCompose. Now also imagine that your chain of calls looks like this:

 httpClient.sendAsync(() -> ...)
           .thenApply(x -> foo())

where foo is a method that never finishes (or takes a lot of time to finish). Since you have no idea in which thread the actual execution is going to happen, it can very well happen in the SelectorManager thread, which would be a disaster. All other HTTP calls would stall, because this thread is busy now. Thus thenComposeAsync: let the configured pool do the work/waiting if needed, while the SelectorManager thread is free to do its work.
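The difference can be made visible without HttpClient. In this sketch a thread named "selector" is a hypothetical stand-in for an I/O thread such as SelectorManager, and "worker" is an assumed application pool. Where the non-async dependent ends up is observed behavior of the reference implementation in this uncontended setup, not a guarantee:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadDemo {

    // Returns { thread that ran the thenApply action, thread that ran the thenApplyAsync action }.
    static String[] whereDependentsRun() {
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            // Both dependents are registered before the future completes.
            CompletableFuture<String> sync =
                    response.thenApply(body -> Thread.currentThread().getName());
            CompletableFuture<String> async =
                    response.thenApplyAsync(body -> Thread.currentThread().getName(), workers);

            // Hypothetical stand-in for the I/O thread that delivers the response.
            Thread selector = new Thread(() -> response.complete("body"), "selector");
            selector.start();
            selector.join();
            return new String[] { sync.join(), async.join() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = whereDependentsRun();
        System.out.println("thenApply ran on:      " + threads[0]);
        System.out.println("thenApplyAsync ran on: " + threads[1]);
    }
}
```

Here the non-async action lands on the completing "selector" thread, so a slow foo() would block it, while the async variant with an explicit executor keeps that work on "worker".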

So the reasons that the author gives are plain wrong.

huangapple
  • Posted on August 2, 2020, 22:23:20
  • When reposting, please keep this link: https://go.coder-hub.com/63217097.html