

Using Custom Kafka State Stores in a Kafka Streams Application



We are using Kafka Streams as bundled with the Spring Cloud Stream Hoxton.SR7 release, and therefore the Kafka Streams and Kafka client versions it provides (2.3.1):

ext {
    set('springCloudVersion', 'Hoxton.SR7')
}

dependencies {
    // spring cloud stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    // redis
    implementation 'io.lettuce:lettuce-core'
    implementation 'org.springframework.data:spring-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'
}

We have implemented a Kafka Streams application:

public Consumer<KStream<String, IncomingEvent>> process() {

    return input -> {

Inside it we perform an aggregation like this:

.aggregate(Foo::new, (key, value1, aggregate) ->
                (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
                        ? value1
                        : aggregate,


Now materialized should be backed by a custom external state store (Redis):

Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =

It is provided by a StoreBuilder bean:

public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.StringSerde(),
            new SomeFooSerde());
}

public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {

    return new KeyValueBytesStoreSupplier() {
        @Override
        public String name() {
            return "redis-store";
        }

        @Override
        public KeyValueStore<Bytes, byte[]> get() {
            return redisKeyValueStoreBytes;
        }

        @Override
        public String metricsScope() {
            return "redis-session-state";
        }
    };
}

I now test the application with an EmbeddedKafka:

@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {

In the test I try to access the state store and query the items that were added:

ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
        "redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;

But when I run my test I receive an error:

Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.

Several questions:

  • The examples for using custom state stores in [1] use them within a Processor. Does this automatically mean that I cannot use a custom state store in an aggregation?
  • If it is not possible to use one within an aggregation, what is the point of custom state stores anyway?
  • When I slightly change the Kafka Streams code above and define a Processor instead of passing materialized to the aggregate method, the error changes: it then complains about a missing state store "redis-store" when getQueryableStore is executed. Yet I can see that addStateStoreBeans registers 'redis-store'. How can this happen?

The reason I want to use a custom state store is that I cannot (easily) provide a dedicated hard disk for the application instance. To get a fast startup, I want to avoid processing the complete changelog on every start of the application (restarts should preferably happen several times a day, and a startup currently takes more than an hour). So now the last question:

  • When a custom external state store is used, can I resume from the last state on application restart?

1 https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries


Score: 1



You are using Materialized.as(java.lang.String storeName), which creates (materializes) a StateStore with the given name ("redis-store" here). At the same time, builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) creates another StateStore with the same name, which the Spring framework is probably adding to the topology automatically, so you get the "store is already added" error.

q1: You can use a custom state store in an aggregation; pass it via Materialized.as(KeyValueBytesStoreSupplier supplier).
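A minimal sketch of q1, assuming plain Kafka Streams (the 2.3.x DSL used in the question) without Spring: the supplier is handed to Materialized.as(...) directly, so the aggregation itself creates the store and no second store with the same name is registered. The topic name "incoming-events", the String value type, and the in-memory supplier standing in for the Redis-backed one are placeholders, not taken from the question.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class AggregateWithSupplier {

    // Builds a topology whose aggregation is backed by the store the supplier provides.
    public static Topology build(KeyValueBytesStoreSupplier supplier) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("incoming-events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .aggregate(
                   () -> "",                         // initial aggregate (placeholder)
                   (key, value, aggregate) -> value, // keep the latest value (placeholder logic)
                   Materialized.<String, String>as(supplier)
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Assumption: an in-memory supplier stands in for the custom Redis supplier.
        KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("redis-store");
        System.out.println(build(supplier).describe());
    }
}
```

In the question's setup, supplier(redisKeyValueStoreBytes) would take the place of the in-memory supplier, and the separate StoreBuilder bean with the same name would presumably have to be removed or renamed.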

q2: One can also use a StateStore with a transformer or a custom processor, for example for interactive queries. In addition, a global StateStore gives access to the entire topic rather than only the partitions assigned to the local KafkaStreams instance (see addGlobalStore and globalTable).

q3: I guess you did not (manually) register the state store with the topology; see Topology.addStateStore(StoreBuilder<?> storeBuilder, java.lang.String... processorNames) and "Connecting Processors and State Stores".
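As a hedged illustration of q3, the sketch below registers a store with Topology.addStateStore and connects it to a processor by name. It assumes the classic Processor API of Kafka Streams 2.3.x (later releases moved to org.apache.kafka.streams.processor.api). The node names, the topic name, LastValueProcessor, and the in-memory store are hypothetical stand-ins, and Spring Cloud Stream may wire this differently.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ProcessorWithStore {

    // Hypothetical processor that writes the latest value per key into the store.
    static class LastValueProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // Only resolvable because the store is connected to this processor below.
            store = (KeyValueStore<String, String>) context.getStateStore("redis-store");
        }

        @Override
        public void process(String key, String value) {
            store.put(key, value);
        }
    }

    public static Topology build() {
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("redis-store"), Serdes.String(), Serdes.String());
        ProcessorSupplier<String, String> supplier = LastValueProcessor::new;

        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(),
                "incoming-events");
        topology.addProcessor("last-value", supplier, "source");
        // Registers the store and connects it to the processor node by name.
        topology.addStateStore(storeBuilder, "last-value");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```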

q4: Yes, the state store is restored from a changelog topic (which can be the original input topic when optimizations are enabled).
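As a rough mental model of what "restored from a changelog topic" means, the plain-Java simulation below replays changelog records into a map: the last write per key wins, and a null value acts as a delete marker. It does not use the Kafka Streams API; ChangelogRecord, restore, and the integer offsets are hypothetical names for illustration only. Whether a particular custom store can skip most of the replay depends on how it tracks the offset it has already applied, which this sketch only hints at through the fromOffset parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplay {

    // One changelog entry: a key and its new value; a null value means "delete this key".
    public static final class ChangelogRecord {
        final String key;
        final String value;

        public ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies all records from fromOffset onward and returns the offset to resume from next time.
    public static int restore(List<ChangelogRecord> changelog, int fromOffset,
                              Map<String, String> store) {
        for (int i = fromOffset; i < changelog.size(); i++) {
            ChangelogRecord record = changelog.get(i);
            if (record.value == null) {
                store.remove(record.key);
            } else {
                store.put(record.key, record.value);
            }
        }
        return changelog.size();
    }

    public static void main(String[] args) {
        List<ChangelogRecord> changelog = new ArrayList<>();
        changelog.add(new ChangelogRecord("a", "1"));
        changelog.add(new ChangelogRecord("b", "2"));
        changelog.add(new ChangelogRecord("a", "3"));
        changelog.add(new ChangelogRecord("b", null));

        Map<String, String> store = new HashMap<>();
        int nextOffset = restore(changelog, 0, store); // full replay into an empty store
        System.out.println(store + " nextOffset=" + nextOffset);

        changelog.add(new ChangelogRecord("c", "9"));
        nextOffset = restore(changelog, nextOffset, store); // resume: only the new record is applied
        System.out.println(store + " nextOffset=" + nextOffset);
    }
}
```

A full replay starts at offset 0 with an empty store, while a store that kept its contents together with the last applied offset only needs the records written after that offset.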

  • Posted on August 8, 2020 at 20:36:39



