连接一个 Flux 到一个 Mono

huangapple go评论63阅读模式
英文:

Connecting a Flux to a Mono

问题

我有一个Flux<T>,它返回多个T。在继续进行Mono之前,我需要等待所有项目从我的Flux中发出。当我只使用Monos进行类似操作时,我能够将它们一起压缩(使用zipWhen())以实现这种效果。目前,我正在设置并在Flux的doOnComplete()中执行第二组操作,但这似乎不是最佳解决方案。

请注意,我对Flux和Mono相对较新。

getFlux()
    .doOnComplete(() -> {
        // 1a. 从Flux的结果中提取ids
        @NotNull List<String> ids = extractIds(dto.objects); // objects由Flux填充
        Mono<?> myOtherMono =
            getMono1(ids)
                .zipWhen(
                    // 创建第二个Mono
                );

        Flux<List<?>> otherFlux =
            getOtherFlux(ids);
        myOtherMono.and(otherFlux).block();
    })
    .blockLast();

编辑以添加更多细节:
其中一个答案建议使用then方法来将其实质上转化为Mono。我这样做了,但现在似乎忽略了zipWhen中的代码。请参见下面的代码:

getFlux()
    .then()
    .zipWhen(
        (x) -> {
            // 1a. 从Flux的结果中提取ids
            @NotNull List<String> ids = extractIds(dto.objects); // objects由Flux填充
            Mono<?> myOtherMono =
                getMono1(ids)
                    .zipWhen(
                        // 创建第二个Mono
                    );

            Flux<List<?>> otherFlux =
                getOtherFlux(ids);
            Mono<?> otherFluxAsMono = otherFlux.then();
            return myOtherMono.zipWith(otherFluxAsMono);
        })
    .block();
英文:

I have a Flux&lt;T&gt; that returns multiple Ts. I need to wait until all the items are emitted from my Flux before continuing on to the Mono. When I was doing similarly with just Monos, I was able to zip them together (using zipWhen()) to achieve this effect. For now I'm setting up and doing the second set of actions in the Flux's doOnComplete(), but it doesn't seem like the best solution.

Note that I'm fairly new to Flux and Mono

    getFlux()
        .doOnComplete(
            () -&gt; {
              // 1a. Retrieve ids from results of the flux
              @NotNull List&lt;String&gt; ids = extractIds(dto.objects); // objects is populated by the Flux
              Mono&lt;?&gt; myOtherMono =
                    getMono1(ids)
                      .zipWhen(
                          // create second mono
                          });
              
              Flux&lt;List&lt;?&gt;&gt; otherFlux =
                  getOtherFlux(ids);
              myOtherMono .and(otherFlux).block();
            })
        .blockLast();

Edited to add more detail:
One of the answers suggested using the then method to essentially turn it into a Mono. I did so, but now it seems to be ignoring the code in the zipWhen. See below:

    getFlux()
        .then()
        .zipWhen( // should continue here when the other mono completes, but it&#39;s never getting in here
            (x) -&gt; {
              // 1a. Retrieve ids from results of the flux
              @NotNull List&lt;String&gt; ids = extractIds(dto.objects); // objects is populated by the Flux
              Mono&lt;?&gt; myOtherMono =
                    getMono1(ids)
                      .zipWhen(
                          // create second mono
                          });
              
              Flux&lt;List&lt;?&gt;&gt; otherFlux =
                  getOtherFlux(ids);
              Mono&lt;?&gt; otherFluxAsMono = otherFlux.then()
              return myOtherMono.zipWith(otherFluxAsMono);
            })
        .block();

答案1

得分: 1

Sounds like you need then(Mono<V> other) operator of the Flux:

/**

  • 让这个{@link Flux}完成,然后播放来自提供的{@link Mono}的信号。
  • 换句话说,忽略来自这个{@link Flux}的元素,并将其完成信号转换为提供的{@code Mono}的发射和完成信号。错误信号在生成的{@code Mono}中重新播放。
  • 丢弃支持:此操作符会丢弃源中的元素。

  • @param other 用于终止后发出的{@link Mono}
  • @param 提供的Mono的元素类型
  • @return 一个新的{@link Flux},等待源完成,然后从提供的{@link Mono}中发出
    */
    public final Mono then(Mono other) {
英文:

Sounds like you need then(Mono&lt;V&gt; other) operator of the Flux: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#then-reactor.core.publisher.Mono-

/**
 * Let this {@link Flux} complete then play signals from a provided {@link Mono}.
 * &lt;p&gt;
 * In other words ignore element from this {@link Flux} and transform its completion signal into the
 * emission and completion signal of a provided {@code Mono&lt;V&gt;}. Error signal is
 * replayed in the resulting {@code Mono&lt;V&gt;}.
 *
 * &lt;p&gt;
 * &lt;img class=&quot;marble&quot; src=&quot;doc-files/marbles/thenWithMonoForFlux.svg&quot; alt=&quot;&quot;&gt;
 *
 * &lt;p&gt;&lt;strong&gt;Discard Support:&lt;/strong&gt; This operator discards elements from the source.
 *
 * @param other a {@link Mono} to emit from after termination
 * @param &lt;V&gt; the element type of the supplied Mono
 *
 * @return a new {@link Flux} that wait for source completion then emits from the supplied {@link Mono}
 */
public final &lt;V&gt; Mono&lt;V&gt; then(Mono&lt;V&gt; other) {

答案2

得分: 1

如果您想获取您的流所包含的所有元素的列表,您可以使用collect方法:

.collect(Collectors.toList())

然后您可以使用map、flatmap或任何Mono操作符。

英文:

If you want a list of all the elements that your flux had, you can use collect
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#collect-java.util.stream.Collector-

example:

.collect(Collectors.toList())

And after that you can use a map or flatmap or any Mono operator.

huangapple
  • 本文由 发表于 2023年3月7日 07:14:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/75656675.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定