英文:
How to implement Kafka Streams topology that process single topic with interactive queries store and global store
问题
我正在尝试实现Kafka Streams,将单个主题流视为具有交互式查询可能性的全局数据库。因此,我想要有:
-
用于记录的全局存储(GlobalKTable、KeyValueStore)
-
可查询的存储,允许我获取交互式查询的结果(最大值)
交互式查询必须计算记录字段中的全局最大值:
KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
.selectKey((key, value) -> "lastdate")
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
Materialized.as("terc-lastupdate"));
然而,我面临的问题是我不能在一个Kafka Streams实例中使用同一单个主题作为源。我已经进行了研究,我发现唯一的方法是使用多个KafkaStreams实例,但我不确定这是否是实现此目标的正确且唯一的方法。有任何想法吗?
英文:
I am trying to implement Kafka Streams that is going to treat single topic stream as global database with interactive queries possible. So I want to have:
-
global store for records (GlobalKTable, KeyValueStore)
-
queryable store, that allows me to get result of an interactive query (maximum)
Interactive query has to calculate the global maximum of one of record's field:
KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
.selectKey((key, value) -> "lastdate")
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
Materialized.as("terc-lastupdate"));
However, I am facing problem that I cannot use the same single topic as source in one Kafka Streams instance. I have done a reasearch and the only way I found to do this is by multiple KafkaStreams instances, but I am not sure it is the correct and only way to achieve this. Any ideas?
答案1
得分: 0
我为每个任务使用了多个KafkaStreams实例,它运行正常。
英文:
I used multiple KafkaStreams instances for each task and it worked properly.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论