英文:
Connecting a Flux to a Mono
问题
我有一个Flux<T>
,它返回多个T
。在继续进行Mono之前,我需要等待所有项目从我的Flux中发出。当我只使用Monos进行类似操作时,我能够将它们一起压缩(使用zipWhen()
)以实现这种效果。目前,我正在设置并在Flux的doOnComplete()
中执行第二组操作,但这似乎不是最佳解决方案。
请注意,我对Flux和Mono相对较新。
getFlux()
.doOnComplete(() -> {
// 1a. 从Flux的结果中提取ids
@NotNull List<String> ids = extractIds(dto.objects); // objects由Flux填充
Mono<?> myOtherMono =
getMono1(ids)
.zipWhen(
// 创建第二个Mono
);
Flux<List<?>> otherFlux =
getOtherFlux(ids);
myOtherMono.and(otherFlux).block();
})
.blockLast();
编辑以添加更多细节:
其中一个答案建议使用then
方法来将其实质上转化为Mono。我这样做了,但现在似乎忽略了zipWhen
中的代码。请参见下面的代码:
getFlux()
.then()
.zipWhen(
(x) -> {
// 1a. 从Flux的结果中提取ids
@NotNull List<String> ids = extractIds(dto.objects); // objects由Flux填充
Mono<?> myOtherMono =
getMono1(ids)
.zipWhen(
// 创建第二个Mono
);
Flux<List<?>> otherFlux =
getOtherFlux(ids);
Mono<?> otherFluxAsMono = otherFlux.then();
return myOtherMono.zipWith(otherFluxAsMono);
})
.block();
英文:
I have a Flux<T>
that returns multiple T
s. I need to wait until all the items are emitted from my Flux before continuing on to the Mono. When I was doing similarly with just Monos, I was able to zip them together (using zipWhen()
) to achieve this effect. For now I'm setting up and doing the second set of actions in the Flux's doOnComplete()
, but it doesn't seem like the best solution.
Note that I'm fairly new to Flux and Mono
getFlux()
.doOnComplete(
() -> {
// 1a. Retrieve ids from results of the flux
@NotNull List<String> ids = extractIds(dto.objects); // objects is populated by the Flux
Mono<?> myOtherMono =
getMono1(ids)
.zipWhen(
// create second mono
});
Flux<List<?>> otherFlux =
getOtherFlux(ids);
myOtherMono .and(otherFlux).block();
})
.blockLast();
Edited to add more detail:
One of the answers suggested using the then
method to essentially turn it into a Mono. I did so, but now it seems to be ignoring the code in the zipWhen
. See below:
getFlux()
.then()
.zipWhen( // should continue here when the other mono completes, but it's never getting in here
(x) -> {
// 1a. Retrieve ids from results of the flux
@NotNull List<String> ids = extractIds(dto.objects); // objects is populated by the Flux
Mono<?> myOtherMono =
getMono1(ids)
.zipWhen(
// create second mono
});
Flux<List<?>> otherFlux =
getOtherFlux(ids);
Mono<?> otherFluxAsMono = otherFlux.then()
return myOtherMono.zipWith(otherFluxAsMono);
})
.block();
答案1
得分: 1
Sounds like you need then(Mono<V> other)
operator of the Flux
:
/**
- 让这个{@link Flux}完成,然后播放来自提供的{@link Mono}的信号。
- 换句话说,忽略来自这个{@link Flux}的元素,并将其完成信号转换为提供的{@code Mono
}的发射和完成信号。错误信号在生成的{@code Mono }中重新播放。 -
丢弃支持:此操作符会丢弃源中的元素。
- @param other 用于终止后发出的{@link Mono}
- @param
提供的Mono的元素类型 - @return 一个新的{@link Flux},等待源完成,然后从提供的{@link Mono}中发出
*/
public finalMono then(Mono other) {
英文:
Sounds like you need then(Mono<V> other)
operator of the Flux
: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#then-reactor.core.publisher.Mono-
/**
* Let this {@link Flux} complete then play signals from a provided {@link Mono}.
* <p>
* In other words ignore element from this {@link Flux} and transform its completion signal into the
* emission and completion signal of a provided {@code Mono<V>}. Error signal is
* replayed in the resulting {@code Mono<V>}.
*
* <p>
* <img class="marble" src="doc-files/marbles/thenWithMonoForFlux.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards elements from the source.
*
* @param other a {@link Mono} to emit from after termination
* @param <V> the element type of the supplied Mono
*
* @return a new {@link Flux} that wait for source completion then emits from the supplied {@link Mono}
*/
public final <V> Mono<V> then(Mono<V> other) {
答案2
得分: 1
如果您想获取您的流所包含的所有元素的列表,您可以使用collect方法:
.collect(Collectors.toList())
然后您可以使用map、flatmap或任何Mono操作符。
英文:
If you want a list of all the elements that your flux had, you can use collect
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#collect-java.util.stream.Collector-
example:
.collect(Collectors.toList())
And after that you can use a map or flatmap or any Mono operator.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论