英文:
making filterWhen() with Mono predicate parallel
问题
使用 Reactor 的 Flux filterWhen
时,我遇到了一些需要解决的行为。
给定以下代码:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
其中:
Mono<boolean> predicateMono(int value) { ... }
我注意到 predicateMono()
会按顺序执行,这意味着对于值 2,操作直到第一个操作完成后才被调用。
当我的代码中的 predicateMono()
是与后端系统进行的非阻塞 Http 调用时,我希望能够并行执行这些调用。我应该如何编写代码以便以并行方式过滤 Flux 值?
predicateMono()
是非阻塞的 Http 调用,与响应式方法兼容。
英文:
With the Reactor Flux filterWhen, I see some behaviour that I need to overcome.
Given the following code:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
Where:
Mono<boolean> predicateMono(int value) { ... }
I noticed that the predicateMono()
s are executed sequentially, meaning that for the value 2 the operation is not called until the first one has completed.
This becomes a problem when the predicateMono()
s in my code are http calls to backend system that I would like to execute parallel. How do I write this so that I can filter the flux values in parallel fashion?
The predicateMono()
are non blocking http calls, compatible with the reactive approach.
答案1
得分: 1
解决方法是使用 flatmap
替代 filterwhen
。如果你想要过滤掉某些值,只需映射为空即可。在 Flux 中,空值会被后续操作忽略。
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));
英文:
Solution is to use flatmap
instead of the filterwhen
. Just map to empty in case you want to filter out. The empty values on Flux are just ignored for upcoming operations.
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论