英文:
making filterWhen() with Mono predicate parallel
问题
使用 Reactor 的 Flux filterWhen 时,我遇到了一些需要解决的行为。
给定以下代码:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
其中:
Mono<boolean> predicateMono(int value) { ... }
我注意到 predicateMono() 会按顺序执行,这意味着对于值 2,操作直到第一个操作完成后才被调用。
当我的代码中的 predicateMono() 是与后端系统进行的非阻塞 Http 调用时,我希望能够并行执行这些调用。我应该如何编写代码以便以并行方式过滤 Flux 值?
predicateMono() 是非阻塞的 Http 调用,与响应式方法兼容。
英文:
With the Reactor Flux filterWhen, I see some behaviour that I need to overcome.
Given the following code:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
Where:
Mono<boolean> predicateMono(int value) { ... }
I noticed that the predicateMono()s are executed sequentially, meaning that for the value 2 the operation is not called until the first one has completed.
This becomes a problem when the predicateMono()s in my code are http calls to backend system that I would like to execute parallel. How do I write this so that I can filter the flux values in parallel fashion?
The predicateMono() are non blocking http calls, compatible with the reactive approach.
答案1
得分: 1
解决方法是使用 flatmap 替代 filterwhen。如果你想要过滤掉某些值,只需映射为空即可。在 Flux 中,空值会被后续操作忽略。
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));
英文:
Solution is to use flatmap instead of the filterwhen. Just map to empty in case you want to filter out. The empty values on Flux are just ignored for upcoming operations.
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论