Kafka Streams: is writing to a persistent state store in a separate thread permitted?
Question
I need to iterate over the whole state store and update some records.
Since punctuation is executed on the same thread that consumes from Kafka, and consumption is paused while the punctuation runs, can I get a writable state store from the ProcessorContext and pass it to a separate thread, so that the iteration and record updates happen separately and don't affect the performance of the threads consuming from Kafka and processing records?
I do see that RocksDBStore.java is synchronized in the source code: https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L304
But in my test I only see (in the debugger) a CachingKeyValueStore, which also looks thread-safe because it acquires a write lock:
https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L267
So can I write to the store from a separate thread, something like below?
private void doStuff(KStream<String, ExampleObject> sourceStream,
        Materialized<String, ExampleObject, KeyValueStore<Bytes, byte[]>> materialized, String tableName) {
    KTable<String, ExampleObject> ktable = sourceStream.groupByKey()
        .aggregate(() -> null, (id, newValue, existingValue) -> {...}, materialized);
    ktable.toStream().process(new PunctuatorProcessorSupplier(tableName), tableName);
}
And then in the Processor I only schedule the Punctuator, and the process() method does nothing:
@Override
public void init(ProcessorContext context) {
    KeyValueStore<String, ExampleObject> stateStore =
        (KeyValueStore<String, ExampleObject>) context.getStateStore(this.stateStoreName);
    this.cancellable = context.schedule(Duration.ofDays(1),
        PunctuationType.WALL_CLOCK_TIME, getPunctuator(stateStore));
}

@Override
public void process(String key, ExampleObject value) {
    // do nothing
}

private static Punctuator getPunctuator(KeyValueStore<String, ExampleObject> stateStore) {
    return timestamp -> {
        Thread th = new Thread(() -> {
            try (final KeyValueIterator<String, ExampleObject> iter = stateStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, ExampleObject> entry = iter.next();
                    if (some condition) {
                        // Update the object.
                        stateStore.put(entry.key, entry.value);
                    }
                }
            }
        });
        th.start();
    };
}
Answer 1
Score: 3
That is very bad practice and probably (hopefully) would be caught by the internals of Kafka Streams.
The reason for this is that Streams internally needs to know which input record caused which records to be written to the changelog (or to the output topic via ProcessorContext#forward()). Therefore, it is not designed with this usage pattern in mind.
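For reference, the supported pattern keeps all store access on the stream thread: a punctuator may read and write the store directly because it is invoked on that thread. A minimal sketch of the question's punctuator with the extra thread removed (the condition placeholder is the question's own):

// Runs on the stream thread, where store access is safe -- but note the
// caveat below: a full scan still stalls processing while it runs.
private static Punctuator getPunctuator(KeyValueStore<String, ExampleObject> stateStore) {
    return timestamp -> {
        try (final KeyValueIterator<String, ExampleObject> iter = stateStore.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, ExampleObject> entry = iter.next();
                if (/* some condition */ true) {
                    stateStore.put(entry.key, entry.value); // update the object
                }
            }
        }
    };
}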
I see what you're trying to do, for example a schema migration of the way you store your Example objects... and you're correct that doing a store.all() during processing is also Risky Business, as it (at best) causes stalls and (at worst) causes timeouts, especially if you have exactly-once semantics enabled.
Given that what you're trying is difficult anyway, I would give it a try in dev and see what happens. If it fails, a safer way to do it would be to produce a few "pill" messages that trigger the punctuation to iterate through a few hundred records at a time. Each "pill" message should contain the key that you left off on (so you can do a range scan) and produce an output message to some other topic containing the info for the next "pill" to start.
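To make the pill idea concrete, here is a hedged sketch using the same Processor API as the question. It is only an illustration of the approach described above: BATCH_SIZE, needsMigration(), and migrate() are hypothetical placeholders, the pill-topic wiring (a sink that feeds forwarded pills back into this processor's source topic) is assumed, and the open-ended range(startKey, null) assumes a Streams version that supports open range endpoints.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class PillProcessor implements Processor<String, String> {

    private static final int BATCH_SIZE = 100; // chunk size per pill (assumption)

    private final String stateStoreName;
    private ProcessorContext context;
    private KeyValueStore<String, ExampleObject> stateStore;

    public PillProcessor(final String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.stateStore = (KeyValueStore<String, ExampleObject>) context.getStateStore(stateStoreName);
    }

    @Override
    public void process(final String startKey, final String pillValue) {
        int processed = 0;
        // Resume the scan where the previous pill left off; this runs on the
        // stream thread, so reading and writing the store here is safe.
        try (final KeyValueIterator<String, ExampleObject> iter = stateStore.range(startKey, null)) {
            while (iter.hasNext() && processed < BATCH_SIZE) {
                final KeyValue<String, ExampleObject> entry = iter.next();
                if (needsMigration(entry.value)) {
                    stateStore.put(entry.key, migrate(entry.value));
                }
                processed++;
            }
            if (iter.hasNext()) {
                // More records remain: forward the next pill downstream; a sink
                // wired to the pill topic feeds it back to this processor.
                context.forward(iter.peekNextKey(), "pill");
            }
        }
    }

    @Override
    public void close() {}

    private boolean needsMigration(final ExampleObject value) {
        return false; // placeholder for the real check
    }

    private ExampleObject migrate(final ExampleObject value) {
        return value; // placeholder for the real update
    }
}

Because each chunk is handled as a normal record on the stream thread, the thread is only blocked for one bounded scan at a time, and the store updates stay within the processing guarantees Streams provides.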