同时在KStream和KTables上进行的操作

huangapple go评论78阅读模式
英文:

Simultaneous operations on KStream & KTables

问题

我正尝试在Kafka Streams中实现一个用例,根据对流应用某些过滤器来填充一个KTable,让我们称这个表为跟踪表,其中键是从事件中派生出来的,而值是事件本身。
现在,对于后续的事件,我会检查这个表,以验证它们是否被跟踪,并在跟踪的情况下更新事件,或将其发布到不同的主题。我不确定如何同时执行这些操作。以下是我目前的进展。

// 根据条件进行分支
KStream<String, Event>[] segregatedRecords = branches[0]
                       .branch((key, event) -> event.getStatus().getStatus().equals("A"),
                        (key, event) -> event.getStatus().getStatus().equals("B"),
                        (key, event) -> event.getStatus().getStatus().equals("C"),
                        

// 将状态为A的事件存储到一个主题
segregatedRecords[0]
                .selectKey((key, event) -> createKey(event))
                .mapValues(transform)
                .to(intermediateTopic);

// 从前一步加载主题作为GlobalKTable
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic);

// 我所卡住的步骤在于下面这一步,因为我无法执行条件操作
// 如果事件存在于跟踪表中(更新),但如果不存在,那么如何将其发布到不同的主题?
segregatedRecords[1]
                 // 为查找派生键
                .selectKey((key, event) -> createKey(event))
                // 在表中更新事件状态
                .join(trackedEvents, (key, value) -> key,(event, tracked) -> modifiedEvent
                ).to(intermediateTopic);

// 其他事件将需要引用跟踪表中的最新信息以进行进一步处理

英文:

I'm trying to implement a use case in Kafka Streams where I populate a Ktable based on applying some filters on this stream, let's call this table a tracking table where the key is derived from the event and the value is the event.
Now for subsequent events I check this table to verify if they're tracked and update the event if it's tracked or publish it to a different topic. I'm not sure how to do this simultaneously. Here's what I have so far.

