执行Mono并行

huangapple go评论76阅读模式
英文:

Execute Mono in parallel

问题

我有以下的类

    class A {
      private Long id;
      private Long rid; //将A与B1和B2连接起来。
      //其他数据。
    }
    
    class B1 {
      private Long rid; //与A连接
      private Long cid; //与C连接
      //其他数据。
    }
    
    class B2 {
      private Long rid;  //与A连接
      private Long cid;  //与C连接
      //其他数据。
    }
    
    class C {
      private Long id;
      //其他数据。
    }

`A`来源于数据库中的`a`。`B1`,`B2``C`分别来源于`b1`,`b2``c`

现在我的逻辑如下

1. 获取最新的1000行A
2. 从A中根据rid获取B1和B2
3. 从B1和B2中获取C
4. 现在根据某些参数比较A和C并生成报告

在webflux中我的代码如下代码使用Java 17):

     class StageResult {
       private List<A> aList;
       private List<Long> rids;
       private List<Long> cids1;
       private List<Long> cids2;
       private List<C> c;
     }

     var stageResult = new StageResult();
     var page = PageRequest.of(0, 1000);
     getA(page).collectList()
     .flatMap(aList -> {
         stageResult.setAList(aList);
         stageResult.setRids(aList.stream().map(A::getRid).collect(Collectors.toList()))
         return Mono.just(stageResult);
     }). //省略部分
     //...

在这种情况下阶段2和阶段3可以并行执行只想知道如何在并行执行2和3一种方法是通过阶段1创建单独的阶段2和阶段3缓存阶段1的结果然后在阶段4之前合并阶段2和阶段3并使用结果
英文:

I have following classes

class A {
private Long id;
private Long rid; //Joins A with B1 and B2.
//Other data.
}
class B1 {
private Long rid; //Joins with A
private Long cid; //Joins with C
//other data.
}
class B2 {
private Long rid;  //Joins with A
private Long cid;  //Joins with C
//other data.
}
class C {
private Long id;
//other data.
}

A comes from a table in database. B1, B2 and C from b1, b2 and c tables respectively.

Now my logic is like following:

  1. Get latest 1000 rows of A.
  2. From A, get B1 and B2 on element rid.
  3. From B1 and B2, get C.
  4. Now, compare A and C on some parameter and produce the report.

In webflux, my code is like as follows (code is in Java 17):

 class StageResult {
private List<A> aList;
private List<Long> rids;
private List<Long> cids1;
private List<Long> cids2;
private List<C> c;
}
var stageResult = new StageResult();
var page = PageRequest.of(0, 1000);
getA(page).collectList()
.flatMap(aList -> {
stageResult.setAList(aList);
stageResult.setRids(aList.stream().map(A::getRid).collect(Collectors.toList()))
return Mono.just(stageResult);
}).
.flatMapMany(x -> getB1(x.getRids())) //Stage 2
.map(B1::getCid).collectList()
.flatMap(cids -> {
stageResult.setCids1(cids);
return Mono.just(stageResult);
})
.flatMapMany(x -> getB2(x.getRids())) //Stage 3
.map(B2::getCid).collectList()
.flatMap(cids -> {
stageResult.setCids2(cids);
return Mono.just(stageResult);
})
.flatMapMany(x -> {
var list = new ArrayList<Long>(x.getCids1());
list.addAll(x.getCids2());
return getC(list)
.collectList().map(y -> {
x.setC(y);
return x;
});
}).flatMap(x -> {
//Compare the element
});

In this case, stage 2 and 3 can be executed in parallel. Just want to know that how we can execute 2 and 3 in parallel. One approach is to create separate stage 2 and stage 3 by stage 1. Cache the result of stage 1. And then zip stage 2 and stage 3 before stage 4 and use the result.

答案1

得分: 1

你可以使用Flux.merge并行解析发布者。

将包含在数组/可变参数中的发布者序列的数据合并成交错的合并序列。与concat不同,源会急切地订阅。

.flatMapMany(x -> Flux.merge(
        getB1(x.getRids()).map(B1::getCid), //Stage 2
        getB2(x.getRids()).map(B2::getCid)  //Stage 3
    )
)
.collectList()

顺便说一下

使用Mono.just创建“伪异步”发布者然后使用flatMap来解析是没有意义的。

.flatMap(cids -> {
      stageResult.setCids1(cids);
      return Mono.just(stageResult);
})

可以使用简单的map来代替。

.map(cids -> {
      stageResult.setCids1(cids);
      return stageResult;
})
英文:

You can use Flux.merge to resolve publishers in parallel.

>Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.

.flatMapMany(x -> Flux.merge(
getB1(x.getRids()).map(B1::getCid), //Stage 2
getB2(x.getRids()).map(B2::getCid)  //Stage 3
)
)
.collectList()

btw

It doesn't make sense to create "pseudo-async" publishers using Mono.just and then resolve using flatMap.

.flatMap(cids -> {
stageResult.setCids1(cids);
return Mono.just(stageResult);
})

Use can use simple map instead.

.map(cids -> {
stageResult.setCids1(cids);
return stageResult;
})

huangapple
  • 本文由 发表于 2023年2月16日 12:48:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/75467968.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定