Java Reactive: how to wait for all data in a Flux and then process it

Question

I'm getting data from a reactive Mongo repository and updating it. Then I have to collect all of the data into one collection and pass it to another service to get more info. Finally, I need to combine both results into one Flux. My code is:

    Flux<Views> views = someRepository.findAllByUsersIn(userId).doOnNext(v -> {
        v.setInterlocutor(v.getUsers().stream().filter(u -> !userId.equals(u)).findFirst().orElse(null));
    });

    return Flux.zip(views.map(view -> conversionService.convert(view, ResponseViewDto.class)), getUserInfo(views).flux())
            .flatMap(fZip -> {
                ResponseViewDto dto = fZip.getT1();
                dto.setInterlocutor(fZip.getT2().get(dto.getInter()));
                return Flux.just(dto);
            });

getUserInfo collects the user IDs, sends them to another service, and returns the expanded info.

I found that the database query runs twice, and I understand why, but is there a way to run it only once and still be non-blocking?
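
The double query follows from how cold publishers behave: a cold Flux does no work until it is subscribed to, and it repeats that work for every subscriber. In the snippet above, views is consumed once as the first argument of Flux.zip and once more inside getUserInfo. The following is a minimal sketch of that effect, assuming Project Reactor is on the classpath; fakeQuery is a hypothetical stand-in for the repository call, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdFluxDemo {

    // Hypothetical stand-in for the repository call; counts how often the "query" runs.
    static Flux<String> fakeQuery(AtomicInteger queryCount) {
        return Flux.defer(() -> {
            queryCount.incrementAndGet();
            return Flux.just("alice", "bob", "carol");
        });
    }

    // Consumes the same Flux twice, as the question's pipeline does,
    // and returns how many times the underlying "query" was executed.
    static int runTwice() {
        AtomicInteger queryCount = new AtomicInteger();
        Flux<String> views = fakeQuery(queryCount);

        views.collectList().block(); // first subscription (block() is used only for this demo)
        views.collectList().block(); // second subscription re-runs the source

        return queryCount.get();
    }

    public static void main(String[] args) {
        System.out.println("queries executed: " + runTwice()); // prints "queries executed: 2"
    }
}
```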

Answer 1

Score: 0

Thanks to Adhika Setya Pramudita for the help. The way to do what I need is simply to use the cache() method.
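
A rough sketch of how cache() might be applied here, assuming Project Reactor is on the classpath. The names fakeQuery, fakeUserInfo, and enrich are hypothetical stand-ins for findAllByUsersIn, getUserInfo, and the controller method; plain strings replace the Views and ResponseViewDto types, which the post does not show.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedFluxDemo {

    // Hypothetical stand-in for someRepository.findAllByUsersIn(userId).
    static Flux<String> fakeQuery(AtomicInteger queryCount) {
        return Flux.defer(() -> {
            queryCount.incrementAndGet();
            return Flux.just("alice", "bob", "carol");
        });
    }

    // Hypothetical stand-in for getUserInfo: gathers all ids, then builds one lookup map.
    static Mono<Map<String, String>> fakeUserInfo(Flux<String> ids) {
        return ids.collectList()
                .map(list -> list.stream()
                        .collect(Collectors.toMap(Function.identity(), id -> id.toUpperCase())));
    }

    static Flux<String> enrich(AtomicInteger queryCount) {
        // cache() replays already-emitted items to later subscribers
        // instead of subscribing to the source again.
        Flux<String> views = fakeQuery(queryCount).cache();

        // Wait for the lookup map, then map every cached view against it.
        return fakeUserInfo(views)
                .flatMapMany(info -> views.map(id -> id + " -> " + info.get(id)));
    }

    public static void main(String[] args) {
        AtomicInteger queryCount = new AtomicInteger();
        List<String> result = enrich(queryCount).collectList().block(); // block() only for the demo
        System.out.println(result + ", queries executed: " + queryCount.get());
    }
}
```

The sketch combines the two sources with flatMapMany rather than Flux.zip. zip pairs elements one-to-one and completes when the shorter source completes, so zipping a multi-element Flux with a single-element Mono yields at most one pair. Note also that cache() with no arguments keeps every emitted item in memory for replay, so it is best suited to bounded result sets such as a single request's query.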

huangapple
  • Posted on August 24, 2020, 03:14:38
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63551037.html