英文:
Avro GenericRecord to nested POJO
问题
有没有一种方法可以将从Kafka消息中获取的GenericRecord反序列化为嵌套的POJO?实际上,我正在尝试将其反序列化为Scala的case class,但我意识到这更加困难。我在互联网上搜索过,似乎每个人都在手动执行此操作。您是否知道任何可以实现这一目标的库?
英文:
Is there a way to deserialize GenericRecord (which I just got from Kafka message) to nested POJO? I am actually tying to deserialize it to Scala's case class but I realize that's even harder. I searched through the internet and it seems everyone was doing it manually. Are you aware of any library which is able to do this?
答案1
得分: 0
这是一个适用于applicative模式的通用编解码派生解决方案:
https://github.com/danslapman/morphling
它不提供“导入和使用”的解决方案,但它提供了一种在不干扰shapeless/magnolia的情况下为您的协议编写自己的编解码派生机制的方式。
另外,如果您需要处理二进制数据,可以尝试:
https://github.com/scodec/scodec
它提供了一种非常Scala方式的解决此类问题的方法。
英文:
There is a pretty generic codec derivation solution for applicative schemas:
https://github.com/danslapman/morphling
It does not provide "import and use" solution but it does provide a way to write your own codec derivation mechanism for your protocol without messing up with shapeless/magnolia.
Also if you need to deal with binary data, try:
https://github.com/scodec/scodec
It provides pretty scala-way of solving such problems.
答案2
得分: 0
我能提供以下翻译:
我能够得到这个:
def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4
val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
this.configure(new KafkaAvroDeserializerConfig(configs))
def deserialize(topic: String, data: Array[Byte]): A = {
val bytes = ByteBuffer.wrap(data)
bytes.get() // 跳过魔法字节
val schemaId = bytes.getInt()
val writerSchema = schemaRegistry.getById(schemaId)
val length = bytes.limit() - 1 - idSize
val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
reader.read(null.asInstanceOf[A], decoder)
}
def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer
}
注意:代码部分保持不变,只翻译了注释和字符串。
英文:
I was able to come up with this:
def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4
val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
this.configure(new KafkaAvroDeserializerConfig(configs))
def deserialize(topic: String, data: Array[Byte]): A = {
val bytes = ByteBuffer.wrap(data)
bytes.get() // skip magic byte
val schemaId = bytes.getInt()
val writerSchema = schemaRegistry.getById(schemaId)
val length = bytes.limit() - 1 - idSize
val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
reader.read(null.asInstanceOf[A], decoder)
}
def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论