Kafka Avro Deserializer is not able to deserialize the Kafka message to Specific Avro Record


I am trying to deserialize Avro messages that are in Kafka into a POJO generated from an Avro schema. I am using KafkaAvroDeserializer for this conversion.

I am able to see the GenericRecord in the ConsumerRecord<String, Data> record returned from Kafka. But when I try to assign this record to the generated POJO class object, it fails with a ClassCastException on the date-typed field of the POJO. When I checked the Avro payload, this date field comes in as an integer.
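For context, Avro's `date` logical type encodes a value on the wire as an int counting days since the Unix epoch, and a registered logical-type conversion is what should turn that raw int back into a `java.time.LocalDate`. A minimal stdlib-only sketch of the mapping (the value 18000 is an arbitrary example, not taken from the payload above):

```java
import java.time.LocalDate;

public class EpochDayDemo {
    public static void main(String[] args) {
        // Avro's "date" logical type ships the value as an int:
        // the number of days since 1970-01-01.
        int rawAvroValue = 18000; // example wire value

        // This is the conversion the DatumReader is supposed to apply.
        // When it is skipped, put() receives the raw Integer and the
        // cast to LocalDate throws the ClassCastException shown below.
        LocalDate decoded = LocalDate.ofEpochDay(rawAvroValue);
        System.out.println(decoded); // prints 2019-04-14
    }
}
```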

Setup:

- Avro 1.9.1
- Confluent 5.4
- commercehub gradle plugin 0.20.0

While trying to deserialize the Avro message, I get the following error:

```
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
	at com.sample.Data.put(Data.java:229) ~[main/:na]
	at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
```

Avro schema of the field for which the conversion fails with the ClassCastException:

```json
{
  "name": "BIRTH_DT",
  "type": [
    "null",
    {
      "type": "int",
      "logicalType": "date"
    }
  ],
  "default": null
}
```

Code snippets from the generated POJO:

```java
@Deprecated public java.time.LocalDate BIRTH_DT;

// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  // ...
  case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
  default: throw new org.apache.avro.AvroRuntimeException("Bad index");
  }
}

public java.time.LocalDate getBIRTHDT() {
  return BIRTH_DT;
}

public void setBIRTHDT(java.time.LocalDate value) {
  this.BIRTH_DT = value;
}
```

Kafka consumer method:

```java
@KafkaListener(topics = "${spring.kafka.consumer.properties.topic}",
        groupId = "${spring.kafka.consumer.group-id}")
// Data is a POJO generated by Avro tools
public void consume(ConsumerRecord<String, Data> record,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {
    logger.info(String.format("#### -> Consumed message -> partition: %s, offset: %s", partition, offset));
    Data row = record.value();
    ack.acknowledge();
}
```
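As a side note, for KafkaAvroDeserializer to hand back the generated `Data` class at all (rather than a `GenericRecord`), the specific-reader flag must be enabled. A hedged Spring Boot properties sketch — the deserializer property names come from the Confluent serializer configuration, while the registry URL is a placeholder:

```properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8081
# Without this, record.value() would be a GenericRecord, not Data
spring.kafka.consumer.properties.specific.avro.reader=true
```

The stack trace above shows SpecificDatumReader in use, so this flag appears to be set correctly here; it is included only for completeness.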

build.gradle

```groovy
buildscript {
    repositories {
        jcenter {
            url "https://nexus.abc.com:8443/content/repositories/jcenter/"
        }
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0"
    }
}

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
    id 'idea'
    id 'eclipse'
}

repositories {
    maven { url nexusPublicRepoURL }
    maven { url "https://nexus.abc.com:8443/content/repositories/confluence.io-maven/" }
    jcenter()
    maven { url "https://nexus.abc.com:8443/content/repositories/jcenter/" }
}

group = 'com.abc.cscm'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'
targetCompatibility = '8'

ext {
    springCloudVersion = 'Hoxton.SR6'
    confluentVersion = '5.4.0'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation 'org.apache.avro:avro:1.9.1'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

springBoot {
    buildInfo()
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

wrapper {
    distributionUrl = "https://nexus.abc.com:8443/service/local/repositories/thirdparty/content/org/gradle/gradle/6.5/gradle-6.5.zip"
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: 'idea'
```

Output of `./gradlew dependencies --configuration compileClasspath`:

```
> Task :dependencies

------------------------------------------------------------
Root project
------------------------------------------------------------

compileClasspath - Compile classpath for source set 'main'.
** omitting spring deps
+--- io.confluent:kafka-avro-serializer:5.4.0
| +--- org.apache.avro:avro:1.9.1
| | +--- com.fasterxml.jackson.core:jackson-core:2.9.9 -> 2.11.0
| | +--- com.fasterxml.jackson.core:jackson-databind:2.9.9.3 -> 2.11.0 (*)
| | +--- org.apache.commons:commons-compress:1.19
| | \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
| +--- io.confluent:kafka-schema-registry-client:5.4.0
| | +--- org.apache.kafka:kafka-clients:5.4.0-ccs -> 2.5.0 (*)
| | +--- io.confluent:common-config:5.4.0
| | | +--- io.confluent:common-utils:5.4.0
| | | | \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
| | | \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
| | +--- org.apache.avro:avro:1.9.1 (*)
| | +--- com.fasterxml.jackson.core:jackson-databind:2.9.10.1 -> 2.11.0 (*)
| | +--- io.swagger:swagger-annotations:1.5.22
| | +--- io.swagger:swagger-core:1.5.3
| | | +--- org.apache.commons:commons-lang3:3.2.1 -> 3.10
| | | +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
| | | +--- com.fasterxml.jackson.core:jackson-databind:2.4.5 -> 2.11.0 (*)
| | | +--- com.fasterxml.jackson.datatype:jackson-datatype-joda:2.4.5 -> 2.11.0
| | | | +--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | | | \--- joda-time:joda-time:2.9.9
| | | +--- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.5 -> 2.11.0
| | | | +--- com.fasterxml.jackson.core:jackson-databind:2.11.0 (*)
| | | | +--- org.yaml:snakeyaml:1.26
| | | | \--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | | +--- io.swagger:swagger-models:1.5.3
| | | | +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
| | | | +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
| | | | \--- io.swagger:swagger-annotations:1.5.3 -> 1.5.22
| | | \--- com.google.guava:guava:18.0 -> 29.0-android
| | | +--- com.google.guava:failureaccess:1.0.1
| | | +--- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
| | | +--- com.google.code.findbugs:jsr305:3.0.2
| | | +--- org.checkerframework:checker-compat-qual:2.5.5
| | | +--- com.google.errorprone:error_prone_annotations:2.3.4
| | | \--- com.google.j2objc:j2objc-annotations:1.3
| | \--- io.confluent:common-utils:5.4.0 (*)
| +--- io.confluent:common-config:5.4.0 (*)
| \--- io.confluent:common-utils:5.4.0 (*)
\--- org.apache.avro:avro:1.9.1 (*)
```

Output of `./gradlew buildEnvironment`:

```
classpath
+--- com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0
| \--- org.apache.avro:avro-compiler:1.9.2 <<<<<<<<<<<<<<<<<<<<<<<<<<
| +--- org.apache.avro:avro:1.9.2
| | +--- com.fasterxml.jackson.core:jackson-core:2.10.2 -> 2.11.0
| | +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.11.0
| | | \--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | +--- org.apache.commons:commons-compress:1.19
| | \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
| +--- org.apache.commons:commons-lang3:3.9 -> 3.10
| +--- org.apache.velocity:velocity-engine-core:2.2
| | +--- org.apache.commons:commons-lang3:3.9 -> 3.10
| | \--- org.slf4j:slf4j-api:1.7.30
| +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0 (*)
| +--- joda-time:joda-time:2.10.1
| \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
```

I am not sure whether I should be editing my generated POJO class or whether I am missing something.

I was able to convert the Avro message into a POJO by changing the schema as mentioned in the question below, but I think that is hacky and the underlying problem is still not solved.

Question - Avro is not able to deserialize Union with Logical Types in fields

Answer 1 (score: 1)


In case somebody still encounters this error: there is a problem with the converter for logical types in unions, and I did not find a working Avro version as of this date. It looks like the issue might be fixed by https://github.com/apache/avro/pull/1721, but a new version still needs to be released.

I was able to fix my issue for the following field type by setting the conversion manually.

Field type:

```json
"type": [
  "null",
  {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
]
```

Setting the conversion:

```java
ReflectData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
```

and doing serialization/deserialization with `io.confluent.kafka.streams.serdes.avro.ReflectionAvroSerde`.
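Applied to the asker's `date` field, the analogous registration would add Avro's `DateConversion`. A hedged sketch under the assumption of Avro 1.9+ — `SpecificData` is shown because the consumer in the question binds to a generated specific class, whereas this answer used `ReflectData` with the ReflectionAvroSerde; the wrapper class name is hypothetical:

```java
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

// Hypothetical setup class: call register() once at startup, before
// any records are consumed, so the DatumReader hands put() a
// java.time.LocalDate instead of the raw epoch-day Integer.
public final class AvroLogicalTypeSetup {
    public static void register() {
        SpecificData.get().addLogicalTypeConversion(
                new TimeConversions.DateConversion());
        SpecificData.get().addLogicalTypeConversion(
                new TimeConversions.TimestampMillisConversion());
    }
}
```

Whether this works around the union bug depends on the Avro version in use; the linked PR suggests the conversion lookup for union members is what needs fixing upstream.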

Answer 2 (score: 0)


Could you please clarify what version of avro-maven-plugin you are using to generate the POJO? As of Avro 1.9.0, Joda-Time has been deprecated in favor of the Java 8 JSR-310 date/time types, which are now the default. See the Apache Avro 1.9.0 release notes.

When I generate a POJO from scratch, I get `java.time.LocalDate BIRTH_DT` and not `org.joda.time.LocalDate BIRTH_DT`:

```java
@Deprecated public java.time.LocalDate BIRTH_DT;
```

So, in your case I think there is most likely an Avro version mismatch on the classpath, or a stale POJO. I would recommend verifying the Avro version via `mvn dependency:tree -Dincludes=org.apache.avro:avro` and regenerating the POJO.
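Since the project in the question builds with Gradle rather than Maven, a rough equivalent of that version check (the tasks themselves already appear in the question; the grep filters are just illustrative) would be:

```shell
# Avro versions on the compile classpath
./gradlew dependencies --configuration compileClasspath | grep 'org.apache.avro'

# Avro version pulled in by the code-generation plugin
./gradlew buildEnvironment | grep 'org.apache.avro'
```

If the two commands report different Avro versions (as the 1.9.1 runtime vs. 1.9.2 avro-compiler output above suggests), aligning them and regenerating the POJO is a reasonable first step.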

Posted by huangapple on August 25, 2020. Original link: https://go.coder-hub.com/63569470.html