How to combine Flux from multiple publishers and process all of them

Question

I have the following reactive chain defined:

    Flux<Tuple3<A, B, C>> enrich(List<String> idList) {
        return aEnricher.getAById(idList)
            .zipWith(bEnricher.getBByLookupId(lookupIds))
            .zipWith(cEnricher.getCByLookupId(lookupIds))
            .map(tuple -> Tuples.of(tuple.getT1().getT1(), tuple.getT1().getT2(), tuple.getT2()));
    }

Function signatures:

    Flux<A> getAById(List<String> idList)
    Flux<B> getBByLookupId(List<String> lookupIds)
    Flux<C> getCByLookupId(List<String> lookupIds)

`lookupId` is received as part of object A from the first API call.

This is called here:

    combinedEnricher.enrich(events).subscribe(this::processTuple);

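My question is this: I have several different enrichers to add with `zipWith`. According to the documentation, `zipWith` completes as soon as one of the publishers completes. In my case, however, the enrichers emit different numbers of elements, and I need to process all of them.

How can I achieve this? Since the Flux types are different, I cannot use merge here.

**EDIT**

Option A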

    aEnricher.getAById(idList).buffer(10).subscribe(lookupIds -> {
        bEnricher.getBByLookupId(lookupIds).subscribe();
        cEnricher.getCByLookupId(lookupIds).subscribe();
    });

    Mono<Void> getBByLookupId(List<String> lookupIds) {
        // fromIterable emits each lookup ID; Flux.just(lookupIds) would emit the whole list once
        return Flux.fromIterable(lookupIds)
            .flatMap(lookupId -> serviceB.callApi(lookupId))
            .map(this::convertToAnotherObject)
            .doOnNext(this::sendToKafka)
            .then();
    }

    Mono<Void> getCByLookupId(List<String> lookupIds) {
        return Flux.fromIterable(lookupIds)
            .flatMap(lookupId -> serviceC.callApi(lookupId))
            .map(this::convertToAnotherObject)
            .doOnNext(this::sendToKafka)
            .then();
    }

Option B

    aEnricher.getAById(idList)
                .buffer(10)
                .flatMap(lookupIds -> 
                        Mono.zip(
                                Mono.just(lookupIds),
                                aEnricher.getBByLookupId(lookupIds).collectList(), 
                                aEnricher.getCByLookupId(lookupIds).collectList()
                        )
                )
                .map(convertToTuple3)
                .map(this::sendToKafka)


Answer 1

Score: 1

Based on the updated question, I would propose the following:

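    Flux<Void> enrich(List<String> idList) {
        return enricher.getAById(idList)
                .buffer(10)
                .flatMap(lookupIds ->
                        // Mono.when waits until all three sends have completed. Mono.zip
                        // completes as soon as any source completes empty (every Mono<Void>
                        // does) and cancels the remaining sources.
                        Mono.when(
                                sendToKafka(Flux.fromIterable(lookupIds)),
                                sendToKafka(enricher.getBByLookupId(lookupIds)),
                                sendToKafka(enricher.getCByLookupId(lookupIds))
                        )
                );
    }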

This logic assumes that `getXByLookupId` returns `Flux<T>`.
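To see why the sources should be consumed independently rather than zipped, here is a minimal sketch that assumes only `reactor-core` on the classpath. The class name `ZipVersusWhen` and both helper methods are made up for illustration. It contrasts `zipWith`, which pairs elements by position and stops with the shorter source, with `Mono.when`, which subscribes to every source and completes only after all of them have completed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipVersusWhen {

    // zipWith pairs elements by position and completes with the shorter source,
    // so surplus elements of the longer source are never delivered.
    static List<String> zipped(Flux<String> bs, Flux<String> cs) {
        return bs.zipWith(cs)
                .map(pair -> pair.getT1() + "+" + pair.getT2())
                .collectList()
                .block();
    }

    // Mono.when ignores the values themselves and completes once every source
    // has completed, so each source is drained in full.
    static int[] drainedCounts(Flux<String> bs, Flux<String> cs) {
        AtomicInteger bCount = new AtomicInteger();
        AtomicInteger cCount = new AtomicInteger();
        Mono.when(
                bs.doOnNext(b -> bCount.incrementAndGet()),
                cs.doOnNext(c -> cCount.incrementAndGet())
        ).block();
        return new int[] {bCount.get(), cCount.get()};
    }

    public static void main(String[] args) {
        Flux<String> bs = Flux.just("b1", "b2", "b3");
        Flux<String> cs = Flux.just("c1");

        System.out.println(zipped(bs, cs));            // a single pair: b2 and b3 are dropped
        int[] counts = drainedCounts(bs, cs);
        System.out.println(counts[0] + " " + counts[1]); // all three Bs and the one C were seen
    }
}
```

The per-element work (here just a counter) lives in `doOnNext` on each source, which is where a send to Kafka would go in the real pipeline.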

Since you are using Reactor Kafka, `sendToKafka` could look like the following. You would probably need a separate `sendToKafka` for each specific type.

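    private <T> Mono<Void> sendToKafka(Flux<T> data) {
        // KafkaOutbound.send expects ProducerRecord elements, so in practice each
        // type-specific variant has to map T to a ProducerRecord before sending.
        return kafkaSender.createOutbound()
                .send(data)
                .then();
    }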
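The batching pattern can be tried without a Kafka broker. The sketch below is an assumption-laden stand-in, not the real enrichers: `BatchEnrichSketch`, the three `getXBy...` methods and `sendToSink` are hypothetical, every payload is a plain `String`, and an in-memory list plays the role of Kafka. It uses `Mono.when` to wait for all three sends of a batch, because Reactor documents that `Mono.zip` completes and cancels the other sources as soon as one source completes empty.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BatchEnrichSketch {

    // Stand-in for sendToKafka: records each element in the given sink list.
    static Mono<Void> sendToSink(Flux<String> data, List<String> sink) {
        return data.doOnNext(sink::add).then();
    }

    // Hypothetical enrichers that emit different numbers of elements.
    static Flux<String> getAById(List<String> ids) {
        return Flux.fromIterable(ids).map(id -> "lookup-" + id);      // one A per id
    }

    static Flux<String> getBByLookupId(List<String> lookupIds) {
        return Flux.fromIterable(lookupIds)
                .flatMap(id -> Flux.just("B1:" + id, "B2:" + id));    // two Bs per lookup id
    }

    static Flux<String> getCByLookupId(List<String> lookupIds) {
        return Flux.just("C:" + lookupIds.size());                    // one C per batch
    }

    // Buffer the lookup ids into batches, then send A, B and C for each batch.
    static Flux<Void> enrich(List<String> idList, List<String> sink) {
        return getAById(idList)
                .buffer(2)
                .flatMap(lookupIds -> Mono.when(
                        sendToSink(Flux.fromIterable(lookupIds), sink),
                        sendToSink(getBByLookupId(lookupIds), sink),
                        sendToSink(getCByLookupId(lookupIds), sink)));
    }

    public static void main(String[] args) {
        List<String> sink = new CopyOnWriteArrayList<>();
        enrich(List.of("1", "2", "3"), sink).blockLast();
        // 3 As + 6 Bs + 2 Cs (one per batch of at most two ids)
        System.out.println(sink.size() + " elements sent: " + sink);
    }
}
```

Because each source is sent on its own, the differing element counts no longer matter. The trade-off is that A, B and C are no longer paired into a `Tuple3`, which matches the updated question, where each result goes to Kafka separately.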
  • Posted on January 8, 2023, 23:16:33
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75048912.html