Spring Kafka Consumer - Getting topics from a complex object

Question

I am working on a Spring Boot Kafka consumer application. It will have several consumers, each listening to different topics. All the configuration for the consumers comes from the application.yml file.

application:
  kafka:
    consumer-config:
      - name: consumer-a
        topics: topic1,topic2
        ...
      - name: consumer-b
        topics: topic3,topic4
        ...

I am not able to set the list of topics from the application properties on the @KafkaListener.

I tried the following:

@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

In both cases I get the following error:

java.lang.IllegalArgumentException: Could not resolve placeholder

What is the best way to read the topics from the application properties and set them on the @KafkaListener topics attribute?

Answer 1

Score: 0

What version are you using? I just tested the first one and it works fine:

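import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class So63583349Application {

	public static void main(String[] args) {
		SpringApplication.run(So63583349Application.class, args);
	}

	// The placeholder is resolved first; SpEL then splits "topic1,topic2" into an array.
	@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
	public void listen(String in) {
		System.out.println(in);
	}

}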

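The log shows the consumer asking for both topic1 and topic2, so the property was resolved and split; the warning itself only indicates that the broker does not know those topics:

2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}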

For the second one, you can't use SpEL selection within the property placeholder. Here is one solution for that situation:

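import java.util.List;
import java.util.Properties;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class So63583349Application {

	public static void main(String[] args) {
		SpringApplication.run(So63583349Application.class, args);
	}

	// Select the entry named 'consumer-a' from the bound bean, then split its topics.
	@KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
			id = "so63583349")
	public void listen(String in) {
		System.out.println(in);
	}

	@Bean
	Props props() {
		return new Props();
	}

}

// Binds application.kafka.consumer-config to a list with one Properties object per entry.
@ConfigurationProperties(value = "application.kafka")
class Props {

	List<Properties> consumerConfig;

	public List<Properties> getConsumerConfig() {
		return this.consumerConfig;
	}

	public void setConsumerConfig(List<Properties> consumerConfig) {
		this.consumerConfig = consumerConfig;
	}

}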
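
For readers less familiar with SpEL, the selection expression in the listener above can be read as roughly the following plain Java. This is only an illustrative sketch: `TopicLookup`, `topicsFor` and `sampleConfig` are hypothetical names that are not part of Spring or of the code above, and the sample data simply mirrors the two entries shown in the question's application.yml.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spring or Spring Kafka.
class TopicLookup {

    // Plain-Java reading of:
    //   consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')
    static String[] topicsFor(List<Properties> consumerConfig, String name) {
        List<Properties> selected = new ArrayList<>();
        for (Properties entry : consumerConfig) {
            // .?[name == '...'] keeps only the entries whose "name" matches
            if (name.equals(entry.getProperty("name"))) {
                selected.add(entry);
            }
        }
        // .get(0) takes the first match and throws IndexOutOfBoundsException
        // when no entry matches (the SpEL version also fails in that case)
        Properties first = selected.get(0);
        // .topics.split(',') turns "topic1,topic2" into {"topic1", "topic2"}
        return first.getProperty("topics").split(",");
    }

    // Builds the same two entries as the application.yml in the question.
    static List<Properties> sampleConfig() {
        Properties a = new Properties();
        a.setProperty("name", "consumer-a");
        a.setProperty("topics", "topic1,topic2");
        Properties b = new Properties();
        b.setProperty("name", "consumer-b");
        b.setProperty("topics", "topic3,topic4");
        List<Properties> configs = new ArrayList<>();
        configs.add(a);
        configs.add(b);
        return configs;
    }

    public static void main(String[] args) {
        // prints [topic1, topic2]
        System.out.println(Arrays.toString(topicsFor(sampleConfig(), "consumer-a")));
    }
}
```

The point of the comparison is that the selection runs against the bound `Props` bean, an actual list of objects, which is why it has to live in the `#{...}` part of the expression and not inside a `${...}` placeholder.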

huangapple
  • Published on 2020-08-26 00:35:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63583349.html