How to combine Flux from multiple publishers and process all of them
Question
I have the following reactive chain defined:
Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
    return aEnricher.getAById(idList)
        .zipWith(bEnricher.getBByLookupId(lookupIds))
        .zipWith(cEnricher.getCByLookupId(lookupIds))
        .map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
}
Function Signatures:
Flux<A> getAById(List<String> idList)
Flux<B> getBByLookupId(List<String> lookupIds)
Flux<C> getCByLookupId(List<String> lookupIds)
The lookupId is received as part of object A from the first API call.
This is called here:
combinedEnricher.enrich(events).subscribe(this::processTuple);
My question is this: I have several more enrichers to add to the zipWith chain. According to the documentation, zipWith completes as soon as one of the publishers completes. In my case, however, the enrichers emit different numbers of elements, and I need to process all of them.
How can I achieve this? Because the Flux types differ, I cannot use merge here.
EDIT
Option A
aEnricher.getAById(idList).buffer(10).subscribe(lookupIds -> {
    bEnricher.getBByLookupId(lookupIds).subscribe();
    cEnricher.getCByLookupId(lookupIds).subscribe();
});

Mono<Void> getBByLookupId(List<String> lookupIds) {
    // fromIterable emits each id; Flux.just would emit the whole list as one element
    return Flux.fromIterable(lookupIds)
        .flatMap(lookupId -> serviceB.callApi(lookupId))
        .map(this::convertToAnotherObject)
        .doOnNext(this::sendToKafka)
        .then();
}

Mono<Void> getCByLookupId(List<String> lookupIds) {
    return Flux.fromIterable(lookupIds)
        .flatMap(lookupId -> serviceC.callApi(lookupId))
        .map(this::convertToAnotherObject)
        .doOnNext(this::sendToKafka)
        .then();
}
Option B
aEnricher.getAById(idList)
    .buffer(10)
    .flatMap(lookupIds ->
        Mono.zip(
            Mono.just(lookupIds),
            bEnricher.getBByLookupId(lookupIds).collectList(),
            cEnricher.getCByLookupId(lookupIds).collectList()
        )
    )
    .map(convertToTuple3)
    .map(this::sendToKafka);
Answer 1
Score: 1
Based on the updated question, I would propose the following:
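Flux<Void> enrich(List<String> idList) {
    return enricher.getAById(idList)
        .buffer(10)
        .flatMap(lookupIds ->
            // Mono.when waits for all three sends to complete. Mono.zip over
            // Mono<Void> sources would complete on the first empty completion
            // and cancel the remaining sends.
            Mono.when(
                sendToKafka(Flux.fromIterable(lookupIds)),
                sendToKafka(enricher.getBByLookupId(lookupIds)),
                sendToKafka(enricher.getCByLookupId(lookupIds))
            )
        );
}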
This logic assumes that getXByLookupId returns Flux<T>.
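To illustrate why it helps to handle each stream in full per batch rather than pairing their elements, here is a minimal plain-Java model. It is not Reactor code: zipPairs and processAll are hypothetical helpers, and lists stand in for the B and C streams. The only point is that position-wise pairing stops with the shortest source, while independent processing sees every element.

```java
import java.util.ArrayList;
import java.util.List;

class ZipTruncationDemo {

    // Hypothetical helper: pairs elements by position and stops at the
    // shorter input, mimicking how zipWith ends with its shortest source.
    static List<String> zipPairs(List<String> left, List<String> right) {
        List<String> pairs = new ArrayList<>();
        int pairCount = Math.min(left.size(), right.size());
        for (int i = 0; i < pairCount; i++) {
            pairs.add(left.get(i) + "+" + right.get(i));
        }
        return pairs;
    }

    // Hypothetical helper: handles each source on its own, so every element
    // is seen regardless of how many elements the other source has.
    static List<String> processAll(List<String> left, List<String> right) {
        List<String> processed = new ArrayList<>(left);
        processed.addAll(right);
        return processed;
    }

    public static void main(String[] args) {
        List<String> bResults = List.of("b1", "b2", "b3");
        List<String> cResults = List.of("c1");

        System.out.println(zipPairs(bResults, cResults));   // [b1+c1]
        System.out.println(processAll(bResults, cResults)); // [b1, b2, b3, c1]
    }
}
```

In the model, b2 and b3 never appear in the zipped output because the C source ran out after one element.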
Given that you are using Reactor Kafka, sendToKafka could look like the following. You would probably need a separate sendToKafka for each specific type.
private <T> Mono<Void> sendToKafka(Flux<T> data) {
    return kafkaSender.createOutbound()
        .send(data)
        .then();
}
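One possible way to avoid writing a separate sendToKafka per type is to keep a single generic method and pass in a per-type mapper. The sketch below shows only that idea in plain Java, without Reactor or Kafka. OutRecord is a hypothetical stand-in for a Kafka record, toRecords is an assumed helper name, and the topic names are made up.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class PerTypeMapping {

    // Hypothetical stand-in for a Kafka record: a topic plus a serialized value.
    static final class OutRecord {
        final String topic;
        final String value;

        OutRecord(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }

        @Override
        public String toString() {
            return topic + ":" + value;
        }
    }

    // One generic method; the type-specific knowledge lives in the mapper.
    static <T> List<OutRecord> toRecords(List<T> data, Function<T, OutRecord> mapper) {
        return data.stream().map(mapper).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> bResults = List.of("b1", "b2");
        List<Integer> cResults = List.of(7);

        List<OutRecord> fromB = toRecords(bResults, b -> new OutRecord("topic-b", b));
        List<OutRecord> fromC = toRecords(cResults, c -> new OutRecord("topic-c", "c#" + c));

        System.out.println(fromB); // [topic-b:b1, topic-b:b2]
        System.out.println(fromC); // [topic-c:c#7]
    }
}
```

In a reactive version, the same mapper could be applied with map before handing the stream to the sender, assuming the sender accepts the mapped record type.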