Avro GenericRecord 转换为嵌套的 POJO

huangapple go评论90阅读模式
英文:

Avro GenericRecord to nested POJO

问题

有没有一种方法可以将从Kafka消息中获取的GenericRecord反序列化为嵌套的POJO?实际上,我正在尝试将其反序列化为Scala的case class,但我意识到这更加困难。我在互联网上搜索过,似乎每个人都在手动执行此操作。您是否知道任何可以实现这一目标的库?

英文:

Is there a way to deserialize GenericRecord (which I just got from Kafka message) to nested POJO? I am actually tying to deserialize it to Scala's case class but I realize that's even harder. I searched through the internet and it seems everyone was doing it manually. Are you aware of any library which is able to do this?

答案1

得分: 0

这是一个适用于applicative模式的通用编解码派生解决方案:
https://github.com/danslapman/morphling

它不提供“导入和使用”的解决方案,但它提供了一种在不干扰shapeless/magnolia的情况下为您的协议编写自己的编解码派生机制的方式。

另外,如果您需要处理二进制数据,可以尝试:
https://github.com/scodec/scodec

它提供了一种非常Scala方式的解决此类问题的方法。

英文:

There is a pretty generic codec derivation solution for applicative schemas:

https://github.com/danslapman/morphling

It does not provide "import and use" solution but it does provide a way to write your own codec derivation mechanism for your protocol without messing up with shapeless/magnolia.

Also if you need to deal with binary data, try:

https://github.com/scodec/scodec

It provides pretty scala-way of solving such problems.

答案2

得分: 0

我能提供以下翻译:

  1. 我能够得到这个
  2. def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
  3. val readerSchema = ReflectData.get().getSchema(targetType)
  4. val idSize = 4
  5. val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
  6. def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
  7. this.configure(new KafkaAvroDeserializerConfig(configs))
  8. def deserialize(topic: String, data: Array[Byte]): A = {
  9. val bytes = ByteBuffer.wrap(data)
  10. bytes.get() // 跳过魔法字节
  11. val schemaId = bytes.getInt()
  12. val writerSchema = schemaRegistry.getById(schemaId)
  13. val length = bytes.limit() - 1 - idSize
  14. val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
  15. val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
  16. reader.read(null.asInstanceOf[A], decoder)
  17. }
  18. def close(): Unit = {}
  19. }
  20. val props = Map("schema.registry.url" -> schemaRegistryUrl)
  21. deserializer.configure(props.asJava, false)
  22. deserializer
  23. }

注意:代码部分保持不变,只翻译了注释和字符串。

英文:

I was able to come up with this:

  1. def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
  2. val readerSchema = ReflectData.get().getSchema(targetType)
  3. val idSize = 4
  4. val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
  5. def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
  6. this.configure(new KafkaAvroDeserializerConfig(configs))
  7. def deserialize(topic: String, data: Array[Byte]): A = {
  8. val bytes = ByteBuffer.wrap(data)
  9. bytes.get() // skip magic byte
  10. val schemaId = bytes.getInt()
  11. val writerSchema = schemaRegistry.getById(schemaId)
  12. val length = bytes.limit() - 1 - idSize
  13. val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
  14. val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
  15. reader.read(null.asInstanceOf[A], decoder)
  16. }
  17. def close(): Unit = {}
  18. }
  19. val props = Map("schema.registry.url" -> schemaRegistryUrl)
  20. deserializer.configure(props.asJava, false)
  21. deserializer

}

huangapple
  • 本文由 发表于 2020年1月6日 18:24:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/59610380.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定