Simultaneous operations on KStream & KTables

# Question
I'm trying to implement a use case in Kafka Streams where I populate a KTable by applying some filters on an input stream. Let's call this table a tracking table, where the key is derived from the event and the value is the event itself.

Now, for subsequent events, I check this table to verify whether they're tracked, and either update the event if it's tracked or publish it to a different topic if it's not. I'm not sure how to do both at the same time. Here's what I have so far.
```java
// Branch based on conditions
KStream<String, Event>[] segregatedRecords = branches[0]
        .branch((key, event) -> event.getStatus().getStatus().equals("A"),
                (key, event) -> event.getStatus().getStatus().equals("B"),
                (key, event) -> event.getStatus().getStatus().equals("C"));

// Store events with status A to a topic
segregatedRecords[0]
        .selectKey((key, event) -> createKey(event))
        .mapValues(transform)
        .to(intermediateTopic);

// Load the topic from the previous step as a GlobalKTable
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic);

// The following step is where I'm stuck, because I can't perform conditional actions:
// if the event exists in the tracking table, update it; but if not, how do I publish it to a different topic?
segregatedRecords[1]
        // derive key for lookup
        .selectKey((key, event) -> createKey(event))
        // update the event status in the table
        .join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
        .to(intermediateTopic);

// Other events will need to refer to the latest information in the tracking table for further processing
```
# Answer 1
**Score**: 3
You can do this by branching `segregatedRecords[1]` into two sub-topologies. One branch performs the table lookup as in your code; the other branch uses the low-level Processor API (a `transformValues` in this case) to check whether the underlying `GlobalKTable` state store contains a record for the newly derived key. If the record exists, it transforms the `Event` to `null`, and we then filter out events with a null `Event` (because those events were already joined in the first sub-topology).

I updated your code a little bit:
```java
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// give your GlobalKTable a name so you can query it later
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic,
        Materialized.as("tracked_event_global_store"));

KStream<String, Event> derivedKStream = segregatedRecords[1]
        // derive key for lookup
        .selectKey((key, event) -> createKey(event));

// this sub-topology performs the table lookup as normal: update the event status in the table
derivedKStream.join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
        .to(intermediateTopic);

// this sub-topology checks whether the event exists in trackedEvents; if yes, the event has
// already been joined, so we transform it to a null value and filter it out in the next step
derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
    // the underlying store of the tracked GlobalKTable
    KeyValueStore<String, Event> trackedKvStore;

    @Override
    public void init(ProcessorContext context) {
        // using the name given above
        trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
    }

    @Override
    public Event transform(String derivedKey, Event event) {
        // if the event exists in trackedEvents, return a null event so we can filter it out in the next step
        if (trackedKvStore.get(derivedKey) != null) {
            return null;
        }
        // the event does not exist in trackedEvents: keep it and send it to a different topic
        return event;
    }

    @Override
    public void close() {
    }
})
.filter((derivedKey, event) -> event != null)
.to("your-different-topic-name");
```
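A note on the design: `transformValues` must emit exactly one value for every input record, so the transformer cannot simply drop the already-joined events. Returning `null` and removing those records in the subsequent `filter` is the usual way to discard records from a value transformer.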
**Update**: about the problem where you cannot create both a `GlobalKTable` and a `KStream` from the single topic `intermediate` (a topic cannot be read multiple times, [as described here][1]):

1. Create a dedicated input topic for the `GlobalKTable` (this topic must have log compaction enabled):
```java
KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediate);
intermediateKStream.to(trackedInputTopic);

// instead of building the GlobalKTable from intermediate, use the dedicated topic trackedInputTopic
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic,
        Materialized.as("tracked_event_global_store"));

// perform whatever you want to do with the intermediate topic
intermediateKStream
...
```
[1]: https://issues.apache.org/jira/browse/KAFKA-6687
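If you also need to create `trackedInputTopic` yourself, here is a minimal sketch using the Kafka `AdminClient`, assuming a local broker; the topic name, partition count, and replication factor are placeholders, and exception handling is omitted:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Properties adminProps = new Properties();
// placeholder broker address
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    // 1 partition / replication factor 1 are for illustration only
    NewTopic topic = new NewTopic("tracked-input-topic", 1, (short) 1)
            // log compaction keeps the latest value per key, which is what the GlobalKTable needs
            .configs(Collections.singletonMap(
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(Collections.singletonList(topic)).all().get();
}
```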