如何在Kafka Streams中使用相同主题为多个转换器启动多个流?

huangapple go评论60阅读模式
英文:

How to use multiple transformers using the same topic for kafka streams?

问题

我需要使用多个转换器解析 Kafka 上的复杂消息。每个转换器解析消息的一部分,并通过填充消息上的一些属性来编辑消息。最终,完全解析的消息通过 Kafka 消费者存储在数据库中。目前,我正在这样做:

streamsBuilder.stream(Topic.A, someConsumer)
           \\ 过滤具有未解析的 X 类型部分的消息
           .filter(filterX)
           \\ 编辑消息并生成新的 Topic.E 消息的转换器
           .transform(ParseXandProduceE::new)
           .to(Topic.A, someProducer)

streamsBuilder.stream(Topic.A, someConsumer)
           \\ 过滤具有未解析的 Y 类型部分的消息
           .filter(filterY)
           \\ 编辑消息并生成新的 Topic.F 消息的转换器
           .transform(ParseYandProduceF::new)
           .to(Topic.A, someProducer)

一个转换器的样子如下:

class ParseXandProduceE implements Transformer<...> {
    @Override
    public KeyValue<String, Message> transform(String key, Message message) {
           message.x = parse(message.rawX);
           context.forward(newKey, message.x, Topic.E);
           return KeyValue.pair(key, message);
    }
}

然而,这很繁琐,相同的消息会在这些流中多次流动。另外,还有一个消费者将topic.A中的消息存储在数据库中。消息目前在每次转换之前和之后都会被多次存储。有必要只存储每条消息一次。

下面的方法似乎可行,但不太理想,因为每个过滤+转换的块都可以干净地放在自己单独的类中:

streamsBuilder.stream(Topic.A, someConsumer)
           \\ 过滤器和编辑消息生成新的 Topic.E + Topic.F 消息的转换器
           .transform(someTransformer)
           .to(Topic.B, someProducer)

并且让持久性消费者监听Topic.B

后一种提出的解决方案是否可行,还是有其他方法可以实现相同的结果?也许通过完整的拓扑配置来进行?如果可以,对于这种情况,会是什么样子的?

英文:

I need to parse complex messages on kafka using multiple transformers. Each transformer parses a part of the message and edits the message by filling some attributes on the message. In the end the fully parsed message is stored in the database using a Kafka consumer. Currently, I'm doing this:

streamsBuilder.stream(Topic.A, someConsumer)
       \\ filters messages that have unparsed parts of type X
       .filter(filterX)
       \\ transformer that edits the message and produces new Topic.E messages
       .transform(ParseXandProduceE::new)
       .to(Topic.A, someProducer)

streamsBuilder.stream(Topic.A, someConsumer)
       \\ filters messages that have unparsed parts of type Y
       .filter(filterY)
       \\ transformer that edits the message and produces new Topic.F messages
       .transform(ParseYandProduceF::new)
       .to(Topic.A, someProducer)

a Transformer looks like:

class ParseXandProduceE implements Transformer&lt;...&gt; {
    @Override
    public KeyValue&lt;String, Message&gt; transform (String key, Message message) {
           message.x = parse(message.rawX);
           context.forward(newKey, message.x, Topic.E);
           return KeyValue.pair(key, message);
    }
}

However, this is cumbersome, the same messages flow multiple times through these streams.
Additionally, there is a consumer that stores messages of topic.A in the database. Messages are currently stored multiple times, before each transformation and after each transformation. It is necessary to store each message once.

The following could work, but seems unfavorable since each block of filter+transform could have been put cleanly in its own separate class:

streamsBuilder.stream(Topic.A, someConsumer)
       \\ transformer that filters and edits the message and produces new Topic.E + Topic.F messages
       .transform(someTransformer)
       .to(Topic.B, someProducer)

and make the persistence consumer listen to Topic.B.

Is the latter proposed solution the way to go, or is there some other way to achieve the same result? Maybe with a complete Topology configuration of Sources and Sinks? If so, what would that look like for this scenario?

答案1

得分: 1

使用单个Transformer似乎是最简单的解决方案。因为您有两个独立的过滤器,如果尝试链接单独的运算符,程序将变得更加复杂。如果您知道每条消息只会通过单个过滤器,而不会同时通过两个过滤器,您可以使用branch()

KStream[] subStreams = stream.branch(new Predicates[]{filterX, filterY});

subStream[0].transform(ParseXandProduceE::new)
            .merge(subStream[1].transform(ParseYandProduceF::new)
            .to(...)

请注意,上述解决方案仅在没有任何消息需要被两个transformer同时转换时才有效(branch()将每条消息放入第一个匹配谓词的分支中,而不会放入多个分支)。因此,如果一条消息可以通过两个过滤器,您需要做一些更复杂的操作,如下所示:

KStream[] subStreams = stream.branch(new Predicates[]{filterX, filterY});

KStream passedX = subStreams[0];
KStream transformedXE = passedX.transform(ParseXandProduceE::new);

// 通过了filterX的消息也可能通过filterY,
// 因此我们将这些消息合并回“y流”
// (当然,这些消息已经被`ParseXandProduceE`转换)
KStream passedY = subStream[1].merge(transformedXE.filter(filterY));

// 结果包含所有只通过filterX并且被转换的消息,
// 加上所有通过filterY(可能还通过filterX)并且被转换的消息
KStream result = transformedXE.filterNot(filterY)
                              .merge(passedY.transform(ParseYandProduceF::new));

result.to(...)
英文:

Using a single transformer seems to be the simplest solution. Because you have two independent filters, the program would become more complex if you try to chain individual operators. If you know that each message will only pass a single filter, but never both filters, you could use branch():

<!-- language: java -->

KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});

subStream[0].transform(ParseXandProduceE::new)
            .merge(subStream[1].transform(ParseYandProduceF::new)
            .to(...)

Note that the solution above only works if no message needs to be transformed by both transformers (branch() puts every message into the branch of the first matching predicate, but never into multiple branches). Thus, if a message could pass both filters, you need do something like this that is more complicated:

<!-- language: java -->

KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});

KStream passedX = subStreams[0];
KStream transformedXE = passedX.transform(ParseXandProduceE::new);

// a message that passed filterX may also pass filterY,
// and thus we merge those message back to the &quot;y-stream&quot;
// (of course, those messages would already be transformed by `ParseXandProduceE`)
KStream passedY = subStream[1].merge(transformedXE.filter(filterY);

// the result contains all message that only pass filterX and got transformed,
// plus all messages that passed filterY (and maybe also filterX) and got transformed
KStream result = transformedXE.filterNot(filterY)
                              .merge(passedY.transform(ParseYandProduceF::new)

result.to(...)

huangapple
  • 本文由 发表于 2020年8月19日 21:04:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/63487597.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定