Kafka Avro解析器无法将Kafka消息反序列化为特定的Avro记录。

huangapple go评论65阅读模式
英文:

Kafka Avro Deserializer is not able to deserialize the Kafka message to Specific Avro Record

问题

我正在尝试将存在于Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用KafkaAvroDeserializer进行此转换。

我可以在从Kafka返回的ConsumerRecord<String, Data>记录中看到GenericRecord。但是,当我尝试将此记录分配给生成的POJO类对象时,对于POJO字段中的date类型,它会失败并抛出ClassCastException异常。当我检查了Avro有效负载时,发现这个日期字段被表示为integer

安装:
Avro - 1.9.1
Confluent - 5.4
commercehub gradle plugin 0.20.0

在尝试反序列化Avro消息时,我收到以下错误 -

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
	at com.sample.Data.put(Data.java:229) ~[main/:na]
	at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
	...

出现ClassCastException异常的字段的Avro模式如下 -

{
    "name": "BIRTH_DT",
    "type": [
        "null",
        {
            "type": "int",
            "logicalType": "date"
        }
    ],
    "default": null
}

从生成的POJO中的代码片段 -

  @Deprecated public java.time.LocalDate BIRTH_DT;

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    ...
    case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  public java.time.LocalDate getBIRTHDT() {
    return BIRTH_DT;
  }

  public void setBIRTHDT(java.time.LocalDate value) {
      this.BIRTH_DT = value;
  }

Kafka消费者方法 -

@KafkaListener(topics = "${spring.kafka.consumer.properties.topic}",
                 groupId = "${spring.kafka.consumer.group-id}")
// Data是由Avro工具生成的POJO
public void consume(ConsumerRecord<String, Data> record,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                    @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {

    logger.info(String.format("#### -> Consumed message -> partiion: %s, offset: %s", partition, offset));
    Data row = record.value();
    ack.acknowledge();
}

build.gradle -

dependencies {
    ...
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation 'org.apache.avro:avro:1.9.1'
    ...
}

在解决此问题时,你可能需要编辑生成的POJO类以处理日期字段的反序列化问题。或者,你可以检查Avro模式定义中的logicalType,并确保它与Java数据类型匹配。你可能还需要查看KafkaAvroDeserializerKafkaAvroSerializer的文档,以确保正确配置和使用这些类。最后,你可以考虑将问题报告给Confluent社区或Stack Overflow等平台以获取更详细的支持。

英文:

I am trying to deserialize Avro messgaes that are in kafka to POJO generated from Avro Schema. I am using KafkaAvroDeserializer for this conversion.

I am able to see the GenericRecord in the ConsumerRecord&lt;String, Data&gt; record returned from kafka. But when I try to assign this record to generated POJO class object, It is failing for date type of POJO field with ClassCastException. When I checked the avro payload this date field is coming in as integer.

Set up:
Avro - 1.9.1
Confluent - 5.4
commercehub gradle plugin 0.20.0

While trying to deserialze the Avro message, I am getting the error as -

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
	at com.sample.Data.put(Data.java:229) ~[main/:na]
	at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]

Avro Schema of the field for which conversion is failing with ClassCastException

{
    &quot;name&quot;: &quot;BIRTH_DT&quot;,
    &quot;type&quot;: [
        &quot;null&quot;,
        {
            &quot;type&quot;: &quot;int&quot;,
            &quot;logicalType&quot;: &quot;date&quot;
        }
    ],
    &quot;default&quot;: null
}

Code Snippets from Generated POJO

  @Deprecated public java.time.LocalDate BIRTH_DT;

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value=&quot;unchecked&quot;)
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    .
    .
    case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException(&quot;Bad index&quot;);
    }
  }

  public java.time.LocalDate getBIRTHDT() {
    return BIRTH_DT;
  }

  public void setBIRTHDT(java.time.LocalDate value) {
      this.BIRTH_DT = value;
  }

