英文:
How to zip multiple Flux streams with filtering
问题
我有两个源Flux流,它们返回所有关键字和所有字典的流:
```java
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Keyword
引用了一个类似以下的Dictionary
对象:
public class Keyword {
private String id;
private String dictionaryId;
}
目标是获得一个转换后的Flux<DictionaryTO>
,其中包含了所有Dictionary
的属性以及属于该字典的关键字列表:
public class DictionaryTO {
private String id;
private Collection<KeywordTO> keywords;
}
public class KeywordTO {
private String id;
}
问题是如何以反应式的方式将这两个Flux流进行合并/压缩,而不会阻塞任何源流。
请注意,keywordFlux
包含了所有关键字,因此应该根据Keyword.dictionaryId
进行一些过滤。
<details>
<summary>英文:</summary>
I have 2 source Flux streams which return streams of all keywords and all dictionaries:
```java
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Keyword
has a reference to a Dictionary
object like the following:
public class Keyword {
private String id;
private String dictionaryId;
}
The goal is to have a transformed Flux<DictionaryTO>
which inside holds all properties of Dictionary
plus a list of keywords that belong to the dictionary:
public class DictionaryTO {
private String id;
private Collection<KeywordTO> keywords;
}
public class KeywordTO {
private String id;
}
The question is how to zip/merge these 2 Flux streams in a reactive way without blocking any of the source streams.
Note that keywordFlux
contains all keywords, so some filtering should be applied based on Keyword.dictionaryId
.
答案1
得分: 0
根据 boris-the-spider 的建议,我最终使用了 .flatMap()
和 .zipWith()
。
- 创建一个
Mono<Map>
,其中关键字按dictionaryId
分组,并进行缓存,因为稍后将多次使用它。 - 使用字典的
Flux
,并与上述关键字映射的地图进行zip
。然后将“字典和关键字映射的元组”映射为带关键字的字典。
完整解决方案:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
.collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
.cache();
Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
.map(dictionaryTOMapper::map)
.flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
.map(tuple -> {
Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId());
DictionaryTO dic = tuple.getT1();
dic.setKeywords(keywordsForDic);
return dic;
});
英文:
As suggested by boris-the-spider, I ended up using .flatMap()
and .zipWith()
.
- Create a
Mono<Map>
of keywords (grouped bydictionaryId
) and cache it because it will be used multiple times later. flatMap
theFlux
of dictionaries andzip
single dictionary with the above map of keywords. Then map "tuple of dictionary and keywords map" to a dictionary with keywords.
Full solution:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
.collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
.cache();
Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
.map(dictionaryTOMapper:map)
.flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
.map(tuple -> {
Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
DictionaryTO dic = tuple.getT1();
dic.setKeywords(keywordsForDic);
return dic;
});
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论