Kafka Streams 的 GlobalKTable 在 Tombstone 操作时抛出反序列化异常 – null 值记录

huangapple go评论72阅读模式
英文:

Kafka streams GlobalKTable throws Deserialization exception on Tombstone - null value- records

问题

我有一个基于Spring Cloud Stream的Kafka Streams应用程序,我将一个全局KTable绑定到一个Compact主题。当我向主题推送一个Tombstone记录(非空键与null值)时,我的Kafka Streams应用程序会因反序列化异常而失败。失败的原因是我的反序列化程序无法处理null记录。

根据文档,我认为GlobalKTable甚至不会“看到”null值记录。这不是这种情况吗?我需要在反序列化程序中处理null记录吗?

org.apache.kafka.common.errors.SerializationException: 无法反序列化
Caused by: java.lang.IllegalArgumentException: 参数src为null
    	at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
    	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
    	at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
    	at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
    	at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    	at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
    	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
    	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)
英文:

I have a Spring cloud stream based Kafka Streams application where I'm binding a Global KTable to a Compact topic. When I push a Tombstone record to the topic (Non-empty key with null value) - my Kafka streams application fails with Deserialization exception. The failure is because my deserializer does not handle null records.

From the documentation, I thought GlobalKTable will not even "see" null-value records. Is it not the case? Do I need to handle null records in the deserializer?

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-html -->

org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument &quot;src&quot; is null
	at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
	at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
	at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
	at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
	at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)

<!-- end snippet -->

答案1

得分: 6

是的;您必须检查 null 并返回 null。请查看任何标准的反序列化器。

KafkaConsumerFetcher 不同(在调用之前检查 null),kafka-streams 无条件调用它。请参阅

 在 org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63) 处。
英文:

Yes; you have to check for null and return null. See any of the standard deserializers.

Unlike the KafkaConsumer 's Fetcher (which checks for null before calling), the kafka-streams calls it unconditionally. See

 at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)

huangapple
  • 本文由 发表于 2020年9月3日 05:29:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/63713786.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定