How to provide TypeInformation for GenericRowData objects in Flink

Question

I am using a deserializer to parse a Kafka stream of JSON strings, and then the GenericRowData class to convert each object node into a RowData instance, which Hudi can write directly from a DataStream. I need to provide the TypeInformation so that Flink doesn't fall back to generic serialization, but I can't find a way to provide a TypeInformation that matches the GenericRowData type. I tried using TypeHint, but that doesn't work.
Here's sample code for what I'm trying to achieve.

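    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.jackson.JacksonMapperFactory;

    public class KafkaDeserializer implements DeserializationSchema<RowData> {

        // Created in open(), once per task, instead of being shipped with the schema.
        private transient ObjectMapper mapper;

        @Override
        public void open(InitializationContext context) {
            mapper = JacksonMapperFactory.createObjectMapper();
        }

        @Override
        public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
            // The default implementation calls deserialize(byte[]) and emits a non-null result.
            DeserializationSchema.super.deserialize(message, out);
        }

        @Override
        public RowData deserialize(byte[] message) throws IOException {
            JsonNode messagePayload = mapper.readTree(message);

            // get() returns a JsonNode, so each field is converted to text first.
            return GenericRowData.of(
                    StringData.fromString(messagePayload.get("x").asText()),
                    StringData.fromString(messagePayload.get("Y").asText()),
                    StringData.fromString(messagePayload.get("Z").asText())
                    // ...........
                    );
        }

        @Override
        public boolean isEndOfStream(RowData nextElement) {
            // Required by the interface; the Kafka stream is treated as unbounded.
            return false;
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            // This is the method I need help with; the TypeHint below does not do what I want.
            return TypeInformation.of(new TypeHint<RowData>() {});
        }
    }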

I need some guidance on how to implement the getProducedType() method above.

Answer 1

Score: 1

Just providing TypeInformation isn't enough for Flink to pick a "good" serializer. The TypeInformation has to return a serializer other than Kryo for GenericRowData, and I'm pretty sure that won't just work out of the box.

You could implement your own GenericRowDataTypeInfo class that extends TypeInformation<GenericRowData> and returns Flink's RowDataSerializer from the createSerializer() method.
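
As a possible shortcut to writing that class by hand: Flink's table runtime already contains a TypeInformation of this kind, InternalTypeInfo, which is backed by a RowDataSerializer when it is built from a RowType. The sketch below is not the approach described in this answer, only a hedged alternative. It assumes flink-table-runtime is on the classpath and that the three fields from the question are plain STRING columns. The class name RowDataTypeInfoSketch and the column types are illustrative assumptions, and InternalTypeInfo is an internal Flink class, so it may change between versions.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Hypothetical helper class, named for illustration only.
public final class RowDataTypeInfoSketch {

    // Assumed schema: three STRING columns named after the JSON keys in the question.
    // The real schema must list every field that the deserializer puts into the row.
    static final RowType ROW_TYPE = RowType.of(
            new LogicalType[] {
                new VarCharType(VarCharType.MAX_LENGTH),
                new VarCharType(VarCharType.MAX_LENGTH),
                new VarCharType(VarCharType.MAX_LENGTH)
            },
            new String[] {"x", "Y", "Z"});

    private RowDataTypeInfoSketch() {}

    // TypeInformation for RowData that describes the row layout explicitly,
    // so Flink does not need to treat the records as a generic type.
    static TypeInformation<RowData> producedType() {
        return InternalTypeInfo.of(ROW_TYPE);
    }
}
```

With this in place, getProducedType() in the deserializer could return RowDataTypeInfoSketch.producedType(). The order and types of the fields in ROW_TYPE have to match the values passed to GenericRowData.of(), because the serializer reads the fields by position.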

Posted by huangapple on May 11, 2023, 15:07:12
When reposting, please keep the link to this article: https://go.coder-hub.com/76224934.html