英文:
Spring kafka re-create Kafka Stream Topology in runtime
问题
我有一个基于Spring Boot、Spring Kafka和Kafka Streams的应用程序。
应用程序启动时,它会使用默认的主题列表创建Kafka Streams拓扑结构。
我需要在运行时编辑/重新创建拓扑结构。例如,当应用程序已经在运行时,会出现一个新的主题名称,我希望将此主题添加到我的拓扑中。
目前我在考虑以某种方式删除现有的拓扑结构,关闭和清理KafkaStreams,运行创建带有新主题名称的拓扑结构的逻辑,然后再次启动KafkaStreams。我不想重新启动我的应用程序。
有人可以建议我如何在运行时做到这一点吗?
英文:
I have an application that is based on spring boot, spring-kafka and kafka-streams.
When application starts up, it creates kafka streams topology with default list of topics.
What I need to do is edit/recreate topology in runtime. For example, when application already running, there is new topic name comes and I want to add this topic to my topology.
Currently I'm thinking about somehow to delete existing topology, close and clean up KafkaStreams, run logic where I create topology but with new topic name and start KafkaStreams again. I don`t want to restart my application.
Can someone suggest me how to do this in runtime?
答案1
得分: 1
我找到了1个解决方案。
我扩展了StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
而当我构建拓扑时,我不使用StreamsBuilder,而是使用StreamsBuilderFactoryBean#getObject():
@Component
public class DynamicStream {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
// 构建拓扑
}
// 当需要重新配置流时调用此方法
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}
英文:
I found 1 solution.
I extend StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
And when I build topology, I, instead of using StreamsBuilder, use StreamsBuilderFactoryBean#getObject():
@Component
public class DynamicStream {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论