Apply generic to deserialize in Kafka using Flink

Question

I am trying to deserialize objects so that I can process them in Flink.
Since the records are consumed from Kafka, I use a KafkaDeserializationSchema class.

Whenever a new object type is added, I need to add another deserialization class, as shown below.

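import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// Existing deserialization class
class ADeserialize extends KafkaDeserializationSchema[TypeAClass] {
  val mapper: ObjectMapper = new ObjectMapper

  // The Kafka stream is unbounded, so no element ends it
  override def isEndOfStream(nextElement: TypeAClass): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): TypeAClass = {
    val jsonNode = mapper.readValue(record.value(), classOf[JsonNode])
    // Assumed: the parsed tree is mapped directly onto the target type
    mapper.treeToValue(jsonNode, classOf[TypeAClass])
  }

  override def getProducedType: TypeInformation[TypeAClass] = Types.CASE_CLASS[TypeAClass]
}


// Newly added deserialization class, identical except for the type
class BDeserialize extends KafkaDeserializationSchema[TypeBClass] {
  val mapper: ObjectMapper = new ObjectMapper

  override def isEndOfStream(nextElement: TypeBClass): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): TypeBClass = {
    val jsonNode = mapper.readValue(record.value(), classOf[JsonNode])
    // Assumed: the parsed tree is mapped directly onto the target type
    mapper.treeToValue(jsonNode, classOf[TypeBClass])
  }

  override def getProducedType: TypeInformation[TypeBClass] = Types.CASE_CLASS[TypeBClass]
}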

As you can see, every additional source requires yet another class, which leads to duplicated code. To avoid this, I think converting these classes into a single generic one is a good idea, but I failed to do so using KafkaDeserializationSchema. My Flink version is 1.11, since this is a legacy system.

Any help will be appreciated. Thanks.

Answer 1

Score: 1

What you want is something like this:

class MyJsonDeserializationSchema[T](implicit typeInfo: TypeInformation[T]) extends KafkaDeserializationSchema[T] {
  val mapper: ObjectMapper = new ObjectMapper

  override def isEndOfStream(nextElement: T): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T =
    // classOf[T] does not compile for a type parameter, so the
    // runtime class is taken from the implicit TypeInformation
    mapper.readValue(record.value(), typeInfo.getTypeClass)

  override def getProducedType: TypeInformation[T] = typeInfo

}
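
Because T is erased at runtime, classOf[T] is not available for a type parameter, so the runtime class has to come from an implicit value that the compiler fills in at the call site, such as the TypeInformation or a ClassTag. Below is a minimal sketch of the ClassTag variant in plain Scala, without Flink or Jackson. RuntimeClassDemo, runtimeClassOf and the two case classes are hypothetical names used only for illustration.

```scala
import scala.reflect.ClassTag

object RuntimeClassDemo {
  // Hypothetical stand-ins for the record types in the question.
  case class TypeAClass(id: Int)
  case class TypeBClass(name: String)

  // classOf[T] does not compile for a type parameter, so the class is
  // read from the ClassTag that the compiler supplies at the call site.
  def runtimeClassOf[T](implicit tag: ClassTag[T]): Class[_] = tag.runtimeClass

  def main(args: Array[String]): Unit = {
    // Each call resolves to the class of its own type argument.
    println(runtimeClassOf[TypeAClass])
    println(runtimeClassOf[TypeBClass])
  }
}
```

With a generic schema, each source should then differ only in its type argument, for example new FlinkKafkaConsumer[TypeAClass]("topic-a", new MyJsonDeserializationSchema[TypeAClass], props), where the topic name and props are placeholders. This assumes import org.apache.flink.api.scala._ is in scope so that the implicit TypeInformation can be derived.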

Posted by huangapple on 2023-02-06 13:03:24
Source: https://go.coder-hub.com/75357485.html