如何同时等待多个Mono完成并获取值

huangapple go评论62阅读模式
英文:

How do I wait for multiple Mono's to complete at once and get the value

问题

类似于 https://stackoverflow.com/questions/49396692/waiting-for-running-reactor-mono-instances-to-complete 的问题,但我希望在另一个 Mono 中获得结果。以下是我的代码。我尝试了使用 materialize 的方法,但效果不佳。

```java
    @GetMapping("/bounced")
    public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

/*
   尝试了下面的方法,它会启动执行,但会在返回块中触发另一个“download”。

        Mono.when(anything, sample, httpGet)
            .subscribe();
            .materialize()
            .block();
*/
        return Mono.just(Map.of("traceFromBounced", traceId,
            "anything", anything.block(),
            "sample", sample.block(),
            "httpGet", httpGet.block()));
英文:

Similar in question to https://stackoverflow.com/questions/49396692/waiting-for-running-reactor-mono-instances-to-complete but I want to get the result ideally in another Mono. Here's the code I have. I tried the materialize solution but that didn't pan out.

    @GetMapping(&quot;/bounced&quot;)
    public Mono&lt;Map&lt;String, Object&gt;&gt; bounced(
        @RequestHeader(&quot;X-B3-Traceid&quot;) String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono&lt;Map&lt;String, Object&gt;&gt; sample = webClient.get()
            .uri(&quot;http://sample:8080/&quot;)
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

        final Mono&lt;Map&lt;String, Object&gt;&gt; httpGet = webClient.get()
            .uri(&quot;http://httpbin.org/get&quot;)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

        final Mono&lt;Map&lt;String, Object&gt;&gt; anything = webClient.get()
            .uri(&quot;http://httpbin.org/anything/foo&quot;)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

/*
   Tried this and it does start it up, but it triggers another &quot;download&quot; in the return block.

        Mono.when(anything, sample, httpGet)
            .subscribe();
            .materialize()
            .block();
*/
        return Mono.just(Map.of(&quot;traceFromBounced&quot;, traceId,
            &quot;anything&quot;, anything.block(),
            &quot;sample&quot;, sample.block(),
            &quot;httpGet&quot;, httpGet.block()));

答案1

得分: 5

根据 @K.Nicholas 的评论,我让它正常运行了:

@GetMapping("/bounced")
public Mono<Map<String, Object>> bounced(
    @RequestHeader("X-B3-Traceid") String traceId,
    @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
) {

    final Mono<Map<String, Object>> sample = webClient.get()
        .uri("http://sample:8080/")
        .header(HttpHeaders.AUTHORIZATION, authorization)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<>() {
        });

    final Mono<Map<String, Object>> httpGet = webClient.get()
        .uri("http://httpbin.org/get")
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<>() {
        });

    final Mono<Map<String, Object>> anything = webClient.get()
        .uri("http://httpbin.org/anything/foo")
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<>() {
        });

    return Mono.zip(anything, sample, httpGet)
        .map(t -> Map.of("traceFromBounced", traceId,
            "anything", t.getT1(),
            "sample", t.getT2(),
            "httpGet", t.getT3()));

}

这里是 Zipkin 的输出,显示它并行运行:
如何同时等待多个Mono完成并获取值

英文:

Based on @K.Nicholas' comment I got it working

    @GetMapping(&quot;/bounced&quot;)
    public Mono&lt;Map&lt;String, Object&gt;&gt; bounced(
        @RequestHeader(&quot;X-B3-Traceid&quot;) String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono&lt;Map&lt;String, Object&gt;&gt; sample = webClient.get()
            .uri(&quot;http://sample:8080/&quot;)
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

        final Mono&lt;Map&lt;String, Object&gt;&gt; httpGet = webClient.get()
            .uri(&quot;http://httpbin.org/get&quot;)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

        final Mono&lt;Map&lt;String, Object&gt;&gt; anything = webClient.get()
            .uri(&quot;http://httpbin.org/anything/foo&quot;)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference&lt;&gt;() {
            });

        return Mono.zip(anything, sample, httpGet)
            .map(t -&gt; Map.of(&quot;traceFromBounced&quot;, traceId,
                &quot;anything&quot;, t.getT1(),
                &quot;sample&quot;, t.getT2(),
                &quot;httpGet&quot;, t.getT3()));

    }

Here's the zipkin output that shows it runs in parallel
如何同时等待多个Mono完成并获取值

huangapple
  • 本文由 发表于 2020年5月4日 10:53:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/61584447.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定