Apply generic to deserialize in Kafka using Flink
Question
I am trying to deserialize Kafka records into objects so that I can process them in Flink. Since the records are consumed via Kafka, I use the KafkaDeserializationSchema interface.
Whenever a new object type is added, I have to add another deserialization class, as below.
// Existing deserialization class
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // implicit TypeInformation derivation for case classes
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

class ADeserialize extends KafkaDeserializationSchema[TypeAClass] {
  val mapper: ObjectMapper = new ObjectMapper
  override def isEndOfStream(nextElement: TypeAClass): Boolean = false
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): TypeAClass = {
    // record.value() holds the JSON payload bytes; map them to the target type
    val jsonNode = mapper.readValue(record.value(), classOf[JsonNode])
    mapper.treeToValue(jsonNode, classOf[TypeAClass])
  }
  override def getProducedType: TypeInformation[TypeAClass] = Types.CASE_CLASS[TypeAClass]
}
// Newly added deserialization class: identical except for the target type
class BDeserialize extends KafkaDeserializationSchema[TypeBClass] {
  val mapper: ObjectMapper = new ObjectMapper
  override def isEndOfStream(nextElement: TypeBClass): Boolean = false
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): TypeBClass = {
    val jsonNode = mapper.readValue(record.value(), classOf[JsonNode])
    mapper.treeToValue(jsonNode, classOf[TypeBClass])
  }
  override def getProducedType: TypeInformation[TypeBClass] = Types.CASE_CLASS[TypeBClass]
}
As you can see, every additional source requires yet another nearly identical class, which duplicates the code. To avoid this, I thought converting the schemas into a single generic one would be a good idea, but I simply failed to make that work with KafkaDeserializationSchema. My Flink version is 1.11, since this is a legacy setup.
Any help will be appreciated. Thanks.
Answer 1
Score: 1
What you want is something like:
class MyJsonDeserializationSchema[T](implicit typeInfo: TypeInformation[T])
    extends KafkaDeserializationSchema[T] {
  // ObjectMapper is java.io.Serializable, so it can be a plain field; for Scala
  // case classes you may also need to register jackson-module-scala's DefaultScalaModule
  val mapper: ObjectMapper = new ObjectMapper
  override def isEndOfStream(nextElement: T): Boolean = false
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T = {
    // classOf[T] does not compile for an unbound type parameter, so the
    // target class is recovered from the implicit TypeInformation instead
    mapper.readValue(record.value(), typeInfo.getTypeClass)
  }
  override def getProducedType: TypeInformation[T] = typeInfo
}
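For completeness, here is a minimal sketch of how the generic schema could be wired into a Flink 1.11 job. The case class fields, topic name, bootstrap servers, and group id are made-up placeholders; the implicit TypeInformation[TypeAClass] is derived by the org.apache.flink.api.scala._ import, and FlinkKafkaConsumer accepts a KafkaDeserializationSchema directly.
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

case class TypeAClass(id: Long, name: String) // hypothetical payload

object GenericDeserializeJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "demo-group")              // placeholder

    // One generic schema per type parameter; no per-type class needed anymore
    val source = env.addSource(
      new FlinkKafkaConsumer[TypeAClass](
        "topic-a", // placeholder topic
        new MyJsonDeserializationSchema[TypeAClass],
        props))

    source.print()
    env.execute("generic-json-deserialization")
  }
}
Taking the target class from typeInfo.getTypeClass keeps the constructor down to a single implicit parameter; an alternative would be to require a ClassTag and use scala.reflect.classTag[T].runtimeClass.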
Comments