无法使用Spring Boot构建Kafka消费者。

huangapple go评论96阅读模式
英文:

Failed to construct kafka consumer with Spring Boot

问题

我已经实现了一个简单的消费者应用程序来消费主题中的消息。当我运行kafka-consumer应用程序时,出现了以下错误。

堆栈跟踪

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]

org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer

配置类

@Configuration
@EnableKafka
public class KafkaConfig {

    private ConsumerFactory<String,String> consumerFactory()
    {
        Map<String,Object> config=new ConcurrentHashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String,String> factory
                =new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

监听器类

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
    public void consume(String message)
    {
        System.out.println("Consumed Message "+message);
    }
}

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.0</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.11.1</version>
</dependency>

注意 - 我的Kafka版本是2.13-2.6.0。

英文:

I have implemented a simple consumer application to consume messages from the topics. When I run the kafka-consumer application then the following error has occurred.

StackTrace

org.springframework.context.ApplicationContextException: Failed to start bean &#39;org.springframework.kafka.config.internalKafkaListenerEndpointRegistry&#39;; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]

org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer

Config class

@Configuration
@EnableKafka
public class KafkaConfig {

    private ConsumerFactory&lt;String,String&gt; consumerFactory()
    {
        Map&lt;String,Object&gt; config=new ConcurrentHashMap&lt;&gt;();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,&quot;127.0.0.1:9092&quot;);
        config.put(ConsumerConfig.GROUP_ID_CONFIG,&quot;group_string&quot;);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);

        return new DefaultKafkaConsumerFactory&lt;&gt;(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory&lt;String,String&gt; kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory&lt;String,String&gt; factory
                =new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

Listener class

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {&quot;Kafka_Example&quot;},groupId = &quot;group_string&quot;)
    public void consume(String message)
    {
        System.out.println(&quot;Consumed Message &quot;+message);
    }
}

pom.xml

&lt;dependency&gt;
            &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
            &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
        &lt;/dependency&gt;

        &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
            &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
            &lt;version&gt;2.6.0&lt;/version&gt;
        &lt;/dependency&gt;
 &lt;dependency&gt;
            &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
            &lt;artifactId&gt;spring-kafka-test&lt;/artifactId&gt;
            &lt;version&gt;2.6.0&lt;/version&gt;
            &lt;scope&gt;test&lt;/scope&gt;
        &lt;/dependency&gt;

        &lt;dependency&gt;
            &lt;groupId&gt;com.fasterxml.jackson.core&lt;/groupId&gt;
            &lt;artifactId&gt;jackson-databind&lt;/artifactId&gt;
            &lt;version&gt;2.11.1&lt;/version&gt;
        &lt;/dependency&gt;

Note - My Kafka version is 2.13-2.6.0

答案1

得分: 4

你正在使用 StringSerializer,但应该使用 StringDeserializer,一个用于序列化,另一个用于序列化。
而且,由于你将它们设置为 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIGConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,显然你想要进行序列化。

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
英文:

You are using a StringSerializer but should use a StringDeserializer, one serializes, the other deserializes.
And since you set them for ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG you apparently want to deserialize.

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

huangapple
  • 本文由 发表于 2020年10月13日 16:01:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/64331056.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定