Kafka Consumer Method

    @KafkaListener(topics = &quot;${spring.kafka.consumer.properties.topic}&quot;,
                     groupId = &quot;${spring.kafka.consumer.group-id}&quot;)
    // Data is a POJO generated by Avro tools
    public void consume(ConsumerRecord&lt;String, Data&gt; record,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {
    
        logger.info(String.format(&quot;#### -&gt; Consumed message -&gt; partiion: %s, offset: %s&quot;, partition, offset));
        Data row = record.value();
        ack.acknowledge();
    }

build.gradle

buildscript {
	repositories {
		jcenter {
			url &quot;https://nexus.abc.com:8443/content/repositories/jcenter/&quot;
		}
	}
	dependencies {
		classpath &quot;com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0&quot;
	}
}

plugins {
	id &#39;org.springframework.boot&#39; version &#39;2.3.1.RELEASE&#39;
	id &#39;io.spring.dependency-management&#39; version &#39;1.0.9.RELEASE&#39;
	id &#39;java&#39;
	id &#39;idea&#39;
	id &#39;eclipse&#39;
}

repositories {
	maven { url nexusPublicRepoURL }
	maven { url &quot;https://nexus.abc.com:8443/content/repositories/confluence.io-maven/&quot; }
	jcenter()
	maven { url &quot;https://nexus.abc.com:8443/content/repositories/jcenter/&quot; }
}

group = &#39;com.abc.cscm&#39;
version = &#39;0.0.1-SNAPSHOT&#39;
sourceCompatibility = &#39;8&#39;
targetCompatibility = &#39;8&#39;

ext {
	springCloudVersion = &#39;Hoxton.SR6&#39;
	confluentVersion = &#39;5.4.0&#39;
}

dependencies {
	implementation &#39;org.springframework.boot:spring-boot-starter-actuator&#39;
	implementation &#39;org.springframework.boot:spring-boot-starter-web&#39;
	implementation &#39;org.springframework.kafka:spring-kafka&#39;

	implementation &quot;io.confluent:kafka-avro-serializer:${confluentVersion}&quot;

	implementation &#39;org.apache.avro:avro:1.9.1&#39;

	testImplementation(&#39;org.springframework.boot:spring-boot-starter-test&#39;) {
		exclude group: &#39;org.junit.vintage&#39;, module: &#39;junit-vintage-engine&#39;
	}
	testImplementation &#39;org.springframework.kafka:spring-kafka-test&#39;
}

springBoot {
	buildInfo()
}

dependencyManagement {
	imports {
		mavenBom &quot;org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}&quot;
	}
}

test {
	useJUnitPlatform()
}

wrapper {
	distributionUrl = &quot;https://nexus.abc.com:8443/service/local/repositories/thirdparty/content/org/gradle/gradle/6.5/gradle-6.5.zip&quot;
}

apply plugin: &quot;com.commercehub.gradle.plugin.avro&quot;
apply plugin: &#39;idea&#39;

./gradlew dependencies --configuration compileClasspath (Output)

&gt; Task :dependencies

------------------------------------------------------------
Root project
------------------------------------------------------------

compileClasspath - Compile classpath for source set &#39;main&#39;.
                        ** omiting spring deps
+--- io.confluent:kafka-avro-serializer:5.4.0
|    +--- org.apache.avro:avro:1.9.1
|    |    +--- com.fasterxml.jackson.core:jackson-core:2.9.9 -&gt; 2.11.0
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.9.3 -&gt; 2.11.0 (*)
|    |    +--- org.apache.commons:commons-compress:1.19
|    |    \--- org.slf4j:slf4j-api:1.7.25 -&gt; 1.7.30
|    +--- io.confluent:kafka-schema-registry-client:5.4.0
|    |    +--- org.apache.kafka:kafka-clients:5.4.0-ccs -&gt; 2.5.0 (*)
|    |    +--- io.confluent:common-config:5.4.0
|    |    |    +--- io.confluent:common-utils:5.4.0
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.26 -&gt; 1.7.30
|    |    |    \--- org.slf4j:slf4j-api:1.7.26 -&gt; 1.7.30
|    |    +--- org.apache.avro:avro:1.9.1 (*)
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.10.1 -&gt; 2.11.0 (*)
|    |    +--- io.swagger:swagger-annotations:1.5.22
|    |    +--- io.swagger:swagger-core:1.5.3
|    |    |    +--- org.apache.commons:commons-lang3:3.2.1 -&gt; 3.10
|    |    |    +--- org.slf4j:slf4j-api:1.6.3 -&gt; 1.7.30
|    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -&gt; 2.11.0
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.5 -&gt; 2.11.0 (*)
|    |    |    +--- com.fasterxml.jackson.datatype:jackson-datatype-joda:2.4.5 -&gt; 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    |    \--- joda-time:joda-time:2.9.9
|    |    |    +--- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.5 -&gt; 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.11.0 (*)
|    |    |    |    +--- org.yaml:snakeyaml:1.26
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    +--- io.swagger:swagger-models:1.5.3
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -&gt; 2.11.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.6.3 -&gt; 1.7.30
|    |    |    |    \--- io.swagger:swagger-annotations:1.5.3 -&gt; 1.5.22
|    |    |    \--- com.google.guava:guava:18.0 -&gt; 29.0-android
|    |    |         +--- com.google.guava:failureaccess:1.0.1
|    |    |         +--- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
|    |    |         +--- com.google.code.findbugs:jsr305:3.0.2
|    |    |         +--- org.checkerframework:checker-compat-qual:2.5.5
|    |    |         +--- com.google.errorprone:error_prone_annotations:2.3.4
|    |    |         \--- com.google.j2objc:j2objc-annotations:1.3
|    |    \--- io.confluent:common-utils:5.4.0 (*)
|    +--- io.confluent:common-config:5.4.0 (*)
|    \--- io.confluent:common-utils:5.4.0 (*)
\--- org.apache.avro:avro:1.9.1 (*)

./gradlew buildEnvironment (Output)

classpath
+--- com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0
|    \--- org.apache.avro:avro-compiler:1.9.2    &lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;&lt;
|         +--- org.apache.avro:avro:1.9.2
|         |    +--- com.fasterxml.jackson.core:jackson-core:2.10.2 -&gt; 2.11.0
|         |    +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -&gt; 2.11.0
|         |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.11.0
|         |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|         |    +--- org.apache.commons:commons-compress:1.19
|         |    \--- org.slf4j:slf4j-api:1.7.25 -&gt; 1.7.30
|         +--- org.apache.commons:commons-lang3:3.9 -&gt; 3.10
|         +--- org.apache.velocity:velocity-engine-core:2.2
|         |    +--- org.apache.commons:commons-lang3:3.9 -&gt; 3.10
|         |    \--- org.slf4j:slf4j-api:1.7.30
|         +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -&gt; 2.11.0 (*)
|         +--- joda-time:joda-time:2.10.1
|         \--- org.slf4j:slf4j-api:1.7.25 -&gt; 1.7.30

I am not sure if I should be editing my generated POJO class or I am missing something.

I was able to convert avro message into POJO by changing the schema as mentioned in below question. But I think it is hacky and problem is not yet solved.

Question - Avro is not able to deserialize Union with Logical Types in fields

答案1

得分: 1

如果有人仍然遇到这个错误,请注意联合中逻辑类型转换器存在问题,截止到目前为止,我没有找到一个可用的 Avro 版本。看起来问题可能已经通过此修复:https://github.com/apache/avro/pull/1721,但新版本仍需发布。

我成功修复了以下字段类型的问题,
通过手动设置转换:

字段类型:

          "type": [
            "null",
            {
              "type": "long",
              "logicalType": "timestamp-millis"
            }

设置转换:

ReflectData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

并使用 io.confluent.kafka.streams.serdes.avro.ReflectionAvroSerde 进行序列化/反序列化。

英文:

In case somebody still encounters this error, there is a problem with the converter for logical types in unions and I didn't find a working avro version as of this date. It looks like the issue might be fixed by: https://github.com/apache/avro/pull/1721, but a new version still needs to be released.

I was able to fix my issue for the following field type
by setting the conversion manually

field type:

          &quot;type&quot;: [
            &quot;null&quot;,
            {
              &quot;type&quot;: &quot;long&quot;,
              &quot;logicalType&quot;: &quot;timestamp-millis&quot;
            }

setting the conversion:

ReflectData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

and serialization/deserialization using the io.confluent.kafka.streams.serdes.avro.ReflectionAvroSerde

答案2

得分: 0

请您确认一下您使用的是哪个版本的 avro-maven-plugin 来生成 POJO 类?在 avro 版本 1.9.0 中,已经弃用了 Joda-Time,改用了 Java8 JSR310,并将 Java8 设置为默认选项。详见 Apache Avro 1.9.0 发布说明

当我从头开始生成一个 POJO 时,我得到的是 java.time.LocalDate BIRTH_DT,而不是 org.joda.time.LocalDate BIRTH_DT

   @Deprecated public java.time.LocalDate BIRTH_DT;

所以,在您的情况下,我认为很可能是类路径中存在 avro 版本不匹配,或者是一个过时的 POJO。我建议通过 mvn dependency:tree -Dincludes=org.apache.avro:avro 命令来验证 avro 版本,并重新生成 POJO。

英文:

Could you please clarify what version of avro-maven-plugin are you using to generate the POJO ? As of avro version 1.9.0, Joda-Time has been deprecated in favor of Java8 JSR310 and Java8 was set as default. See Apache Avro 1.9.0 release notes.

When I generate a POJO from scratch, I get java.time.LocalDate BIRTH_DT and not org.joda.time.LocalDate BIRTH_DT.

   @Deprecated public java.time.LocalDate BIRTH_DT;

So, in your case I think there is most likely an avro versions mismatch inside the classpath or a stale pojo. I would recommend verifying the avro version via mvn dependency:tree -Dincludes=org.apache.avro:avro call and regenerating POJO.

huangapple
  • 本文由 发表于 2020年8月25日 06:25:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/63569470.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定