全局窗口存储无法恢复。

huangapple go评论69阅读模式
英文:

Global window store cannot be restored

问题

通过StreamsBuilder,Kafka Streams提供了定义全局窗口存储的方式。一旦定义好,就可以通过将事件发送到与该存储相关联的主题来填充该存储。

相比之下,本地存储是通过使用put方法来操作键来填充的(请参见TimeFirstWindowKeySchema.toStoreKeyBinary),但对于全局存储,没有什么可以管理/构建内部键。因此,当尝试恢复全局窗口存储时,会因键格式不正确(请参见WindowKeySchema#extractStoreTimestamp)而引发异常。

已实施的逻辑:

  • 创建一个StreamsBuilder
  • 添加一个全局窗口存储
  • 启动流
  • 通过将事件发送到与存储相关联的主题来填充流
  • 停止流
  • 创建另一个流,使用另一个临时目录以允许从主题重建存储
  • 异常

异常

Caused by: java.lang.IndexOutOfBoundsException
at java.base/java.nio.Buffer.checkIndex(Buffer.java:749)
at java.base/java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:491)
at org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp(WindowKeySchema.java:211)
at org.apache.kafka.streams.state.internals.WindowKeySchema.segmentTimestamp(WindowKeySchema.java:75)

问题:

  1. 定义全局窗口存储是否有意义?
  2. 如果是的话,Kafka是否会/应该使用记录的时间戳来建立窗口和记录之间的关联?

我们正在尝试解决的一般问题:

我们希望有一种自动的方式来清理与全局存储相关联的RocksDB。使用键值存储时,该数据库不会被清理。

欢迎任何想法或建议。

英文:

Through the StreamsBuilder, Kafka streams provides a way to define global window stores. Once defined, it's possible to populate that store by firing an event into the topic associated to that store.

In comparison, a local store is populated through the put method manipulating the key (see TimeFirstWindowKeySchema.toStoreKeyBinary) but for a global store, there is nothing to manage/build that internal key. As a consequence, an exception will be fired when trying to restore a global window store due to the key format which is not correct (see WindowKeySchema#extractStoreTimestamp)

Implemented logic:

  • create a StreamsBuilder
  • add a global window store
  • start the stream
  • populate the stream by firing events into the topic associated to the store
  • stop the stream
  • create another stream with another temp dir to allow rebuilding the store from the topic
  • EXCEPTION

Exception

Caused by: java.lang.IndexOutOfBoundsException
at java.base/java.nio.Buffer.checkIndex(Buffer.java:749)
at java.base/java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:491)
at org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp(WindowKeySchema.java:211)
at org.apache.kafka.streams.state.internals.WindowKeySchema.segmentTimestamp(WindowKeySchema.java:75)

Questions:

  1. does it make sense to define a global window store?
  2. if yes, does Kafka will/should use the timestamp of the record to make the link between a window and the record?

General issue we are trying to solve:

We want an automatic way to clean the RocksDB associated to a global store. This DB is not cleaned when using a key value store.

Any ideas or suggestions are welcome.

答案1

得分: 2

  1. 使用具有可供存储器消耗和管理的键的重新分配主题。如果主题包含墓碑,Kafka Streams API会自动为您进行清理。是否有特殊原因要使用GlobalStore,比如需要跨多个分区访问数据?
  2. 使用Processor API和Punctuator,在您的数据中添加某种时间戳,并在记录过期时由Punctuator进行数据清理。过期可以意味着1天、1周或在您认为消息不再相关时。这也可以通过利用某些对象字段的组合来实现,仅在对象满足某种状态时过期。
  3. 使用带有Punctuator的TimestampedKeyValueStore进行清理。

我建议尽量避免使用GlobalStore,因为这些通常不会很好地扩展,并增加了对代理的负载(除非您的设计另有规定)。

英文:

You have a couple of options for handling this:

  1. Create a repartition topic with a key which the store can consume and manage. If the topic contains tombstones kafka streams API manages the cleanup for you. Is there any particular reason to utilize a GlobalStore, like the need to access data across multiple partitions?
  2. Use the Processor API and Punctuator, add some kind of timestamp to your data and have the punctuator do the cleanup of your data when the record expires. That can mean 1 day, a week or whenever you feel that the message is no longer relevant. This can also be done utilizing some combination of object fields to expire only when object meets certain state.
  3. Use TimestampedKeyValueStore with a Punctuator for cleanup.

I'd try to stay away from a GlobalStore because these tend not to scale well and increase the load on the brokers (unless your design mandates otherwise)

huangapple
  • 本文由 发表于 2023年6月27日 20:00:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76564649.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定