Failed to construct kafka consumer with Spring Boot

Question

I have implemented a simple consumer application that consumes messages from a topic. When I run the kafka-consumer application, it fails at startup with the following error.

Stack trace

    org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer

Config class

    @Configuration
    @EnableKafka
    public class KafkaConfig {

        private ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> config = new ConcurrentHashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_string");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaConsumerFactory<>(config);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }

Listener class

    @Component
    public class KafkaConsumer {

        @KafkaListener(topics = {"Kafka_Example"}, groupId = "group_string")
        public void consume(String message) {
            System.out.println("Consumed Message " + message);
        }
    }

pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>2.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.1</version>
    </dependency>

Note: my Kafka version is 2.13-2.6.0.

Answer 1

Score: 4

You are using a StringSerializer where a StringDeserializer is required: one serializes, the other deserializes.
Since you set them under ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, the consumer expects classes that implement Deserializer, which is exactly what the last line of your stack trace says. Use StringDeserializer for both:

    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
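
To see why the wrong class is rejected at startup rather than at the first message, here is a rough, standard-library-only sketch of the kind of type check implied by the "is not an instance of" message. Everything in it is a stand-in made up for illustration: the nested Serializer and Deserializer interfaces, the two String classes and the configuredInstance helper are not Kafka's real classes or methods, and Kafka's actual implementation may differ.

```java
import java.nio.charset.StandardCharsets;

public class DeserializerCheckSketch {

    // Hypothetical stand-ins that only mimic the shape of Kafka's interfaces.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }

    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    static class StringDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Assumed, simplified check: refuse any configured class that does not
    // implement the expected interface, otherwise instantiate it.
    static <T> T configuredInstance(Class<?> configured, Class<T> expected) {
        if (!expected.isAssignableFrom(configured)) {
            throw new IllegalArgumentException("class " + configured.getName()
                    + " is not an instance of " + expected.getName());
        }
        try {
            return expected.cast(configured.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + configured.getName(), e);
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // A serializer in a deserializer slot is rejected before any message is read.
        try {
            configuredInstance(StringSerializer.class, Deserializer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // A deserializer is accepted and can turn the bytes back into a String.
        Deserializer<String> deserializer =
                configuredInstance(StringDeserializer.class, Deserializer.class);
        byte[] bytes = new StringSerializer().serialize("Kafka_Example", "hello");
        System.out.println(deserializer.deserialize("Kafka_Example", bytes)); // prints "hello"
    }
}
```

The config map is typed as Map<String,Object>, so the compiler cannot catch a serializer class placed under a deserializer key. The mistake only surfaces when the consumer is constructed, which is why it appears as a startup failure of the listener registry.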

huangapple
  • Published on 2020-10-13 16:01:36
  • Link to this article: https://go.coder-hub.com/64331056.html