// Branch based on conditions
KStream&lt;String, Event&gt;[] segregatedRecords = branches[0]
                       .branch((key, event) -&gt; event.getStatus().getStatus().equals(&quot;A&quot;),
                        (key, event) -&gt; event.getStatus().getStatus().equals(&quot;B&quot;),
                        (key, event) -&gt; event.getStatus().getStatus().equals(&quot;C&quot;),
                        

// Store events with status A to a topic
segregatedRecords[0]
                .selectKey((key, event) -&gt; createKey(event))
                .mapValues(transform)
                .to(intermediateTopic);

// Load topic from previous step as GlobalKtable
GlobalKTable&lt;String, Event&gt; trackedEvents = streamsBuilder.globalTable(intermediateTopic);

// The following step is where I&#39;m stuck, because I can not perform conditional actions
// If the event exists in the tracking table (update) but if not then how to publish it to a different topic?
segregatedRecords[1]
                 // derive key for lookup
                .selectKey((key, event) -&gt; createKey(event))
                // update the event status in the table 
                .join(trackedEvents, (key, value) -&gt; key,(event, tracked) -&gt; modifiedEvent
                ).to(intermediateTopic);

// Other events will need to refer the latest information in the tracked table for further processing 

 

</details>


# 答案1
**得分**: 3

你可以通过将`segregatedRecords[1]`分支为两个子拓扑来实现这一点,一个分支执行表格查找(与您的代码相同),另一个分支使用低级别的处理器 API(在这种情况下使用`transformValues`)来检查底层的`GlobalKTable`状态存储是否包含新派生键的记录,如果记录存在,则将事件转换为`null`事件,然后我们过滤掉具有空`Event`的事件(因为这些事件已经在第一个子拓扑中进行了连接)。
我稍微更新了您的代码:
```java
// 给你的GlobalKTable一个名称以便以后查询
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as("tracked_event_global_store"));

KStream<String, Event> derivedKStream = segregatedRecords[1]
    // 为查找派生键
    .selectKey((key, event) -> createKey(event));
// 这个子拓扑正常执行表格查找:在表格中更新事件状态
derivedKStream.join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
    .to(intermediateTopic);
// 这个子拓扑检查事件是否存在于trackedEvents中,如果是,则事件已经被连接,所以我们转换为null值并在下一步中过滤掉
derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
    // 获取Tracked GlobalKTable的底层存储
    KeyValueStore<String, Event> trackedKvStore;
    @Override
    public void init(ProcessorContext context) {
        // 使用先前的名称
        trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
    }

    @Override
    public Event transform(String derivedKey, Event event) {
        // 如果事件在trackedEvents中存在,则返回null事件,以便我们可以在下一个步骤中过滤掉
        if (trackedKvStore.get(derivedKey) != null) {
            return null;
        }
        // 事件在trackedEvents中不存在,保留事件并发送到不同的主题
        return event;
    }

    @Override
    public void close() {
    }
})
.filter((derivedKey, event) -> event != null)
.to("your different topic name");
```
**更新**:关于您无法从单个主题`intermediate`创建`GlobalKTable`和`KStream`的问题(不能多次读取主题[如此处所述][1]):

 1. 为`GlobalKTable`创建一个专用的输入主题(此主题必须启用日志压缩):
```java
KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediate);
intermediateKStream.to(trackedInputTopic);
// 不要从intermediate构建GlobalKTable,使用这个专用主题trackedInputTopic
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as("tracked_event_global_store"));

// 进行您想要在中间主题上进行的操作
intermediateKStream
        ...
```

  [1]: https://issues.apache.org/jira/browse/KAFKA-6687


<details>
<summary>英文:</summary>

You can do this by branching `segregatedRecords[1]` into 2 sub-Topology, one branch performs table lockup as your code, and the other branch uses low level processor API (using a transformValues in this case) to check whether the underlying `GlobalKTable` state store contains the record for the new derived key, if the record exists then transform the Event to `null` Event, then we filter out event which have null `Event` (because those events we already joined in the your first sub-Topology).
I updated your code a little bit:
```
//give your GlobalKTable a name to query later
GlobalKTable&lt;String, Event&gt; trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as(&quot;tracked_event_global_store&quot;));

KStream&lt;String, Event&gt; derivedKStream = segregatedRecords[1]
    // derive key for lookup
    .selectKey((key, event) -&gt; createKey(event));
// this sub-topology perform table lockup as normal: update the event status in the table
derivedKStream.join(trackedEvents, (key, value) -&gt; key,(event, tracked) -&gt; modifiedEvent)
    .to(intermediateTopic);
// this sub-topology check whether the event existed in trackedEvents, if yes then event has been already joined 
// so we transform to null value and filter in next step 
derivedKStream.transformValues(() -&gt; new ValueTransformerWithKey&lt;String, Event, Event&gt;() {
    //get the underlying store of Tracked GlobalKTable
    KeyValueStore&lt;String, Event&gt; trackedKvStore;
    @Override
    public void init(ProcessorContext context) {
        //using the previous name
        trackedKvStore = (KeyValueStore&lt;String, Event&gt;) context.getStateStore(&quot;tracked_event_global_store&quot;);
    }

    @Override
    public Event transform(String derivedKey, Event event) {
        //if event existed in trackedEvents then return a null event so we can filter out in next pipe
        if (trackedKvStore.get(derivedKey) != null) {
            return null;
        }
        //event not exist in trackedEvents, keep the event and send to different topic
        return event;
    }

    @Override
    public void close() {
    }
})
.filter((derivedKey, event) -&gt; event != null)
.to(&quot;your different toic name&quot;);
```
**Update** : about the problem where you can not create both a GlobalKTable and a KStream from single topic `intermediate` (can not read a topic multiple time [as described here][1]):

 1. Create a dedicated input topic for `GlobalKTable` (this topic must have log compaction enabled):
```
KStream&lt;Object, Object&gt; intermediateKStream = streamsBuilder.stream(intermediate);
intermediateKStream.to(trackedInputTopic);
//instead of building GlobalKTable from intermediate, use this dedicated topic trackedInputTopic
GlobalKTable&lt;String, Event&gt; trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as(&quot;tracked_event_global_store&quot;));

//Perform things you want to do with the intermediate topic
intermediateKStream
        ...
```


  [1]: https://issues.apache.org/jira/browse/KAFKA-6687

</details>



huangapple
  • 本文由 发表于 2020年4月5日 21:12:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/61043156.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定