Using Custom Kafka State Stores in a Kafka Streams Application
Question
We are using Kafka Streams as included in the Spring Cloud Stream Hoxton.SR7 release (and therefore the Kafka Streams and Kafka client versions it provides, 2.3.1):
ext {
    set('springCloudVersion', 'Hoxton.SR7')
}
...
dependencies {
    // spring cloud stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation("org.springframework.cloud:spring-cloud-stream")
    // redis
    implementation 'io.lettuce:lettuce-core'
    implementation 'org.springframework.data:spring-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'
    ...
}
We have implemented a Kafka Streams application:
@Bean
public Consumer<KStream<String, IncomingEvent>> process() {
return input -> {
Inside it we perform an aggregation like this:
.aggregate(Foo::new, (key, value1, aggregate) ->
        (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
            ? value1
            : aggregate,
    materialized
)
Now materialized should be a custom external state store (Redis):
Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.as("redis-store");
The store is provided by a StoreBuilder bean:
@Bean
public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.StringSerde(),
            new SomeFooSerde());
}

public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    return new KeyValueBytesStoreSupplier() {
        @Override
        public String name() {
            return "redis-store";
        }

        @Override
        public KeyValueStore<Bytes, byte[]> get() {
            return redisKeyValueStoreBytes;
        }

        @Override
        public String metricsScope() {
            return "redis-session-state";
        }
    };
}
I now test the application with an EmbeddedKafka:
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {
In the test I try to access the state store and query the items that were added:
ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
        "redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;
But when I run my test I receive an error:
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.
Several questions:
- The examples for using custom state stores in [1] use them within a Processor. Does this automatically mean that I cannot use a custom state store in an aggregation?
- If it is not possible to use one within an aggregation, what is the point of custom state stores at all?
- When I slightly change the Kafka Streams code above and define a Processor instead of passing materialized to the aggregate method, the error changes: it then complains about a missing state store "redis-store" when executing getQueryableStore. Yet I can see that addStateStoreBeans registers 'redis-store'. How can this happen?
The reason I want a custom state store is that I cannot easily get a dedicated hard disk for the application instance. To get a fast startup, I want to avoid processing the complete changelog each time the application starts (restarts should preferably happen several times a day, and the replay currently takes more than an hour). So now the last question:
- When a custom external state store is used, can I resume from the last state on application restart?
[1] https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries
Answer 1
Score: 1
You are using Materialized.as(java.lang.String storeName), which creates (materializes) a StateStore with the given name ("redis-store" here). On the other hand, with builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) you are creating another StateStore with the same name, which the Spring framework is probably adding to the topology automatically, so you get the "store is already added" error.
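One way to register the store only once is to hand the supplier directly to the aggregation. The following is a minimal sketch, not the asker's actual code. It assumes plain Kafka Streams without the Spring binder, String keys and values instead of Foo, a hypothetical input topic "events", and Stores.inMemoryKeyValueStore standing in for the Redis-backed supplier from the question.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

class SupplierBackedAggregation {

    // Builds a topology whose aggregation is materialized in the store
    // produced by the given supplier. The store is registered once, under
    // supplier.name(); no separate StoreBuilder with that name is added.
    static Topology build(KeyValueBytesStoreSupplier supplier) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String())) // "events" is a hypothetical topic
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .aggregate(
                   () -> "",                               // initializer
                   (key, value, aggregate) -> value,       // keep the latest value
                   Materialized.<String, String>as(supplier)
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Assumption: an in-memory supplier stands in for the Redis supplier.
        Topology topology = build(Stores.inMemoryKeyValueStore("redis-store"));
        System.out.println(topology.describe());
    }
}
```

With this shape, the supplier(...) method from the question could be passed in place of the in-memory one. The StoreBuilder bean named "redis-store" would then presumably have to be removed or renamed, since the binder appears to add StoreBuilder beans to the topology on its own.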
q1: You can use a custom state store in an aggregation; pass it via Materialized.as(KeyValueBytesStoreSupplier supplier).
q2: One could also use a StateStore with a transformer or a custom processor for interactive queries. In addition, a global StateStore gives access to the entire topic rather than only the partitions assigned to the KafkaStreams instance (see addGlobalStore and globalTable).
q3: I guess you didn't (manually) register the state store with the topology; see Topology.addStateStore(StoreBuilder<?> storeBuilder, java.lang.String... processorNames) and "Connecting Processors and State Stores".
q4: Yes, the state store is loaded from a changelog topic (which could be the original topic when optimizations are used).
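For the Processor variant discussed in q3, the store has to be added to the topology and also connected to the processor by name. The following is a minimal sketch under several assumptions: it targets the 2.3-era Processor API in org.apache.kafka.streams.processor (later releases moved to org.apache.kafka.streams.processor.api), uses plain Kafka Streams without the binder, String keys and values, a hypothetical topic "events", and an in-memory store standing in for Redis.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

class ProcessorWithStore {

    static final String STORE_NAME = "redis-store";

    // Hypothetical processor that writes the latest value per key to the store.
    static class LatestValueProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // Only works if the store was connected to this processor by name.
            store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(String key, String value) {
            store.put(key, value);
        }
    }

    static Topology build() {
        // Assumption: in-memory store instead of the Redis-backed supplier.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String());

        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(storeBuilder);                    // step 1: register the store
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .process(LatestValueProcessor::new, STORE_NAME); // step 2: connect it by name
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

Both steps matter here: registering the store makes it part of the topology, and passing its name to process(...) gives the processor access to it.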