英文:
Convert Avro GenericRecord to SpecificData object while converting Long to Instant
问题
我正在尝试将从Kafka获取的通用记录转换为我想在以后使用的特定对象。
这是我的代码:
public void listen(ConsumerRecord<String, GenericRecord> consumerRecord) {
TxnEngineEvent event = (TxnEngineEvent) SpecificData.get().deepCopy(TxnEngineEvent.SCHEMA$, consumerRecord.value());
问题是从Long转换为Instant不起作用。我收到以下异常:
org.springframework.kafka.KafkaException: 停止容器;嵌套异常是org.springframework.kafka.listener.ListenerExecutionFailedException:监听器方法 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<String, org.apache.avro.generic.GenericRecord>)' 抛出异常;嵌套异常是java.lang.ClassCastException: java.lang.Long无法强制转换为java.time.Instant;嵌套异常是java.lang.ClassCastException: java.lang.Long无法强制转换为java.time.Instant
at org.springframework.kafka.listener.ContainerStoppingErrorHandler.handle(ContainerStoppingErrorHandler.java:65) ~[spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1776) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1693) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1619) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1009) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: 监听器方法 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<String, org.apache.avro.generic.GenericRecord>)' 抛出异常;嵌套异常是java.lang.ClassCastException: java.lang.Long无法强制转换为java.time.Instant;嵌套异常是java.lang.ClassCastException: java.lang.Long无法强制转换为java.time.Instant
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1788) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
... 10 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.Long无法强制转换为java.time.Instant
at com.klarna.messaging.Metadata.put(Metadata.java:204) ~[classes/:na]
导致此问题的字段具有以下模式:
{"name":"occurred_at","type":{"type":"long","logicalType":"timestamp-millis"}}
我已经成功调试到avro原生代码试图转换该值的点,问题在于它基于字段的类分配了转换器。由于类是java.lang.Long,而转换器的类是java.time.Instant,它们不匹配。我是否遗漏了什么?我想通过实现自己的转换器来解决这个问题,但是那将需要是java.lang.Long类型,这没有意义,而且我不能将值转换为java.time.Instant(甚至没有接口方法返回通用对象)。
英文:
Hi I am trying to convert generic record I get from kafka to specific object I want to use down the line.
Here is my code
public void listen(ConsumerRecord<String, GenericRecord> consumerRecord) {
TxnEngineEvent event = (TxnEngineEvent) SpecificData.get().deepCopy(TxnEngineEvent.SCHEMA$, consumerRecord.value());
The problem conversion from Long to Instant is not working. I get this exception
at org.springframework.kafka.listener.ContainerStoppingErrorHandler.handle(ContainerStoppingErrorHandler.java:65) ~[spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1776) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1693) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1619) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1009) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1788) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
... 10 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.klarna.messaging.Metadata.put(Metadata.java:204) ~[classes/:na]
The field causing this has this schema
{"name":"occurred_at","type":{"type":"long","logicalType":"timestamp-millis"}
I have been able to debug the issue down to point where avro native code is trying to convert the value and the problem is it is assigning the convertor based on class of the field. Since the class is java.lang.Long and the convertor's class is java.time.Instant it doesn't get matched. Is there something I am missing? I wanted to workaround this by implementing my own converter but That would need to be of type java.lang.Long which makes no sense + I can't convert the value to java.time.Instant (No interface method to return even generic Object)
答案1
得分: 1
我找到了一种使用Jackson对象映射器的方法:
ConsumerRecord<Key, Value> singleRecord = records.poll(2000, TimeUnit.MILLISECONDS);
ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule()).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
Value value = objectMapper.readValue(String.valueOf(singleRecord.value()), Value.class);
英文:
I found a way with jackson object mapper
ConsumerRecord<Key, Value> singleRecord = records.poll(2000, TimeUnit.MILLISECONDS);
ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule()).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
Value value = objectMapper.readValue(String.valueOf(singleRecord.value()), Value.class);
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论