如何将多个 Flux 数据流进行筛选后进行压缩

huangapple go评论75阅读模式
英文:

How to zip multiple Flux streams with filtering

问题

我有两个源Flux流它们返回所有关键字和所有字典的流

```java
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Keyword引用了一个类似以下的Dictionary对象:

public class Keyword {
    private String id;
    private String dictionaryId;
}

目标是获得一个转换后的Flux<DictionaryTO>,其中包含了所有Dictionary的属性以及属于该字典的关键字列表:

public class DictionaryTO {
    private String id;
    private Collection<KeywordTO> keywords;
}
public class KeywordTO {
    private String id;
}

问题是如何以反应式的方式将这两个Flux流进行合并/压缩,而不会阻塞任何源流。

请注意,keywordFlux包含了所有关键字,因此应该根据Keyword.dictionaryId进行一些过滤。


<details>
<summary>英文:</summary>

I have 2 source Flux streams which return streams of all keywords and all dictionaries:

```java
Flux&lt;Keyword&gt; keywordFlux = keywordRepository.findAll();
Flux&lt;Dictionary&gt; dictionaryFlux = dictionaryRepository.findAll();

Keyword has a reference to a Dictionary object like the following:

public class Keyword {
    private String id;
    private String dictionaryId;
}

The goal is to have a transformed Flux&lt;DictionaryTO&gt; which inside holds all properties of Dictionary plus a list of keywords that belong to the dictionary:

public class DictionaryTO {
    private String id;
    private Collection&lt;KeywordTO&gt; keywords;
}
public class KeywordTO {
    private String id;
}

The question is how to zip/merge these 2 Flux streams in a reactive way without blocking any of the source streams.

Note that keywordFlux contains all keywords, so some filtering should be applied based on Keyword.dictionaryId.

答案1

得分: 0

根据 boris-the-spider 的建议,我最终使用了 .flatMap().zipWith()

  1. 创建一个 Mono&lt;Map&gt;,其中关键字按dictionaryId分组,并进行缓存,因为稍后将多次使用它。
  2. 使用字典的 Flux,并与上述关键字映射的地图进行 zip。然后将“字典和关键字映射的元组”映射为带关键字的字典。

完整解决方案:

Flux&lt;Keyword&gt; keywordFlux = keywordRepository.findAll();
Flux&lt;Dictionary&gt; dictionaryFlux = dictionaryRepository.findAll();

Mono&lt;Map&lt;String, Collection&lt;KeywordTO&gt;&gt;&gt; keywordsMapMono = keywordFlux
    .collectMultimap(KeywordTO::getDictionaryId, k -&gt; keywordTOMapper.map(k))
    .cache(); 

Flux&lt;DictionaryTO&gt; dictionaryTOFlux = dictionaryFlux
    .map(dictionaryTOMapper::map) 
    .flatMap(dic -&gt; Mono.just(dic).zipWith(keywordsMapMono))
    .map(tuple -&gt; {
        Collection&lt;KeywordTO&gt; keywordsForDic = tuple.getT2().get(dic.getId());
        DictionaryTO dic = tuple.getT1();
        dic.setKeywords(keywordsForDic);
        return dic;
    });
英文:

As suggested by boris-the-spider, I ended up using .flatMap() and .zipWith().

  1. Create a Mono&lt;Map&gt; of keywords (grouped by dictionaryId) and cache it because it will be used multiple times later.
  2. flatMap the Flux of dictionaries and zip single dictionary with the above map of keywords. Then map "tuple of dictionary and keywords map" to a dictionary with keywords.

Full solution:

Flux&lt;Keyword&gt; keywordFlux = keywordRepository.findAll();
Flux&lt;Dictionary&gt; dictionaryFlux = dictionaryRepository.findAll();

Mono&lt;Map&lt;String, Collection&lt;KeywordTO&gt;&gt;&gt; keywordsMapMono = keywordFlux
    .collectMultimap(KeywordTO::getDictionaryId, k -&gt; keywordTOMapper.map(k))
    .cache(); 

Flux&lt;DictionaryTO&gt; dictionaryTOFlux = dictionaryFlux
    .map(dictionaryTOMapper:map) 
    .flatMap(dic -&gt; Mono.just(dic).zipWith(keywordsMapMono))
    .map(tuple -&gt; {
        Collection&lt;KeywordTO&gt; keywordsForDic = tuple.getT2().get(dic.getId())
        DictionaryTO dic = tuple.getT1();
        dic.setKeywords(keywordsForDic);
        return dic;
    });

huangapple
  • 本文由 发表于 2020年10月26日 01:50:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/64526905.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定