Flink SerializationSchema: Could not serialize row error
Question
I have some trouble using Flink's SerializationSchema.
Here is my main code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /*Extract TypeInformation<Row> from an avsc schema file*/ ).build();
DataStream<Row> myDataStream = env.addSource( new MyCustomSource(sourceDeserializer) ) ;
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");
env.execute("MyJob");
Here is my custom Sink Function:
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
private final SerializationSchema<Row> serializationSchema;
public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
this.serializationSchema = serializationSchema;
}
@Override
public void invoke(final Row value, final Context context) throws Exception {
try {
LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
} catch (Exception e) {
LOGGER.error("MyCustomSink- Error while sending data : " + e);
}
}
}
And here is my custom Source Function (not sure whether it matters for the problem I have):
import java.io.InputStream;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);
/** the JSON deserializer */
private final DeserializationSchema<T> deserializationSchema;
public MyCustomSource(final DeserializationSchema<T> deserializer) {
this.deserializationSchema = deserializer;
}
@Override
public void open(final Configuration parameters) {
// ...
}
@Override
public void run(final SourceContext<T> ctx) throws Exception {
LOGGER.info("run");
InputStream data = ...; // Retrieve the input json data
final T row = deserializationSchema
.deserialize(ByteStreams.toByteArray(data));
ctx.collect(row);
}
@Override
public void cancel() {
// ...
}
@Override
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}
}
Now I run my code and send some data sequentially through my pipeline:
==>
{
"id": "sensor1",
"data":{
"rotation": 250
}
}
Here, the data is correctly printed by my sink: MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]
==>
{
"id": "sensor1"
}
Here, the data is correctly printed by my sink: MyCustomSink- invoke : [{"id":"sensor1","data":null}]
==>
{
"id": "sensor1",
"data":{
"rotation": 250
}
}
Here, there is a serialization error. The error log printed is:
MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.
I do not understand at all why I have this behavior. Does someone have an idea?
Notes:
- Using Flink 1.9.2
-- EDIT --
I added the CustomSource part
-- EDIT 2 --
After more investigation, it looks like this behavior is caused by the private transient ObjectNode node field of JsonRowSerializationSchema. If I understand correctly, this is used for optimization, but it seems to be the cause of my problem.
Is this the normal behavior, and if it is, what would be the correct way to use this class in my case? (Otherwise, is there any way to bypass this problem?)
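To check that hypothesis outside the pipeline, here is a condensed reproduction sketch; the row type simply mirrors the JSON messages above (the exact field types are my assumption), and if the cached node is really the trigger, the third call should fail the same way:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class SerializeReuseRepro {
    public static void main(final String[] args) {
        // Row type mirroring the JSON above: {"id": <string>, "data": {"rotation": <int>}}
        final TypeInformation<Row> type = Types.ROW_NAMED(
                new String[] {"id", "data"},
                Types.STRING,
                Types.ROW_NAMED(new String[] {"rotation"}, Types.INT));

        // Single schema instance, reused across calls, exactly like in the sink.
        final JsonRowSerializationSchema schema = new JsonRowSerializationSchema.Builder(type).build();

        final Row nested = Row.of(250);
        System.out.println(new String(schema.serialize(Row.of("sensor1", nested)))); // prints the full record
        System.out.println(new String(schema.serialize(Row.of("sensor1", null))));   // prints "data":null
        System.out.println(new String(schema.serialize(Row.of("sensor1", nested)))); // expected to throw on 1.9.2
    }
}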
Answer 1
Score: 1
This is a JsonRowSerializationSchema bug which has been fixed in the most recent Flink versions - I believe this PR addresses the issue above.
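For anyone who cannot upgrade right away, here is a minimal workaround sketch under the assumption (from EDIT 2 above) that the reused internal node is the trigger: rebuild the JsonRowSerializationSchema for every record so that no cached state is carried over. The wrapper class name is hypothetical and the per-record rebuild obviously costs throughput.
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

@SuppressWarnings("serial")
public class FreshJsonRowSerializationSchema implements SerializationSchema<Row> {
    // TypeInformation is Serializable, so the wrapper can be shipped to the sink like the original schema.
    private final TypeInformation<Row> typeInfo;

    public FreshJsonRowSerializationSchema(final TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public byte[] serialize(final Row row) {
        // A fresh schema instance per record means its internal reuse node always starts empty.
        return new JsonRowSerializationSchema.Builder(typeInfo).build().serialize(row);
    }
}
It would plug into the job above as new MyCustomSink(new FreshJsonRowSerializationSchema(myDataStream.getType())).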