Subscribe / Iterate a List of Mono

Question

I have a list of Rules, each of which returns a Mono<RuleResult>. I need to execute them and return a single Mono containing the list of results, one per rule.

In short: List<Rule> to Mono<List<RuleResult>>

I have the following, which works, but blocking the execution does not seem right to me:

List<Rule> RuleSet
...
Mono<List<RuleResult>> result = Mono.just(RuleSet.stream().map(rule -> rule.assess(object).block()).collect(Collectors.toList()));

How can I convert this into a non-blocking version?

I tried the following:

// Create a list of Monos
List<Mono<RuleResult>> ruleStream = statefulRuleSet.stream().map(rule -> rule.assess(assessmentObject)).collect(Collectors.toList());

// Cannot convert type
Flux.zip(ruleStream, ...).collectList();

// Not sure how to do this
Flux.fromIterable(ruleStream...).collectList();

Perhaps I am approaching this the wrong way altogether. Does anybody have any pointers?

Answer 1

Score: 2

For example:

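import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RulesExample {

  // Stand-in for the question's Rule type: takes an object, returns a Mono with the result
  interface Rule extends Function<Object, Mono<Object>> { }

  public static void main(String[] args) {

    Rule rule1 = (o) -> Mono.just(Integer.valueOf(o.hashCode()));
    Rule rule2 = (o) -> Mono.just(o.toString());
    List<Rule> rules = List.of(rule1, rule2);

    Object object = new Object();

    // Apply every rule without blocking and collect the results into one list
    Mono<List<Object>> result = Flux.fromIterable(rules)
      .flatMapSequential(rule -> rule.apply(object)).collectList();

    result.subscribe(System.out::println);
  }
}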

flatMapSequential subscribes to up to maxConcurrency of the inner Monos at the same time, so their results are awaited concurrently, and it emits the results in the original order of the rules. The maxConcurrency value can be passed as an additional argument to flatMapSequential. In reactor-core 3.3.8 its default value is 256.
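
To make the maxConcurrency argument concrete, here is a minimal sketch of the same idea with an explicit limit. The class name BoundedRules, the helper assessAll, and the String-returning Rule interface are made up for this illustration and are not part of the question's code. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BoundedRules {

  // Hypothetical stand-in for the question's Rule type; results are Strings here.
  interface Rule extends Function<Object, Mono<String>> { }

  // Applies every rule to the input, subscribing to at most maxConcurrency
  // rules at a time, and collects the results in the order of the rules.
  static Mono<List<String>> assessAll(List<Rule> rules, Object input, int maxConcurrency) {
    return Flux.fromIterable(rules)
        .flatMapSequential(rule -> rule.apply(input), maxConcurrency)
        .collectList();
  }

  public static void main(String[] args) {
    // The first rule completes last, yet its result still comes first in the list.
    Rule slow = o -> Mono.delay(Duration.ofMillis(200)).map(tick -> "slow:" + o);
    Rule fast = o -> Mono.just("fast:" + o);

    // block() is used only so this demo waits for the delayed rule before the JVM exits.
    List<String> results = assessAll(List.of(slow, fast), "x", 2).block();
    System.out.println(results); // prints [slow:x, fast:x]
  }
}
```

With a limit of 1 the rules run strictly one after another. A higher limit lets slow rules overlap, and the order of the resulting list stays the same either way.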

huangapple
  • Posted on July 24, 2020, 00:15:31
  • Link to this article: https://go.coder-hub.com/63058695.html