How do I wait for multiple Monos to complete at once and get their values

Question

This is similar to https://stackoverflow.com/questions/49396692/waiting-for-running-reactor-mono-instances-to-complete, but I want to get the results, ideally in another Mono. Here's the code I have. I tried the materialize solution from that question, but it didn't pan out.

```java
@GetMapping("/bounced")
public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
) {
    final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    /*
    Tried this and it does start it up, but it triggers another "download" in the return block.
    Mono.when(anything, sample, httpGet)
            .subscribe();
            .materialize()
            .block();
    */
    return Mono.just(Map.of("traceFromBounced", traceId,
            "anything", anything.block(),
            "sample", sample.block(),
            "httpGet", httpGet.block()));
}
```

Answer 1

Score: 5

Based on @K.Nicholas' comment, I got it working:

```java
@GetMapping("/bounced")
public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
) {
    final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });
    // zip combines the three Monos into one tuple once each has produced its value;
    // nothing is blocked, and the resulting Mono is returned to the framework.
    return Mono.zip(anything, sample, httpGet)
            .map(t -> Map.of("traceFromBounced", traceId,
                    "anything", t.getT1(),
                    "sample", t.getT2(),
                    "httpGet", t.getT3()));
}
```

Here's the Zipkin output that shows the calls run in parallel:
[Image: Zipkin trace screenshot]
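
A note on why the `zip` version behaves differently from the attempt in the question: a Mono returned by WebClient does nothing until it is subscribed, and each subscription issues its own request. Calling `Mono.when(...).subscribe()` and then `block()` on the same Monos therefore subscribes twice, which matches the extra "download" described above. `Mono.zip` subscribes to each source once and emits a single tuple when all of them have produced a value. The sketch below is only a JDK analogy of that "start everything, then combine" shape. It uses `CompletableFuture` instead of Reactor so it runs without extra dependencies, and `ZipAnalogy`, `fakeCall`, the 200 ms delays, and the trace ID are made up for illustration. One difference to keep in mind: a `CompletableFuture` starts running as soon as it is created, whereas a Mono waits for a subscriber.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// JDK-only analogy of the zip pattern; this is not Reactor code.
class ZipAnalogy {

    // Hypothetical stand-in for one remote call: completes with a small map after delayMillis.
    static CompletableFuture<Map<String, Object>> fakeCall(String name, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> Map.<String, Object>of("source", name),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    static CompletableFuture<Map<String, Object>> bounced(String traceId) {
        // All three calls are started here, before any result is awaited.
        CompletableFuture<Map<String, Object>> sample = fakeCall("sample", 200);
        CompletableFuture<Map<String, Object>> httpGet = fakeCall("httpGet", 200);
        CompletableFuture<Map<String, Object>> anything = fakeCall("anything", 200);

        // allOf completes only after every future has completed,
        // so the join() calls inside thenApply return immediately.
        return CompletableFuture.allOf(anything, sample, httpGet)
                .thenApply(ignored -> Map.<String, Object>of(
                        "traceFromBounced", traceId,
                        "anything", anything.join(),
                        "sample", sample.join(),
                        "httpGet", httpGet.join()));
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Blocking is acceptable in this standalone demo; a WebFlux handler
        // would return the asynchronous result instead of waiting for it.
        Map<String, Object> result = bounced("abc123").join();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result.keySet());
        System.out.println("elapsed ms: " + elapsedMillis);
    }
}
```

Running `main` prints the four keys (in no particular order, since `Map.of` does not guarantee iteration order) and an elapsed time that should be close to the longest single delay rather than the sum of the three.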

  • Posted by huangapple on May 4, 2020, 10:53:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61584447.html