Reactor: zip two sources with conditions

Question

I have two sources of sorted integers, A and B.
I need to zip the two sources into one, with a restriction: only pair the items that exist in both A and B.

For example:
A is: 1 2 3 10 11 12 13 14
B is: 10 12 13 14 15 16
Reactor's zip operator would generate (1,10),(2,12),(3,13),(10,14),(11,15),(12,16).
But I want to get this source: (10,10),(12,12),(13,13),(14,14)
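
To make the positional pairing concrete, here is a minimal plain-Java sketch that mimics what zip does on the example data. It does not use Reactor; `PositionalZip` and its `zip` method are hypothetical names introduced only for illustration, and `Map.Entry` stands in for `Tuple2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not a Reactor API): pairs elements by position
// and stops as soon as the shorter input runs out, as zip does.
final class PositionalZip {
    private PositionalZip() {}

    static List<Map.Entry<Integer, Integer>> zip(List<Integer> a, List<Integer> b) {
        int n = Math.min(a.size(), b.size());
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // The i-th element of a is paired with the i-th element of b,
            // regardless of whether the two values are equal.
            out.add(Map.entry(a.get(i), b.get(i)));
        }
        return out;
    }
}
```

With A and B from the example, this yields the six positional pairs listed above, and 13 and 14 from A are never paired at all.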

I tried the zip-and-match approach below, but it failed: as soon as the first source completes, the zip operator completes the result source too, which leaves integers from the other source unmatched.

public static Flux<Tuple2<Integer, Integer>> match(Flux<Tuple2<Integer, Integer>> flux) {
    Tuple2<Integer, Integer> invalidValue = Tuples.of(0, 0);
    return flux.compose(f -> f.map(new Function<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
        private final Queue<Integer> left = new ArrayDeque<>(); // here is a problem: when complete, left or right queue may still have some data.
        private final Queue<Integer> right = new ArrayDeque<>();

        @Override
        public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> pair) {
            left.add(pair.getT1());
            right.add(pair.getT2());

            return tryMatch();
        }

        private Tuple2<Integer, Integer> tryMatch() {
            if (left.isEmpty() || right.isEmpty()) {
                return invalidValue;
            }
            Integer t1 = left.peek();
            Integer t2 = right.peek();
            if (t1.equals(t2)) {
                left.poll();
                right.poll();
                return Tuples.of(t1, t2);
            } else if (t1 < t2) {
                Integer item = left.poll();
                log.warn("not matched: {}", item);
                return tryMatch();
            } else {
                Integer item = right.poll();
                log.warn("not matched: {}", item);
                return tryMatch();
            }
        }
    })).filter(pair -> pair != invalidValue);
}
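
For reference, the matching rule that the code above is aiming for can be sketched on plain sorted lists, outside Reactor. This is only an illustration under the assumption that both inputs are sorted in ascending order; `SortedMatch` is a hypothetical helper, and `Map.Entry` stands in for `Tuple2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Reactor): a two-pointer merge over two
// lists that are assumed to be sorted in ascending order.
final class SortedMatch {
    private SortedMatch() {}

    static List<Map.Entry<Integer, Integer>> match(List<Integer> a, List<Integer> b) {
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            int cmp = a.get(i).compareTo(b.get(j));
            if (cmp == 0) {
                // Same value on both sides: emit the pair and advance both.
                out.add(Map.entry(a.get(i), b.get(j)));
                i++;
                j++;
            } else if (cmp < 0) {
                i++; // a's head is smaller, so it cannot match anything later in b
            } else {
                j++; // b's head is smaller, so it cannot match anything later in a
            }
        }
        // Once one side is exhausted, nothing left on the other side can match.
        return out;
    }
}
```

Unlike the zip-based attempt, each side advances independently here, so no element is dropped just because the other side delivered fewer items so far.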

Could someone tell me which operator I can use?

Answer 1

Score: 1

There is no need to map it yourself; you can use join and filter:

Flux<Integer> f1 = Flux.just(1, 2, 3, 10, 11, 12, 13, 14);
Flux<Integer> f2 = Flux.just(10, 12, 13, 14, 15, 16);

f1.join(f2, f -> Flux.never(), f -> Flux.never(), Tuples::of)
  .filter(t -> t.getT1().equals(t.getT2()));

Or, if you want a function:

public <T> Flux<Tuple2<T, T>> match(Flux<T> f1, Flux<T> f2) {
    return f1.join(f2, f -> Flux.never(), f -> Flux.never(), Tuples::of)
            .filter(t -> t.getT1().equals(t.getT2()));
}
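
Because both window durations are Flux.never(), no element's window ever closes, so join can pair every element of f1 with every element of f2 before the filter keeps the equal pairs. The plain-Java sketch below shows that combination on lists; it is an analogy and not the Reactor implementation, `JoinThenFilter` is a hypothetical name, and `Map.Entry` stands in for `Tuple2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not a Reactor API): the cross product of two
// lists, keeping only the pairs whose two values are equal.
final class JoinThenFilter {
    private JoinThenFilter() {}

    static <T> List<Map.Entry<T, T>> match(List<T> left, List<T> right) {
        List<Map.Entry<T, T>> out = new ArrayList<>();
        for (T l : left) {
            for (T r : right) {
                // Every left element meets every right element: n * m comparisons.
                if (l.equals(r)) {
                    out.add(Map.entry(l, r));
                }
            }
        }
        return out;
    }
}
```

The sketch makes two trade-offs visible: the work grows with the product of the two sizes, and a value repeated on one side produces one pair per repetition. It also does not rely on the inputs being sorted. In the list version the result order follows the left input; in Reactor the order may depend on when elements arrive.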

huangapple
  • Posted on 2020-05-31 01:18:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/62106097.html