英文:
How to combine Flux from multiple publishers and process all of them
问题
以下是翻译好的代码部分:
我有以下已定义的响应式链:
Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
return aEnricher.getAById(idList)
.zipWith(bEnricher.getBByLookupId(lookupIds))
.zipWith(cEnricher.getCByLookupId(lookupIds))
.map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
}
函数签名:
Flux<A> getAById(List<String> idList)
Flux<B> getBByLookupId(List<String> lookupIds)
Flux<C> getCByLookupId(List<String> lookupIds)
`lookupId` 作为第一个 API 调用的一部分收到。
这在这里调用:
combinedEnricher.enrich(events).subscribe(this::processTuple);
我的问题是这样的。我有多个不同的丰富器要添加到 `zipWith` 中。根据文档,`zipWith` 将在其中一个发布者完成时完成。但在我的情况下,不同的丰富器将发出不同数量的 Flux,我需要处理它们所有。
我该如何实现这一点?由于 Flux 类型不同,我不能在这里使用 merge
**编辑**
选项 A
aEnricher.getAById(idList).buffer(10).subscribe(lookupIds -> {
bEnricher.getBByLookupId(lookupIds).subscribe();
cEnricher.getCByLookupId(lookupIds).subscribe();
}
Mono<Void> getBByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceB.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
Mono<Void> getCByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceC.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
选项 B
aEnricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
Mono.just(lookupIds),
aEnricher.getBByLookupId(lookupIds).collectList(),
aEnricher.getCByLookupId(lookupIds).collectList()
)
)
.map(convertToTuple3)
.map(this::sendToKafka)
请注意,我已经将代码中的HTML转义字符(<
和>
)替换为了正常的尖括号以提高可读性。
英文:
I have the following reactive chain defined:
Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
return aEnricher.getAById(idList)
.zipWith(bEnricher.getBByLookupId(lookupIds))
.zipWith(cEnricher.getCByLookupId(lookupIds))
.map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
}
Function Signatures:
Flux<A> getAById(List<String> idList)
Flux<B> getBByLookupId(List<String> lookupIds)
Flux<C> getCByLookupId(List<String> lookupIds)
lookupId
is received as part of object A
from the first api call.
This is called here:
combinedEnricher.enrich(events).subscribe(this::processTuple);
My question is this. I have multiple different enrichers to be added to zipWith
. As per documentation, zipWith
will complete when one of the publisher completes. But in my case, different enrichers will emit different number of Flux and I need to process all of them.
How can I achieve this? Since the Flux types are different, I cannot use merge here
EDIT
Option A
aEnricher.getAById(idList).buffer(10).subscribe( lookupIds -> {
bEnricher.getBByLookupId(lookupIds).subscribe();
cEnricher.getCByLookupId(lookupIds).subscribe();
}
Mono<Void> getBByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceB.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
Mono<Void> getCByLookupId(List<String> lookupIds) {
Flux.just(lookupIds)
.flatMap(lookupId -> serviceC.callApi(lookupId))
.map(this::convertToAnotherObject)
.doOnNext(this::sendToKafka)
.then();
}
Option B
aEnricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
Mono.just(lookupIds),
aEnricher.getBByLookupId(lookupIds).collectList(),
aEnricher.getCByLookupId(lookupIds).collectList()
)
)
.map(convertToTuple3)
.map(this::sendToKafka)
答案1
得分: 1
基于更新的问题,我提出以下建议:
Flux<Void> enrich(List<String> idList) {
return enricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
sendToKafka(Flux.fromIterable(lookupIds)),
sendToKafka(enricher.getBByLookupId(lookupIds)),
sendToKafka(enricher.getCByLookupId(lookupIds))
).then()
);
}
这个逻辑假设getXByLookupId
返回Flux<T>
。
考虑到您正在使用Reactor Kafka,sendToKafka
可能如下所示。可能需要为每种特定类型都有一个sendToKafka
。
private <T> Mono<Void> sendToKafka(Flux<T> data) {
return kafkaSender.createOutbound()
.send(data)
.then();
}
英文:
Based on the update question I would propose the following
Flux<Void> enrich(List<String> idList) {
return enricher.getAById(idList)
.buffer(10)
.flatMap(lookupIds ->
Mono.zip(
sendToKafka(Flux.fromIterable(lookupIds)),
sendToKafka(enricher.getBByLookupId(lookupIds)),
sendToKafka(enricher.getCByLookupId(lookupIds))
).then()
);
}
This logic assumes that getXByLookupId
returns Flux<T>
.
Having in mind you are using Reactor Kafka, sendToKafka
could look like. Probably you would need to have sendToKafka
for every specific type.
private <T> Mono<Void> sendToKafka(Flux<T> data) {
return kafkaSender.createOutbound()
.send(data)
.then();
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论