Spring Kafka可以在运行时重新创建Kafka Stream拓扑。

huangapple go评论66阅读模式
英文:

Spring kafka re-create Kafka Stream Topology in runtime

问题

我有一个基于Spring Boot、Spring Kafka和Kafka Streams的应用程序。
应用程序启动时,它会使用默认的主题列表创建Kafka Streams拓扑结构。
我需要在运行时编辑/重新创建拓扑结构。例如,当应用程序已经在运行时,会出现一个新的主题名称,我希望将此主题添加到我的拓扑中。
目前我在考虑以某种方式删除现有的拓扑结构,关闭和清理KafkaStreams,运行创建带有新主题名称的拓扑结构的逻辑,然后再次启动KafkaStreams。我不想重新启动我的应用程序。
有人可以建议我如何在运行时做到这一点吗?

英文:

I have an application that is based on spring boot, spring-kafka and kafka-streams.
When application starts up, it creates kafka streams topology with default list of topics.
What I need to do is edit/recreate topology in runtime. For example, when application already running, there is new topic name comes and I want to add this topic to my topology.
Currently I'm thinking about somehow to delete existing topology, close and clean up KafkaStreams, run logic where I create topology but with new topic name and start KafkaStreams again. I don`t want to restart my application.
Can someone suggest me how to do this in runtime?

答案1

得分: 1

我找到了1个解决方案
我扩展了StreamsBuilderFactoryBean

    @Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    @Primary
    public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
        return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
    }

    public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

        private StreamsBuilder instance;

        public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
            super(streamsConfig);
        }

        @Override
        public boolean isSingleton() {
            return false;
        }

        @Override
        protected synchronized StreamsBuilder createInstance() {
            if (instance == null) {
                instance = new StreamsBuilder();
            }
            return instance;
        }

        @Override
        public synchronized void stop() {
            instance = null;
            super.stop();
        }
    }

而当我构建拓扑时我不使用StreamsBuilder而是使用StreamsBuilderFactoryBean#getObject()

    @Component
public class DynamicStream {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    public void init() {
        StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
            // 构建拓扑
    }

    // 当需要重新配置流时调用此方法
    public void reinitialize() {
        streamsBuilderFactoryBean.stop();
        init();
        streamsBuilderFactoryBean.start();
    }
}
英文:

I found 1 solution.
I extend StreamsBuilderFactoryBean:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}

And when I build topology, I, instead of using StreamsBuilder, use StreamsBuilderFactoryBean#getObject():

@Component

public class DynamicStream {

private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}

}

huangapple
  • 本文由 发表于 2020年4月11日 00:24:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/61144434.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定