英文:
Execute Mono in parallel
问题
我有以下的类
class A {
private Long id;
private Long rid; //将A与B1和B2连接起来。
//其他数据。
}
class B1 {
private Long rid; //与A连接
private Long cid; //与C连接
//其他数据。
}
class B2 {
private Long rid; //与A连接
private Long cid; //与C连接
//其他数据。
}
class C {
private Long id;
//其他数据。
}
`A`来源于数据库中的`a`表。`B1`,`B2`和`C`分别来源于`b1`,`b2`和`c`表。
现在我的逻辑如下:
1. 获取最新的1000行A。
2. 从A中,根据rid获取B1和B2。
3. 从B1和B2中,获取C。
4. 现在,根据某些参数比较A和C,并生成报告。
在webflux中,我的代码如下(代码使用Java 17):
class StageResult {
private List<A> aList;
private List<Long> rids;
private List<Long> cids1;
private List<Long> cids2;
private List<C> c;
}
var stageResult = new StageResult();
var page = PageRequest.of(0, 1000);
getA(page).collectList()
.flatMap(aList -> {
stageResult.setAList(aList);
stageResult.setRids(aList.stream().map(A::getRid).collect(Collectors.toList()))
return Mono.just(stageResult);
}). //省略部分
//...
在这种情况下,阶段2和阶段3可以并行执行。只想知道如何在并行执行2和3。一种方法是通过阶段1创建单独的阶段2和阶段3。缓存阶段1的结果。然后在阶段4之前合并阶段2和阶段3并使用结果。
英文:
I have following classes
class A {
private Long id;
private Long rid; //Joins A with B1 and B2.
//Other data.
}
class B1 {
private Long rid; //Joins with A
private Long cid; //Joins with C
//other data.
}
class B2 {
private Long rid; //Joins with A
private Long cid; //Joins with C
//other data.
}
class C {
private Long id;
//other data.
}
A
comes from a
table in database. B1
, B2
and C
from b1
, b2
and c
tables respectively.
Now my logic is like following:
- Get latest 1000 rows of A.
- From A, get B1 and B2 on element rid.
- From B1 and B2, get C.
- Now, compare A and C on some parameter and produce the report.
In webflux, my code is like as follows (code is in Java 17):
class StageResult {
private List<A> aList;
private List<Long> rids;
private List<Long> cids1;
private List<Long> cids2;
private List<C> c;
}
var stageResult = new StageResult();
var page = PageRequest.of(0, 1000);
getA(page).collectList()
.flatMap(aList -> {
stageResult.setAList(aList);
stageResult.setRids(aList.stream().map(A::getRid).collect(Collectors.toList()))
return Mono.just(stageResult);
}).
.flatMapMany(x -> getB1(x.getRids())) //Stage 2
.map(B1::getCid).collectList()
.flatMap(cids -> {
stageResult.setCids1(cids);
return Mono.just(stageResult);
})
.flatMapMany(x -> getB2(x.getRids())) //Stage 3
.map(B2::getCid).collectList()
.flatMap(cids -> {
stageResult.setCids2(cids);
return Mono.just(stageResult);
})
.flatMapMany(x -> {
var list = new ArrayList<Long>(x.getCids1());
list.addAll(x.getCids2());
return getC(list)
.collectList().map(y -> {
x.setC(y);
return x;
});
}).flatMap(x -> {
//Compare the element
});
In this case, stage 2 and 3 can be executed in parallel. Just want to know that how we can execute 2 and 3 in parallel. One approach is to create separate stage 2 and stage 3 by stage 1. Cache the result of stage 1. And then zip stage 2 and stage 3 before stage 4 and use the result.
答案1
得分: 1
你可以使用Flux.merge
并行解析发布者。
将包含在数组/可变参数中的发布者序列的数据合并成交错的合并序列。与
concat
不同,源会急切地订阅。
.flatMapMany(x -> Flux.merge(
getB1(x.getRids()).map(B1::getCid), //Stage 2
getB2(x.getRids()).map(B2::getCid) //Stage 3
)
)
.collectList()
顺便说一下
使用Mono.just
创建“伪异步”发布者然后使用flatMap
来解析是没有意义的。
.flatMap(cids -> {
stageResult.setCids1(cids);
return Mono.just(stageResult);
})
可以使用简单的map
来代替。
.map(cids -> {
stageResult.setCids1(cids);
return stageResult;
})
英文:
You can use Flux.merge
to resolve publishers in parallel.
>Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat
, sources are subscribed to eagerly.
.flatMapMany(x -> Flux.merge(
getB1(x.getRids()).map(B1::getCid), //Stage 2
getB2(x.getRids()).map(B2::getCid) //Stage 3
)
)
.collectList()
btw
It doesn't make sense to create "pseudo-async" publishers using Mono.just
and then resolve using flatMap
.
.flatMap(cids -> {
stageResult.setCids1(cids);
return Mono.just(stageResult);
})
Use can use simple map
instead.
.map(cids -> {
stageResult.setCids1(cids);
return stageResult;
})
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论