如何停止主线程以完成所有的Mono调用?

huangapple go评论59阅读模式
英文:

How to stop main thread to complete all Mono calls?

问题

我正在对数据库进行多个单调调用。所有 Mono 响应的结果都需要用来计算最终结果,该结果在声明的 Mono 逻辑之后被写入。

如果 (SomeObject.getAccountLevelActiveList() != null) {

SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
    Mono<SubLine> subLineMono = SubLineService
            .getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account));

    subLineMono.subscribe(subLine -> PollObject.getSubList()
            .put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));

});

}

但是我的主要逻辑在 Mono 结果存储到 PollObject 之前就被执行了。因此,在 PollObject 中我得到了 null。所以我想要在 Mono 结果存储到 PollObject 之前停止我的主线程。

英文:

I'm making multiple mono calls to DB.And result of all Mono response is needed to compute final result which is written after declared Mono logic.

if (SomeObject.getAccountLevelActiveList() != null) {

				SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -&gt; {
					Mono&lt;SubLine&gt; subLineMono= SubLineService
							.getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
				

					subLineMono.subscribe(subLine-&gt; PollObject.getSubList()
							.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));

				});

			}

But my main logic is getting executed before mono result stored to the PollObject. so i'm getting null in the PollObject. So i want to stop my main thread until Mono results stored into the PollObject.

答案1

得分: 1

如果您想要停止主线程,您可以使用阻塞而不是订阅,但您必须首先将 List 转换为 Flux,然后使用提供的 Mono 进行 flatMap 处理。您在 subscribe 方法中的逻辑可以移动到 Mono 或包装的 Flux 的副作用操作符 doOnNext 中:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine(account))
    ).doOnNext(subLine ->
        PollObject.getSubList().put(accountLevelMtn.getMtn(),
            Optional.ofNullable(subLine))
    ).blockLast();
    // 所有 monos 完成后将首先执行以下代码

如果在 if 后的代码不需要在主线程中运行,最好保持响应式,就像 @chrylis-cautiouslyoptimistic 已经建议的那样。使用 reduce 操作符将所有结果放在一起,生成在所有提供的 monos 完成时完成的 mono:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine(account))
    ).reduce(PollObject.getSubList(), (subList, subLine) ->
        sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
    ).map(subList -> {
        // 所有 monos 完成后将首先执行此处的代码
    })
    // ... 如果需要,可以使用其他操作符
    // 最终订阅或返回 mono 进行进一步处理
    .subscribe();
英文:

If you want to stop main thread then you can use blocking instead of subscribing, but you must first convert the List into Flux and then flatMap-it using provided Mono. The logic you have in the subscribe method can be moved into a side effect operator doOnNext either of the Mono or the wrapping Flux:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account -&gt;
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).doOnNext(subLine -&gt;
        PollObject.getSubList().put(accountLevelMtn.getMtn(),
            Optional.ofNullable(subLine))
    ).blockLast();
    // the following code will be executed first when all monos are completed

If your code following the if is not required to run in the main thread, it would be better stay reactive as the @chrylis-cautiouslyoptimistic already suggested. Use reduce operator to put all the result together producing mono that is completed when all the provided monos are completed:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account -&gt;
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).reduce(PollObject.getSubList(), (subList, subLine) -&gt;
        sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
    ).map(subList -&gt; {
        // the code here will be executed first when all monos are completed
    })
    // ... other operators if necessary
    // eventually subscribing or returning the mono for further processing 
    .subscribe();

huangapple
  • 本文由 发表于 2020年9月26日 00:39:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/64068156.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定