Kafka Avro Deserializer is not able to deserialize the Kafka message to Specific Avro Record
Question
I am trying to deserialize Avro messages from Kafka into a POJO generated from the Avro schema. I am using KafkaAvroDeserializer for this conversion.
I can see the GenericRecord in the ConsumerRecord<String, Data> returned from Kafka. But when I try to assign this record to the generated POJO class object, it fails with a ClassCastException on the date-typed field of the POJO. When I checked the Avro payload, this date field comes in as an integer.
Setup:
Avro - 1.9.1
Confluent - 5.4
commercehub gradle plugin 0.20.0
While trying to deserialize the Avro message, I get the following error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
at com.sample.Data.put(Data.java:229) ~[main/:na]
at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
Avro Schema of the field for which conversion is failing with ClassCastException
{
  "name": "BIRTH_DT",
  "type": [
    "null",
    {
      "type": "int",
      "logicalType": "date"
    }
  ],
  "default": null
}
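For context, Avro's date logical type annotates an int that counts days since the Unix epoch, so a registered logical-type conversion is expected to map the raw value to a LocalDate, roughly like this sketch (rawDays is a hypothetical stand-in for the int carried in the payload):

// Expected mapping for the "date" logical type; rawDays is hypothetical here
int rawDays = 18262; // e.g. 2020-01-01 encoded as days since 1970-01-01
java.time.LocalDate birthDate = java.time.LocalDate.ofEpochDay(rawDays);

If no conversion is applied for the union branch, the raw Integer reaches put() unconverted, which matches the ClassCastException above.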
Code Snippets from Generated POJO
@Deprecated public java.time.LocalDate BIRTH_DT;

// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    ...
    case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
}

public java.time.LocalDate getBIRTHDT() {
    return BIRTH_DT;
}

public void setBIRTHDT(java.time.LocalDate value) {
    this.BIRTH_DT = value;
}
Kafka Consumer Method
@KafkaListener(topics = "${spring.kafka.consumer.properties.topic}",
        groupId = "${spring.kafka.consumer.group-id}")
// Data is a POJO generated by Avro tools
public void consume(ConsumerRecord<String, Data> record,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {
    logger.info(String.format("#### -> Consumed message -> partition: %s, offset: %s", partition, offset));
    Data row = record.value();
    ack.acknowledge();
}
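For reference, the listener above assumes a consumer configuration along these lines (a sketch: the registry URL is a placeholder, and the property names are standard Confluent deserializer settings). Setting specific.avro.reader=true is what makes KafkaAvroDeserializer produce the generated Data class instead of a GenericRecord:

# application.properties (assumed); schema.registry.url is a placeholder
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8081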
build.gradle
buildscript {
    repositories {
        jcenter {
            url "https://nexus.abc.com:8443/content/repositories/jcenter/"
        }
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0"
    }
}

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
    id 'idea'
    id 'eclipse'
}

repositories {
    maven { url nexusPublicRepoURL }
    maven { url "https://nexus.abc.com:8443/content/repositories/confluence.io-maven/" }
    jcenter()
    maven { url "https://nexus.abc.com:8443/content/repositories/jcenter/" }
}

group = 'com.abc.cscm'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'
targetCompatibility = '8'

ext {
    springCloudVersion = 'Hoxton.SR6'
    confluentVersion = '5.4.0'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation 'org.apache.avro:avro:1.9.1'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

springBoot {
    buildInfo()
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

wrapper {
    distributionUrl = "https://nexus.abc.com:8443/service/local/repositories/thirdparty/content/org/gradle/gradle/6.5/gradle-6.5.zip"
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: 'idea'
./gradlew dependencies --configuration compileClasspath (Output)
> Task :dependencies
------------------------------------------------------------
Root project
------------------------------------------------------------
compileClasspath - Compile classpath for source set 'main'.
** omitting spring deps
+--- io.confluent:kafka-avro-serializer:5.4.0
| +--- org.apache.avro:avro:1.9.1
| | +--- com.fasterxml.jackson.core:jackson-core:2.9.9 -> 2.11.0
| | +--- com.fasterxml.jackson.core:jackson-databind:2.9.9.3 -> 2.11.0 (*)
| | +--- org.apache.commons:commons-compress:1.19
| | \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
| +--- io.confluent:kafka-schema-registry-client:5.4.0
| | +--- org.apache.kafka:kafka-clients:5.4.0-ccs -> 2.5.0 (*)
| | +--- io.confluent:common-config:5.4.0
| | | +--- io.confluent:common-utils:5.4.0
| | | | \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
| | | \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
| | +--- org.apache.avro:avro:1.9.1 (*)
| | +--- com.fasterxml.jackson.core:jackson-databind:2.9.10.1 -> 2.11.0 (*)
| | +--- io.swagger:swagger-annotations:1.5.22
| | +--- io.swagger:swagger-core:1.5.3
| | | +--- org.apache.commons:commons-lang3:3.2.1 -> 3.10
| | | +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
| | | +--- com.fasterxml.jackson.core:jackson-databind:2.4.5 -> 2.11.0 (*)
| | | +--- com.fasterxml.jackson.datatype:jackson-datatype-joda:2.4.5 -> 2.11.0
| | | | +--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | | | \--- joda-time:joda-time:2.9.9
| | | +--- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.5 -> 2.11.0
| | | | +--- com.fasterxml.jackson.core:jackson-databind:2.11.0 (*)
| | | | +--- org.yaml:snakeyaml:1.26
| | | | \--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | | +--- io.swagger:swagger-models:1.5.3
| | | | +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
| | | | +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
| | | | \--- io.swagger:swagger-annotations:1.5.3 -> 1.5.22
| | | \--- com.google.guava:guava:18.0 -> 29.0-android
| | | +--- com.google.guava:failureaccess:1.0.1
| | | +--- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
| | | +--- com.google.code.findbugs:jsr305:3.0.2
| | | +--- org.checkerframework:checker-compat-qual:2.5.5
| | | +--- com.google.errorprone:error_prone_annotations:2.3.4
| | | \--- com.google.j2objc:j2objc-annotations:1.3
| | \--- io.confluent:common-utils:5.4.0 (*)
| +--- io.confluent:common-config:5.4.0 (*)
| \--- io.confluent:common-utils:5.4.0 (*)
\--- org.apache.avro:avro:1.9.1 (*)
./gradlew buildEnvironment (Output)
classpath
+--- com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0
| \--- org.apache.avro:avro-compiler:1.9.2 <<<<<<<<<<<<<<<<<<<<<<<<<<
| +--- org.apache.avro:avro:1.9.2
| | +--- com.fasterxml.jackson.core:jackson-core:2.10.2 -> 2.11.0
| | +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.11.0
| | | \--- com.fasterxml.jackson.core:jackson-core:2.11.0
| | +--- org.apache.commons:commons-compress:1.19
| | \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
| +--- org.apache.commons:commons-lang3:3.9 -> 3.10
| +--- org.apache.velocity:velocity-engine-core:2.2
| | +--- org.apache.commons:commons-lang3:3.9 -> 3.10
| | \--- org.slf4j:slf4j-api:1.7.30
| +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0 (*)
| +--- joda-time:joda-time:2.10.1
| \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
I am not sure whether I should be editing my generated POJO class or whether I am missing something.
I was able to convert the Avro message into a POJO by changing the schema as mentioned in the question below, but I think that is hacky and the underlying problem is still not solved.
Question - Avro is not able to deserialize Union with Logical Types in fields
Answer 1
Score: 1
In case somebody still encounters this error: there is a problem with the converters for logical types in unions, and I did not find a working Avro version as of this date. It looks like the issue may be fixed by https://github.com/apache/avro/pull/1721, but a new version still needs to be released.
I was able to fix my issue for the following field type by setting the conversion manually.

Field type:
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
Setting the conversion:
ReflectData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
and serialization/deserialization using the io.confluent.kafka.streams.serdes.avro.ReflectionAvroSerde.
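A minimal sketch of how this workaround can be wired together (assumptions: Data stands in for the reflection-mapped value class, and the Schema Registry URL is a placeholder):

import java.util.Collections;

import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.ReflectionAvroSerde;

public class AvroSerdeSetup {

    public static ReflectionAvroSerde<Data> dataValueSerde() {
        // Register the conversion once, before any records are read, so ReflectData
        // can map the union's long/timestamp-millis branch to java.time.Instant.
        ReflectData.get().addLogicalTypeConversion(
                new TimeConversions.TimestampMillisConversion());

        // Reflection-based serde that resolves schemas against the Schema Registry.
        ReflectionAvroSerde<Data> serde = new ReflectionAvroSerde<>(Data.class);
        serde.configure(
                Collections.singletonMap(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                        "http://localhost:8081"), // placeholder registry URL
                false); // false = configure as a value serde, not a key serde
        return serde;
    }
}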
Answer 2
Score: 0
Could you please clarify which version of avro-maven-plugin you are using to generate the POJO? As of Avro version 1.9.0, Joda-Time has been deprecated in favor of the Java 8 JSR-310 date/time classes, and Java 8 was set as the default. See the Apache Avro 1.9.0 release notes.
When I generate a POJO from scratch, I get java.time.LocalDate BIRTH_DT and not org.joda.time.LocalDate BIRTH_DT.
@Deprecated public java.time.LocalDate BIRTH_DT;
So, in your case I think there is most likely an Avro version mismatch on the classpath, or a stale POJO. I would recommend verifying the Avro version via a mvn dependency:tree -Dincludes=org.apache.avro:avro call and regenerating the POJO.
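Since this question's project uses Gradle rather than Maven (see the build.gradle above), the equivalent check and regeneration would be along these lines; generateAvroJava is the code-generation task registered by the commercehub gradle-avro-plugin:

./gradlew dependencyInsight --dependency org.apache.avro:avro --configuration compileClasspath
./gradlew generateAvroJava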