Convert Avro GenericRecord to SpecificData object while converting Long to Instant

Question


I am trying to convert a GenericRecord that I receive from Kafka into a specific object that I want to use further down the line.

Here is my code:

public void listen(ConsumerRecord<String, GenericRecord> consumerRecord) {
    // Copy the generic record into the generated TxnEngineEvent class
    TxnEngineEvent event = (TxnEngineEvent) SpecificData.get().deepCopy(TxnEngineEvent.SCHEMA$, consumerRecord.value());
}

The problem is that the conversion from Long to Instant does not work. I get this exception:

org.springframework.kafka.KafkaException: Stopped container; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at org.springframework.kafka.listener.ContainerStoppingErrorHandler.handle(ContainerStoppingErrorHandler.java:65) ~[spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1776) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1693) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1619) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1009) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1788) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    ... 10 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at com.klarna.messaging.Metadata.put(Metadata.java:204) ~[classes/:na]

The field that causes this has the following schema:

{"name":"occurred_at","type":{"type":"long","logicalType":"timestamp-millis"}}

I was able to debug this down to the point where Avro's own code tries to convert the value. The problem is that it selects the conversion based on the class of the field's value. Since that class is java.lang.Long and the conversion's class is java.time.Instant, the two do not match. Is there something I am missing? I wanted to work around this by implementing my own conversion, but it would have to be declared for type java.lang.Long, which makes no sense, and I still could not convert the value to java.time.Instant (there is no interface method that returns even a generic Object).
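
To make the mismatch concrete: a timestamp-millis value is stored as a long that counts milliseconds since the Unix epoch, while the generated class expects a java.time.Instant in that field. The sketch below uses only the JDK and no Avro code at all. The class TimestampMillisSketch and its method names are hypothetical; they only illustrate why a plain cast fails and what an explicit conversion would have to do.

```java
import java.time.Instant;

// Hypothetical illustration, not part of Avro or of the code in the question.
final class TimestampMillisSketch {

    // Mimics a setter that casts the incoming value without converting it.
    static Instant castOnly(Object raw) {
        return (Instant) raw; // throws ClassCastException when raw is a Long
    }

    // Converts the raw long representation explicitly instead of casting.
    static Instant convert(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Instant) {
            return (Instant) raw;
        }
        if (raw instanceof Long) {
            // timestamp-millis: milliseconds since 1970-01-01T00:00:00Z
            return Instant.ofEpochMilli((Long) raw);
        }
        throw new IllegalArgumentException("Unsupported type: " + raw.getClass());
    }

    public static void main(String[] args) {
        Object raw = 1595369112000L; // sample value chosen for illustration

        System.out.println(convert(raw)); // prints 2020-07-21T22:05:12Z

        try {
            castOnly(raw);
        } catch (ClassCastException e) {
            System.out.println("Cast failed: " + e.getMessage());
        }
    }
}
```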

Answer 1

Score: 1


I found a way to do this with Jackson's ObjectMapper:

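// Wait up to two seconds for the next record
ConsumerRecord<Key, Value> singleRecord = records.poll(2000, TimeUnit.MILLISECONDS);

// Register the Jackson modules for parameter names, JDK 8 types and java.time types
ObjectMapper objectMapper = new ObjectMapper()
    .registerModule(new ParameterNamesModule())
    .registerModule(new Jdk8Module())
    .registerModule(new JavaTimeModule())
    .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

// Render the record value as a string and let Jackson map it onto the target class
Value value = objectMapper.readValue(String.valueOf(singleRecord.value()), Value.class);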
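
One thing worth checking with this approach: the string form of the record carries the timestamp-millis field as a plain number of milliseconds, so the result is only correct if the parser also reads that number as milliseconds. The JDK-only sketch below (the class name EpochUnitCheck and the sample value are made up for illustration) shows how the same number yields a completely different Instant when it is read as seconds. As far as I know, Jackson's DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS setting influences which interpretation is used for Instant; treat that as an assumption and verify a deserialized value against a known timestamp.

```java
import java.time.Instant;

// Hypothetical illustration of the unit mismatch; no Jackson or Avro involved.
final class EpochUnitCheck {

    // Reads the number as milliseconds since the epoch (what timestamp-millis means).
    static Instant asMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Reads the same number as seconds since the epoch.
    static Instant asSeconds(long raw) {
        return Instant.ofEpochSecond(raw);
    }

    public static void main(String[] args) {
        long raw = 1595369112000L; // sample value chosen for illustration

        System.out.println("as millis:  " + asMillis(raw)); // 2020-07-21T22:05:12Z
        System.out.println("as seconds: " + asSeconds(raw)); // a date tens of thousands of years in the future
    }
}
```

If the deserialized value turns out to be far in the future, a unit mismatch of this kind is the first thing I would look for.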

huangapple
  • Posted on July 21, 2020, 22:05:12
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63016321.html