英文:
Spring Kafka Consumer - Getting topics from a complex object
问题
我正在开发一个Spring Boot Kafka消费者应用程序。它将有不同的消费者处理不同的主题。所有消费者的信息将来自于application.yml文件。
application:
kafka:
consumer-config:
- name: consumer-a
topics: topic1,topic2
...
- name: consumer-b
topics: topic3,topic4
...
我无法从应用程序属性中设置主题列表到KafkaListener上。
我尝试了以下方法:
@KafkaListener(topics = "${application.kafka.consumer-config[0].topics}", containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "${application.kafka.consumer-config.?[name == 'consumer-a'].topics}", containerFactory = "kafkaListenerContainerFactory")
在这两种情况下,我都遇到了以下错误:
java.lang.IllegalArgumentException: 无法解析占位符
从应用程序属性中获取主题并将其设置到KafkaListener主题上的最佳方法是什么?
英文:
I am working on a spring boot kafka consumer application. It will have different consumers working on different topics. All the information for the consumers will come from the application.yml file.
<!-- begin snippet: js hide: false console: true babel: false -->
<!-- language: lang-html -->
application:
kafka:
consumer-config:
- name: consumer-a
topics: topic1,topic2
......
- name: consumer-b
topics: topic3,topic4
.....
<!-- end snippet -->
I am not able to set the list of topics from the application properties on to the KafkaListener.
I tried the following:
<!-- begin snippet: js hide: false console: true babel: false -->
<!-- language: lang-html -->
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
<!-- end snippet -->
In both the cases I am getting the following error:
java.lang.IllegalArgumentException: Could not resolve placeholder
What is the best way to get the topics from application properties and set it on KafkaListener topics?
答案1
得分: 0
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(
topics = "${application.kafka.consumer-config[0].topics}".split(","),
id = "so63583349"
)
public void listen(String in) {
System.out.println(in);
}
}
2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}
For the second one, you can't use SpEL selection within the property placeholder. Here is one solution for that situation:
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(
topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
id = "so63583349"
)
public void listen(String in) {
System.out.println(in);
}
@Bean
Props props() {
return new Props();
}
}
@ConfigurationProperties(value = "application.kafka")
class Props {
List<Properties> consumerConfig;
public List<Properties> getConsumerConfig() {
return this.consumerConfig;
}
public void setConsumerConfig(List<Properties> consumerConfig) {
this.consumerConfig = consumerConfig;
}
}
英文:
What version are you using? I just tested it and it works fine...
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}", id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
}
>2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}
For the second one, you can't use SpEL selection within the property placeholder. Here is one solution for that situation:
@SpringBootApplication
public class So63583349Application {
public static void main(String[] args) {
SpringApplication.run(So63583349Application.class, args);
}
@KafkaListener(topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}",
id = "so63583349")
public void listen(String in) {
System.out.println(in);
}
@Bean
Props props() {
return new Props();
}
}
@ConfigurationProperties(value = "application.kafka")
class Props {
List<Properties> consumerConfig;
public List<Properties> getConsumerConfig() {
return this.consumerConfig;
}
public void setConsumerConfig(List<Properties> consumerConfig) {
this.consumerConfig = consumerConfig;
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论