Avro GenericRecord 转换为嵌套的 POJO

huangapple go评论67阅读模式
英文:

Avro GenericRecord to nested POJO

问题

有没有一种方法可以将从Kafka消息中获取的GenericRecord反序列化为嵌套的POJO?实际上,我正在尝试将其反序列化为Scala的case class,但我意识到这更加困难。我在互联网上搜索过,似乎每个人都在手动执行此操作。您是否知道任何可以实现这一目标的库?

英文:

Is there a way to deserialize GenericRecord (which I just got from Kafka message) to nested POJO? I am actually tying to deserialize it to Scala's case class but I realize that's even harder. I searched through the internet and it seems everyone was doing it manually. Are you aware of any library which is able to do this?

答案1

得分: 0

这是一个适用于applicative模式的通用编解码派生解决方案:
https://github.com/danslapman/morphling

它不提供“导入和使用”的解决方案,但它提供了一种在不干扰shapeless/magnolia的情况下为您的协议编写自己的编解码派生机制的方式。

另外,如果您需要处理二进制数据,可以尝试:
https://github.com/scodec/scodec

它提供了一种非常Scala方式的解决此类问题的方法。

英文:

There is a pretty generic codec derivation solution for applicative schemas:

https://github.com/danslapman/morphling

It does not provide "import and use" solution but it does provide a way to write your own codec derivation mechanism for your protocol without messing up with shapeless/magnolia.

Also if you need to deal with binary data, try:

https://github.com/scodec/scodec

It provides pretty scala-way of solving such problems.

答案2

得分: 0

我能提供以下翻译:

我能够得到这个

      def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
    val readerSchema = ReflectData.get().getSchema(targetType)
    val idSize = 4


    val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
        def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
          this.configure(new KafkaAvroDeserializerConfig(configs))

      def deserialize(topic: String, data: Array[Byte]): A = {
          val bytes = ByteBuffer.wrap(data)
          bytes.get() // 跳过魔法字节
          val schemaId = bytes.getInt()
          val writerSchema = schemaRegistry.getById(schemaId)
          val length = bytes.limit() - 1 - idSize
          val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
          val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
          reader.read(null.asInstanceOf[A], decoder)
        }

      def close(): Unit = {}
    }
    val props = Map("schema.registry.url" -> schemaRegistryUrl)
    deserializer.configure(props.asJava, false)
    deserializer
  }

注意:代码部分保持不变,只翻译了注释和字符串。

英文:

I was able to come up with this:

  def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4


val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
    def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
      this.configure(new KafkaAvroDeserializerConfig(configs))

  def deserialize(topic: String, data: Array[Byte]): A = {
      val bytes = ByteBuffer.wrap(data)
      bytes.get() // skip magic byte
      val schemaId = bytes.getInt()
      val writerSchema = schemaRegistry.getById(schemaId)
      val length = bytes.limit() - 1 - idSize
      val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
      val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
      reader.read(null.asInstanceOf[A], decoder)
    }

  def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer

}

huangapple
  • 本文由 发表于 2020年1月6日 18:24:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/59610380.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定