Spring Cloud Stream分支流未按预期工作。

huangapple go评论55阅读模式
英文:

Spring cloud stream branch stream not working as expected

问题

以下是分支代码,它仅流向一个主题(第一个主题)。 据我了解,它应该流向所有三个主题?

无论如何,我可以使用分支流到三个主题吗?

@Bean
public Function<KStream<String, User>, KStream<String, User>[]> testprocess() {

    Predicate<String, User> stream1 = (k, v) -> v != null;
    Predicate<String, User> stream2 = (k, v) -> v != null;
    Predicate<String, User> stream3 = (k, v) -> v != null;

    return input -> input.map(
            (key, user) -> new KeyValue<String, User>(user.getId(), user))
            .branch(stream1, stream2, stream3);
}

处理器的配置

testprocess-in-0:
  destination: input.users
testprocess-out-0:
  destination: users.test.out.0
testprocess-out-1:
  destination: users.test.out.1
testprocess-out-2:
  destination: users.test.out.2
英文:

Below is the code for branching, it streams to only one topic (the first one). As I understand, it should stream to all three topics?

Anyway I can stream to three topics using branch?

@Bean
        public Function&lt;KStream&lt;String, Usesr&gt;, KStream&lt;String, User&gt;[]&gt; testprocess() {

            Predicate&lt;String, User&gt; stream1 = (k, v) -&gt; v != null;
            Predicate&lt;String, User&gt; stream2 = (k, v) -&gt; v != null;
            Predicate&lt;String, User&gt; stream3 = (k, v) -&gt; v != null;

            return input -&gt; input.map(
                    (key, user) -&gt; new KeyValue&lt;String, User&gt;(user.getId(), user))
                    .branch(stream1, stream2, stream3);

Configuration for the processor

		testprocess-in-0:
          destination: input.users
        testprocess-out-0:
          destination: users.test.out.0
        testprocess-out-1:
          destination: users.test.out.1
        testprocess-out-2:
          destination: users.test.out.2

答案1

得分: 1

通过查看您的谓词,似乎第一个谓词总是获胜,其他谓词没有机会。在Kafka Streams分支中,首个评估为真的谓词成功,相应的主题接收数据。您需要更改谓词中的逻辑,以正确映射到适当的主题。这里有一个示例。

英文:

By looking at your predicates, it appears that the first predicate always wins and the others do not get a chance. In Kafka Streams branching, the first predicate that evaluates to true succeeds and the corresponding topic receives the data. You need to change the logic in the predicates so that the correct topic is appropriately mapped. Here is an example.

huangapple
  • 本文由 发表于 2020年1月3日 13:17:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/59573523.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定