英文:
Failed to construct kafka consumer with Spring Boot
问题
我已经实现了一个简单的消费者应用程序来消费主题中的消息。当我运行kafka-consumer应用程序时,出现了以下错误。
堆栈跟踪
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
配置类
@Configuration
@EnableKafka
public class KafkaConfig {
private ConsumerFactory<String,String> consumerFactory()
{
Map<String,Object> config=new ConcurrentHashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String,String> factory
=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
监听器类
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
public void consume(String message)
{
System.out.println("Consumed Message "+message);
}
}
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
注意 - 我的Kafka版本是2.13-2.6.0。
英文:
I have implemented a simple consumer application to consume messages from the topics. When I run the kafka-consumer application then the following error has occurred.
StackTrace
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
Config class
@Configuration
@EnableKafka
public class KafkaConfig {
private ConsumerFactory<String,String> consumerFactory()
{
Map<String,Object> config=new ConcurrentHashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String,String> factory
=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Listener class
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
public void consume(String message)
{
System.out.println("Consumed Message "+message);
}
}
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
Note - My Kafka version is 2.13-2.6.0
答案1
得分: 4
你正在使用 StringSerializer
,但应该使用 StringDeserializer
,一个用于序列化,另一个用于反序列化。
而且,由于你将它们设置为 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
和 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
,显然你想要进行反序列化。
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
英文:
You are using a StringSerializer
but should use a StringDeserializer
, one serializes, the other deserializes.
And since you set them for ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
you apparently want to deserialize.
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论