How to implement Kafka Streams topology that process single topic with interactive queries store and global store

huangapple go评论56阅读模式
英文:

How to implement Kafka Streams topology that process single topic with interactive queries store and global store

问题

我正在尝试实现Kafka Streams,将单个主题流视为具有交互式查询可能性的全局数据库。因此,我想要有:

  1. 用于记录的全局存储(GlobalKTable、KeyValueStore)

  2. 可查询的存储,允许我获取交互式查询的结果(最大值)

交互式查询必须计算记录字段中的全局最大值:

KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
                .selectKey((key, value) -> "lastdate")
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
 Materialized.as("terc-lastupdate"));

然而,我面临的问题是我不能在一个Kafka Streams实例中使用同一单个主题作为源。我已经进行了研究,我发现唯一的方法是使用多个KafkaStreams实例,但我不确定这是否是实现此目标的正确且唯一的方法。有任何想法吗?

英文:

I am trying to implement Kafka Streams that is going to treat single topic stream as global database with interactive queries possible. So I want to have:

  1. global store for records (GlobalKTable, KeyValueStore)

  2. queryable store, that allows me to get result of an interactive query (maximum)

Interactive query has to calculate the global maximum of one of record's field:

 KStream&lt;String, TercUnitRecord&gt; recordsStream = topologyBuilder.stream(topicName);
 KTable&lt;String, Long&gt; lastUpdateStore = recordsStream.mapValues(record -&gt; record.getLastUpdate())
                .selectKey((key, value) -&gt; &quot;lastdate&quot;)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce((maxValue, currValue) -&gt; maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
 Materialized.as(&quot;terc-lastupdate&quot;));

However, I am facing problem that I cannot use the same single topic as source in one Kafka Streams instance. I have done a reasearch and the only way I found to do this is by multiple KafkaStreams instances, but I am not sure it is the correct and only way to achieve this. Any ideas?

答案1

得分: 0

我为每个任务使用了多个KafkaStreams实例,它运行正常。

英文:

I used multiple KafkaStreams instances for each task and it worked properly.

huangapple
  • 本文由 发表于 2020年9月16日 00:34:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/63906261.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定