SpringBoot @KafkaListener got MessageConversionException: Cannot convert from A to B

Question

I am using Spring Boot 2.7.5 and my @KafkaListener fails with a MessageConversionException. The full error log is:

Bean [example.package.api.kafka.HelloKafkaListener@30e312a2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, 
kafka_groupId=my-topic:HelloKafkaListener}]

The consumer configuration:

consumer:
  group-id: my-topic:HelloKafkaListener
  auto-offset-reset: latest
  key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  properties:
    spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
    request.timeout.ms: 5000
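One thing worth flagging in this configuration: `KafkaAvroDeserializer` needs a `schema.registry.url`, and it returns `GenericRecord` rather than the generated class unless `specific.avro.reader` is enabled. A minimal sketch of the extra properties (the registry URL here is an assumption, taken from the console-producer command shown later):

```yaml
consumer:
  properties:
    spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
    # Without this, the delegate yields GenericRecord instead of ExampleEvent:
    specific.avro.reader: true
    # Assumed value, matching the console-producer command later in the question:
    schema.registry.url: http://kafka-schema:80
```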

The versions I use:

<confluent.version>7.3.0</confluent.version>
<avro.version>1.11.1</avro.version>
<spring.kafka.version>2.8.10</spring.kafka.version>

My pom.xml:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.swagger.core.v3</groupId>
            <artifactId>swagger-annotations</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>

My event.avsc:

{
  "namespace": "my.name.space",
  "type": "record",
  "name": "ExampleEvent",
  "doc": "A sample event",
  "fields": [
    {
      "name": "exampleField",
      "type": "string"
    }
  ]
}
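For context, `ExampleEvent` is presumably the Java class generated from this schema. A hedged sketch of the `avro-maven-plugin` configuration that would generate it (the source and output paths are assumptions):

```xml
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <!-- Assumed locations; adjust to wherever event.avsc actually lives -->
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```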

I send the message from the terminal with the Confluent tool like this:

./kafka-avro-console-producer \
--broker-list localhost:59092 --topic my-topic \
--property schema.registry.url=http://kafka-schema:80 \
--property value.schema="$(cat ~/codebase/avro/event.avsc)"
{"exampleField":"HELLO WORLD"}

Consuming the message with ./kafka-avro-console-consumer works fine.

My listener:

@Slf4j      // assumed: `log` is not declared in the class, so a Lombok logger (or similar) is implied
@Component  // assumed: the error log shows the listener is registered as a Spring bean
public class HelloKafkaListener {
  private final HelloKafkaService helloKafkaService;

  @Autowired
  public HelloKafkaListener(HelloKafkaService helloKafkaService) {
    this.helloKafkaService = helloKafkaService;
  }

  @KafkaListener(
      topics = "my-topic",
      groupId = "my-topic:HelloKafkaListener"
  )
  public void process(@Payload ExampleEvent event) {
    this.helloKafkaService.handleMessage(event);
    log.info("Processing event: " + event.getExampleField());
  }
}

If I use `ConsumerRecord record` it works, and `record.value().toString()` prints `{"exampleField": "HELLO WORLD"}`; but switching the parameter to `ExampleEvent event` throws the exception above. Does anyone know why this happens? Many thanks!

After reviewing the comments, I created a `DefaultKafkaConsumerFactory` and a `ConcurrentKafkaListenerContainerFactory` like this:

@Bean
public DefaultKafkaConsumerFactory<String, ExampleEvent> helloConsumerFactory(
    @Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress,
    @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl,
    @Value("${spring.kafka.consumer.group-id}") String groupId // added: `groupId` was referenced but never declared
) {
  var props = Map.<String, Object>of(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
      ConsumerConfig.GROUP_ID_CONFIG, groupId
  );
  var keyDeserializer = new ErrorHandlingDeserializer<>(new StringDeserializer());
  var config = new HashMap<String, Object>();
  config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
  config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
  var avroDeserializer = new KafkaAvroDeserializer(null, config);
  var valueDeserializer = new ErrorHandlingDeserializer<>(avroDeserializer);

  return new DefaultKafkaConsumerFactory(props, keyDeserializer, valueDeserializer, false);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ExampleEvent> helloKafkaListenerContainerFactory(
    ConsumerFactory<String, ExampleEvent> helloConsumerFactory) {
  var helloContainerFactory = new ConcurrentKafkaListenerContainerFactory<String, ExampleEvent>();
  helloContainerFactory.setConsumerFactory(helloConsumerFactory);
  return helloContainerFactory;
}

Updated listener:

@KafkaListener(
    topics = "my-topic",
    groupId = "my-topic:HelloKafkaListener",
    containerFactory = "helloKafkaListenerContainerFactory"
)
public void process(@Payload ExampleEvent event) {
  this.helloKafkaService.handleMessage(event);
  log.info("Processing event: " + event.getExampleField());
}

However, I still get the same error.


# Answer 1

**Score:** 2

Check this article. You have to provide a message converter for your custom Java object `ExampleEvent`. `@Payload` by itself can only receive a `String` message.

Try with:

```java
@Bean
public ConsumerFactory<String, ExampleEvent> exampleConsumerFactory() {
    // `props` holds the usual consumer settings (bootstrap servers, group id, ...)
    return new DefaultKafkaConsumerFactory<>(
        props,
        new StringDeserializer(),
        new JsonDeserializer<>(ExampleEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ExampleEvent>
    exampleKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ExampleEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(exampleConsumerFactory());
    return factory;
}
```

And then register it with:

```java
@KafkaListener(
    topics = "my-topic",
    groupId = "my-topic:HelloKafkaListener",
    containerFactory = "exampleKafkaListenerContainerFactory"  // <-------
)
public void process(ExampleEvent event) {
    this.helloKafkaService.handleMessage(event);
    log.info("Processing event: " + event.getExampleField());
}
```

Also make sure `jackson-databind` is already among your dependencies; otherwise add it manually:

```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
```

# Answer 2

**Score:** 1


> [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent]

Given that the packages are the same, this usually means the deserializer was loaded by a different class loader from the one that loaded the listener, for example because Spring Dev Tools is being used.

When specified via a property, the deserializer is loaded by the Kafka client rather than by Spring.

One way to avoid this is to add the deserializer instance directly to the consumer factory instead of configuring it via a property. You can use the constructor or setter on the factory, as shown in the other answer.
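The class-loader explanation above can be reproduced with plain JDK code, no Kafka involved (an illustrative sketch, not the actual Spring Dev Tools mechanism): when the "same" class is defined by two loaders, the two `Class` objects are distinct, so a cast between them fails even though the names match, which is exactly the "Cannot convert from X to X" symptom.

```java
import java.io.InputStream;

public class LoaderDemo {

    public static class Payload { }

    // A loader that defines Payload itself instead of delegating to its parent,
    // producing a second, distinct Class object with the same binary name.
    static final class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals("LoaderDemo$Payload")) {
                return super.loadClass(name, resolve); // everything else delegates normally
            }
            try (InputStream in = LoaderDemo.class.getResourceAsStream("/LoaderDemo$Payload.class")) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Returns {namesMatch, classesIdentical, castWouldSucceed}. */
    public static boolean[] check() throws Exception {
        Class<?> a = Payload.class;
        Class<?> b = new IsolatingLoader(LoaderDemo.class.getClassLoader())
                .loadClass("LoaderDemo$Payload");
        Object instanceOfB = b.getDeclaredConstructor().newInstance();
        return new boolean[] {
            a.getName().equals(b.getName()), // true:  identical fully-qualified name
            a == b,                          // false: different defining loaders
            a.isInstance(instanceOfB)        // false: so casting to `a` throws
        };
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = check();
        System.out.println("same name: " + r[0]
                + ", same class: " + r[1] + ", assignable: " + r[2]);
    }
}
```

This is why handing an already-constructed deserializer instance to the consumer factory helps: both the deserializer and the listener then come from the same application class loader.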

huangapple
  • Published 2023-03-07 22:55:27
  • Please keep this link when reposting: https://go.coder-hub.com/75663581.html