使用Mono谓词并行制作filterWhen()

huangapple go评论69阅读模式
英文:

making filterWhen() with Mono predicate parallel

问题

使用 Reactor 的 Flux filterWhen 时,我遇到了一些需要解决的行为。

给定以下代码:

Flux.fromIterable(List.of(1,2))
  .filterWhen(it -> predicateMono(it))

其中:

Mono<boolean> predicateMono(int value) { ... } 

我注意到 predicateMono() 会按顺序执行,这意味着对于值 2,操作直到第一个操作完成后才被调用。

当我的代码中的 predicateMono() 是与后端系统进行的非阻塞 Http 调用时,我希望能够并行执行这些调用。我应该如何编写代码以便以并行方式过滤 Flux 值?

predicateMono() 是非阻塞的 Http 调用,与响应式方法兼容。

英文:

With the Reactor Flux filterWhen, I see some behaviour that I need to overcome.

Given the following code:

Flux.fromIterable(List.of(1,2))
  .filterWhen(it -> predicateMono(it))

Where:

Mono<boolean> predicateMono(int value) { ... } 

I noticed that the predicateMono()s are executed sequentially, meaning that for the value 2 the operation is not called until the first one has completed.

This becomes a problem when the predicateMono()s in my code are http calls to backend system that I would like to execute parallel. How do I write this so that I can filter the flux values in parallel fashion?

The predicateMono() are non blocking http calls, compatible with the reactive approach.

答案1

得分: 1

解决方法是使用 flatmap 替代 filterwhen。如果你想要过滤掉某些值,只需映射为空即可。在 Flux 中,空值会被后续操作忽略。

Flux.fromIterable(List.of(1, 2))
        .flatMap(it ->
            predicateMono(it).flatMap(result ->
                result ? Mono.just(it) : Mono.empty()));
英文:

Solution is to use flatmap instead of the filterwhen. Just map to empty in case you want to filter out. The empty values on Flux are just ignored for upcoming operations.

Flux.fromIterable(List.of(1, 2))
        .flatMap(it ->
            predicateMono(it).flatMap(result ->
                result ? Mono.just(it) : Mono.empty()));

huangapple
  • 本文由 发表于 2020年10月8日 03:12:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/64250803.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定