Kafka Streams bulk entry update/delete by Punctuator

I have a stateful Kafka Streams app, and I need to implement regular deletions based on a condition.
I've come up with a solution based on the Processor API, which is explained in the documentation, but it looks like I'm missing some intuition here, so I have a few doubts.

I have a bunch of streams like this, one for every StateStore I create during app initialization:

    private void doStuff(KStream<String, ExampleObject> sourceStream,
            Materialized<String, ExampleObject, KeyValueStore<Bytes, byte[]>> materialized, String tableName) {
        KTable<String, ExampleObject> ktable = sourceStream.groupByKey()
            .aggregate(() -> null, (id, newValue, existingValue) -> {...}, materialized);
        ktable.toStream().process(new PunctuatorProcessorSupplier(tableName), tableName);
    }

And I have this Processor (I omit the Supplier for brevity since the implementation is trivial; it just returns a new Processor every time):

private static class PunctuatorProcessor implements
    Processor<String, ExampleObject> {

    private final String stateStoreName;

    private Cancellable cancellable;

    private PunctuatorProcessor(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext context) {
        KeyValueStore<String, ExampleObject> stateStore =
            (KeyValueStore<String, ExampleObject>) context.getStateStore(this.stateStoreName);
        this.cancellable = context.schedule(Duration.ofDays(1),
            PunctuationType.WALL_CLOCK_TIME, getPunctuator(stateStore));
    }

    @Override
    public void process(String key, ExampleObject value) {
        // Intentionally empty: this processor exists only for its punctuator.
    }

    private static Punctuator getPunctuator(KeyValueStore<String, ExampleObject> stateStore) {
        return timestamp -> {
            try (final KeyValueIterator<String, ExampleObject> iter = stateStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, ExampleObject> entry = iter.next();
                    if (some condition) {
                        // Update the object.
                        stateStore.put(entry.key, entry.value);
                        // OR delete the object.
                        stateStore.delete(entry.key);
                    }
                }
            }
        };
    }

    @Override
    public void close() {
        this.cancellable.cancel();
    }
}
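For reference, the omitted supplier is described above as trivially returning a new Processor every time; a minimal sketch of what it might look like (the class name matches the usage in doStuff(), but this is an assumption about the original code, not the author's actual implementation):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Sketch of the omitted supplier: it just returns a new Processor each time.
// A fresh instance per call means per-task state (the Cancellable) is never
// shared between stream threads.
class PunctuatorProcessorSupplier implements ProcessorSupplier<String, ExampleObject> {

    private final String stateStoreName;

    PunctuatorProcessorSupplier(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public Processor<String, ExampleObject> get() {
        return new PunctuatorProcessor(stateStoreName);
    }
}
```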

Now the questions:

  1. Is this a legit usage of the ProcessorAPI in my case? Or do I need to use Topology.addProcessor()? Or are those two essentially the same?
  2. Do I need to commit anything?
  3. I'm doing KTable.toStream() since process() is a terminal operation. Do I just need to use transformValues() instead and put it somewhere before aggregate()? As I understand it, transform is stateful, unlike process; how does this affect performance? Will it change the existing topology and corrupt the changelog topic because of that?
  4. Since I only care about accessing the StateStore, do I need to do anything in the process() method?
  5. Is there any performance difference between STREAM_TIME and WALL_CLOCK_TIME? Let's suppose that with my data they are going to go side by side, so it's not a question of whether one fires more often than the other; I mean, are they managed by the same thread as the task, or are there any quirks?
  6. Is the operation in Punctuator going to update the changelog topic too?
  7. Is adding that kind of operation to an existing stateful app considered a change in topology, and will it corrupt the existing data?

Thank you!
=====UPDATE======
I use the following code to check whether the update actually lands in the StateStore, and I see that the Punctuator always receives the non-updated value. So the update is either not written or lost.

The timestamped stateStore is returned from the context:

public void init(ProcessorContext context) {
    this.context = context;
    KeyValueStore<String, ValueAndTimestamp<ExampleObject>> stateStore =
        (KeyValueStore<String, ValueAndTimestamp<ExampleObject>>) context.getStateStore(this.stateStoreName);
    this.cancellable = context.schedule(Duration.ofMinutes(5),
        PunctuationType.WALL_CLOCK_TIME, getPunctuator(stateStore, stateStoreName, context));
}

Then I read, update, and read again, and the logger logs the unchanged value:

    private Punctuator getPunctuator(KeyValueStore<String, ValueAndTimestamp<ExampleObject>> stateStore,
            String stateStoreName, ProcessorContext context) {
        return timestamp -> {
            try (final KeyValueIterator<String, ValueAndTimestamp<ExampleObject>> iter = stateStore.all()) {
                String testId = "someId";
                logger.info("Punctuator started with stateStore {}", stateStoreName);
                while (iter.hasNext()) {
                    final KeyValue<String, ValueAndTimestamp<ExampleObject>> entry = iter.next();
                    String key = entry.key;
                    if (testId.equals(key)) {
                        ExampleObject value = entry.value.value();
                        logger.info(
                            "PunctuatorProcessor in action, storeName {} testId {}, current ExampleObject {}",
                            stateStoreName, key, value);
                        boolean stripped = stripElement(value);
                        logger.info(
                            "PunctuatorProcessor in action, storeName {} testId {}, found and stripped: {}",
                            stateStoreName, key, stripped);
                        if (stripped) {
                            stateStore.put(key, ValueAndTimestamp.make(value, context.timestamp()));
                        }
                    }
                }
            }
        };
    }

Why is the value always unchanged? The punctuator Duration is 10 minutes.

Answer 1 (score: 1)
Since there are multiple questions it's best to answer one by one:

  1. Is this a legit usage of the ProcessorAPI in my case? Or do I need to use Topology.addProcessor()? Or are those two essentially the same? It is a legitimate use of the Processor API to clean up your state stores; in fact, that's pretty much the way to do it. One thing to be mindful of: if iterating through the whole store takes longer than the task timeout, it will cause your app to crash. Under the hood, process() does pretty much Topology.addProcessor(), so the two are essentially the same.
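To sketch that equivalence in the low-level API (everything below is illustrative: the node and topic names are made up, the DSL generates its own internal names, and exampleObjectSerde stands in for whatever serde ExampleObject uses):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

// Roughly what ktable.toStream().process(new PunctuatorProcessorSupplier(tableName), tableName)
// boils down to when wired by hand.
String tableName = "example-table";
Topology topology = new Topology();
topology.addSource("source-node", "some-input-topic");
topology.addProcessor("punctuator-node", new PunctuatorProcessorSupplier(tableName), "source-node");
// Attach the store to the processor; in the DSL this connection happens when the
// store name is passed as the second argument to process().
// exampleObjectSerde is an assumed serde for ExampleObject (not shown in the question).
topology.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(tableName), Serdes.String(), exampleObjectSerde),
    "punctuator-node");
```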

  2. Do I need to commit anything? No, you do not. You are not forwarding any records, so if your use case is a store clean-up, this is it.

  3. I'm doing KTable.toStream() since process() is a terminal operation. Do I just need to use transformValues() instead and put it somewhere before aggregate()? As I understand it, transform is stateful, unlike process; how does this affect performance? Will it change the existing topology and corrupt the changelog topic because of that? process was a terminal operation in Kafka Streams versions before 3.3.x; in those versions, use the transform operation instead. Your transform operation is stateful (because a store is attached), but in general a transform doesn't have to be stateful. The statefulness of your operation doesn't impact performance here, since you are not publishing new messages or repartitioning. It will not hurt your topology, but it may change it slightly; if you'd like to compare topologies, use the describe() method. It will, however, impact overall message processing: while the punctuator is running, messages are not processed in the subtopology (the other processors are not running).
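The describe() comparison can be sketched like this (topic names are illustrative, and peek() merely stands in for an appended processor):

```java
import org.apache.kafka.streams.StreamsBuilder;

// Build the topology twice, once without and once with an extra processor,
// and compare the describe() output to see exactly what changed.
StreamsBuilder before = new StreamsBuilder();
before.stream("input-topic").to("output-topic");

StreamsBuilder after = new StreamsBuilder();
after.<String, String>stream("input-topic")
     .peek((key, value) -> { /* stand-in for the added processor */ })
     .to("output-topic");

String beforeDesc = before.build().describe().toString();
String afterDesc = after.build().describe().toString();
System.out.println(beforeDesc);
System.out.println(afterDesc);
// If the two differ only by the appended node, existing changelog and
// repartition topic names are unchanged and existing state remains valid.
```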

  4. Since I only care about accessing the StateStore, do I need to do anything in the process() method? If you are using process, it is terminal and no further work needs to be done. If you were to use transform, you would need to forward the incoming records, otherwise they would simply be lost; this is done with the context.forward() method.
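A minimal sketch of the transform variant that keeps records flowing downstream (pre-3.3 Transformer API assumed; the class name is illustrative):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Unlike the terminal process(), a Transformer must re-emit what it receives,
// or downstream operators never see the records.
class ForwardingTransformer implements Transformer<String, ExampleObject, KeyValue<String, ExampleObject>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // The punctuator would be scheduled here, exactly as in the Processor variant.
    }

    @Override
    public KeyValue<String, ExampleObject> transform(String key, ExampleObject value) {
        // Returning the pair forwards it downstream; context.forward(key, value)
        // does the same and also allows emitting more than one record.
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
}
```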

  5. Is there any performance difference between STREAM_TIME and WALL_CLOCK_TIME? Let's suppose that with my data they are going to go side by side, so it's not a question of whether one fires more often than the other; I mean, are they managed by the same thread as the task, or are there any quirks? I'd start here: link. But long story short: wall clock time always advances (it's the one you want for your case), whereas stream time advances only when records arrive. So if you had a stream that rarely had anything published to it, your punctuator wouldn't run on time.
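Both punctuation types are scheduled the same way, and both callbacks run on the task's stream thread; a fragment contrasting them (intervals and log text are illustrative, and `context` is the ProcessorContext available inside init()):

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;

// WALL_CLOCK_TIME fires on system time, whether or not records arrive.
context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME,
    timestamp -> System.out.println("wall-clock punctuation at " + timestamp));

// STREAM_TIME only advances as records are processed, so on a quiet topic
// this one can fire late or not at all.
context.schedule(Duration.ofMinutes(5), PunctuationType.STREAM_TIME,
    timestamp -> System.out.println("stream-time punctuation at " + timestamp));

// Neither runs concurrently with record processing: while a punctuator runs,
// the stream thread is not processing records for that task.
```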

  6. Is the operation in Punctuator going to update the changelog topic too? Yes. All local state stores are backed by changelog topics, so tombstones will be published to the topic.

  7. Is adding that kind of operation to an existing stateful app considered a change in topology, and will it corrupt the existing data? It is a change to the topology, since you are adding a processor. It will not corrupt data, since you are not changing the objects published to Kafka. Also, if you were publishing JSON objects there would be no data corruption, but in the case of Avro or Protobuf it may be different.

Overall your code looks solid; I would be a little worried about the punctuator taking a lot of time, because it stops message processing for the subtopology for as long as it runs.

huangapple
  • Posted on 2023-06-29 00:11:53
  • Please keep this link when reposting: https://go.coder-hub.com/76574983.html