Spring Kafka: re-creating a Kafka Streams topology at runtime

Question

I have an application based on Spring Boot, spring-kafka, and kafka-streams.
When the application starts, it creates a Kafka Streams topology with a default list of topics.
I need to edit or re-create the topology at runtime. For example, when a new topic name arrives while the application is already running, I want to add that topic to my topology.
Currently I'm thinking of somehow deleting the existing topology, closing and cleaning up KafkaStreams, running the logic that creates the topology with the new topic name, and starting KafkaStreams again. I don't want to restart my application.
Can someone suggest how to do this at runtime?

Answer 1

Score: 1

I found one solution.
I extended StreamsBuilderFactoryBean:

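    import org.apache.kafka.streams.StreamsBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.config.KafkaStreamsConfiguration;
    import org.springframework.kafka.config.StreamsBuilderFactoryBean;

    @Configuration // enclosing class name is illustrative; the original shows only its members
    public class DynamicStreamsConfig {

        // Registers the dynamic variant below under the default factory bean name.
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
        @Primary
        public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
            return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
        }

        public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

            // Builder for the current topology; reset to null on stop().
            private StreamsBuilder instance;

            public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
                super(streamsConfig);
            }

            // Non-singleton, so getObject() calls createInstance() every time.
            @Override
            public boolean isSingleton() {
                return false;
            }

            // Returns the cached builder, or creates a fresh one after stop().
            @Override
            protected synchronized StreamsBuilder createInstance() {
                if (instance == null) {
                    instance = new StreamsBuilder();
                }
                return instance;
            }

            // Drops the builder so the next getObject() starts from an empty topology.
            @Override
            public synchronized void stop() {
                instance = null;
                super.stop();
            }
        }
    }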

And when I build the topology, I use StreamsBuilderFactoryBean#getObject() instead of using StreamsBuilder directly:

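    import org.apache.kafka.streams.StreamsBuilder;
    import org.springframework.kafka.config.StreamsBuilderFactoryBean;
    import org.springframework.stereotype.Component;

    @Component
    public class DynamicStream {

        private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

        // Constructor injection; required because the field is final.
        public DynamicStream(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
            this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
        }

        // getObject() is declared to throw Exception, so init() propagates it.
        public void init() throws Exception {
            StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
            // build topology
        }

        // Call this method when stream reconfiguration is needed.
        public void reinitialize() throws Exception {
            streamsBuilderFactoryBean.stop();  // also discards the old builder
            init();                            // registers the topology on a fresh builder
            streamsBuilderFactoryBean.start(); // starts KafkaStreams with the new topology
        }
    }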
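
For the "new topic name arrives" case from the question, it may help to call reinitialize() only when the set of topics really changes, since every call stops and restarts the stream. Below is a minimal sketch under that assumption. TopicRegistry, addTopic, currentTopics, and rebuildHook are hypothetical names used for illustration; they are not part of Spring Kafka or Kafka Streams, and the class uses only the JDK (Java 11 or later):

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Spring Kafka class): tracks the topics the
 * topology should read from and triggers a rebuild only when the set changes.
 */
final class TopicRegistry {

    // Insertion-ordered so the default topics stay first.
    private final Set<String> topics = new LinkedHashSet<>();

    // Called with the full topic list whenever a new topic is added.
    private final Consumer<List<String>> rebuildHook;

    TopicRegistry(Collection<String> defaultTopics, Consumer<List<String>> rebuildHook) {
        this.topics.addAll(defaultTopics);
        this.rebuildHook = rebuildHook;
    }

    /** Returns true if the topic was new and the rebuild hook was invoked. */
    synchronized boolean addTopic(String topic) {
        if (topic == null || topic.isBlank() || !topics.add(topic)) {
            return false; // nothing changed, keep the running topology
        }
        rebuildHook.accept(List.copyOf(topics));
        return true;
    }

    /** Immutable snapshot of the current topics, in insertion order. */
    synchronized List<String> currentTopics() {
        return List.copyOf(topics);
    }
}
```

In DynamicStream, the hook could store the list and call reinitialize(), and init() could pass that list to StreamsBuilder#stream(Collection<String>). If the rebuild fails, this sketch leaves the topic registered; whether to roll it back is a design choice left open here.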

Published by huangapple on April 11, 2020, 00:24:27
When reposting, please keep the link to this article: https://go.coder-hub.com/61144434.html