英文:
Subscribe / Iterate a List of Mono
问题
我有一系列返回 Mono<Result>
的规则,我需要执行它们,并返回另一个带有每个函数结果的 Mono<List<RuleResult>>
列表。
我目前有以下代码,虽然它能工作,但是阻塞执行似乎不太正确:
List<Rule> RuleSet
...
Mono<List<RuleResult>> result = Mono.just(RuleSet.stream().map(rule -> rule.assess(object).block()).collect(Collectors.toList()));
请问如何将这个代码改成更少阻塞的版本?
我尝试了以下代码:
// 创建一个 Mono 列表
List<Mono<RuleResult>> ruleStream = statefulRuleSet.stream().map(rule -> rule.assess(assessmentObject)).collect(Collectors.toList());
// 无法转换类型
Flux.zip(ruleStream, ...).collectList();
// 不太清楚如何操作
Flux.fromIterable(ruleStream...).collectList();
也许我对解决方案的想法有误,是否有人有任何指导?
英文:
I have a list of Rules that return Mono<Result>
, I need to execute them and return another List of Mono with the result of each function.
Mono<List< Rule>>
to Mono<List< RuleResult>>
I have this, which works but blocking the execution seems not correct to me:
List<Rule> RuleSet
...
Mono<List<RuleResult>> result= Mono.just(RuleSet.stream().map(rule -> rule.assess(object).block()).collect(Collectors.toList()));
How can I convert this into less blocking version?
I tried the following:
//Create a List of Monos
List<Mono<RuleResult>> ruleStream=statefulRuleSet.stream().map(rule -> rule.assess(assessmentObject)).collect(Collectors.toList());
//Cannot convert type
Flux.zip(ruleStream,...)).collectList();
//Not sure how to do this
Flux.fromIterable(ruleStream...).collectList();
Perhaps I am thinking of a wrong solution overall, somebody has any pointers?
答案1
得分: 2
例如:
interface Rule extends Function<Object, Mono<Object>> { }
public static void main(String[] args) {
Rule rule1 = (o) -> Mono.just(Integer.valueOf(o.hashCode()));
Rule rule2 = (o) -> Mono.just(o.toString());
List<Rule> rules = List.of(rule1, rule2);
Object object = new Object();
Mono<List<Object>> result = Flux.fromIterable(rules)
.flatMapSequential(rule -> rule.apply(object)).collectList();
result.subscribe(System.out::println);
}
使用 flatMapSequential
允许您同时等待多达 maxConcurrency 个结果。maxConcurrency 的值可以作为附加参数指定给 flatMapSequential
。在 reactor-core 3.3.8 中,其默认值为 256。
英文:
For example:
interface Rule extends Function<Object, Mono<Object>> { }
public static void main(String[] args) {
Rule rule1 = (o) -> Mono.just(Integer.valueOf(o.hashCode()));
Rule rule2 = (o) -> Mono.just(o.toString());
List<Rule> rules = List.of(rule1, rule2);
Object object = new Object();
Mono<List<Object>> result = Flux.fromIterable(rules)
.flatMapSequential(rule -> rule.apply(object)).collectList();
result.subscribe(System.out::println);
}
Using flatMapSequential
allows you to wait for up to maxConcurrency results at the same time. The maxConcurrency value can be specified as an additional argument to flatMapSequential
. In reactor-core 3.3.8 its default value is 256.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论