Flink SerializationSchema: Could not serialize row error

Question

I am having trouble using Flink's SerializationSchema.

Here is my main code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /* extract TypeInformation<Row> from an avsc schema file */ ).build();
DataStream<Row> myDataStream = env.addSource(new MyCustomSource(sourceDeserializer));
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");

env.execute("MyJob");
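
As an aside (my addition, not part of the original question): one possible way to produce the TypeInformation<Row> elided in the comment above is Flink's AvroSchemaConverter from the flink-avro module. A minimal sketch, where the file path is a hypothetical placeholder:

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.types.Row;

public final class AvroTypeInfoHelper {

    // Reads the .avsc file as a string and lets Flink derive the Row type info.
    // The path argument is an example, not taken from the question.
    public static TypeInformation<Row> fromAvsc(final String path) throws Exception {
        final String avroSchemaString = new String(Files.readAllBytes(Paths.get(path)));
        return AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
    }
}

The resulting type info could then be passed to both the JsonRowDeserializationSchema.Builder and the JsonRowSerializationSchema.Builder shown above.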

Here is my custom Sink Function:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
    private final SerializationSchema<Row> serializationSchema;

    public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void invoke(final Row value, final Context context) throws Exception {

        try {
            LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
        } catch (Exception e) {
            LOGGER.error("MyCustomSink- Error while sending data : " + e);
        }
    }
}

And here is my custom Source Function (not sure it is useful for the problem I have):

import java.io.InputStream;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);
    private final DeserializationSchema<T> deserializationSchema;

    public MyCustomSource(final DeserializationSchema<T> deserializer) {
        this.deserializationSchema = deserializer;
    }

    @Override
    public void open(final Configuration parameters) {
        // ...
    }

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {
        LOGGER.info("run");
        InputStream data = ...; // retrieve the input JSON data
        final T row = deserializationSchema.deserialize(ByteStreams.toByteArray(data));
        ctx.collect(row);
    }

    @Override
    public void cancel() {
        // ...
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
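
For completeness (my addition, not from the question): run() above emits a single record and then returns, which ends the source. A long-running source would typically loop and emit under the checkpoint lock. A sketch, where fetchNextJsonDocument() is a hypothetical blocking call standing in for the elided input retrieval:

private volatile boolean running = true;

@Override
public void run(final SourceContext<T> ctx) throws Exception {
    while (running) {
        // Hypothetical blocking call returning the next JSON document as bytes.
        final byte[] json = fetchNextJsonDocument();
        final T row = deserializationSchema.deserialize(json);
        // Emitting under the checkpoint lock keeps records and checkpoint
        // barriers from interleaving mid-emission.
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(row);
        }
    }
}

@Override
public void cancel() {
    running = false;
}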

Now I run my code and send some data sequentially to my pipeline:

==>

{
    "id": "sensor1",
    "data": {
        "rotation": 250
    }
}

Here, the data is correctly printed by my sink: MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]

==>

{
    "id": "sensor1"
}

Here, the data is again correctly printed by my sink: MyCustomSink- invoke : [{"id":"sensor1","data":null}]

==>

{
    "id": "sensor1",
    "data": {
        "rotation": 250
    }
}

Here, serialization fails. The error log printed is:

MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.

I do not understand this behavior at all. Does anyone have an idea?

Notes:

  • Using Flink 1.9.2

-- EDIT --

I added the CustomSource part.

-- EDIT 2 --

After more investigation, it looks like this behavior is caused by the private transient ObjectNode node field of JsonRowSerializationSchema. If I understand correctly, this field is reused across records as an optimization, but it seems to be the cause of my problem.

Is this the normal behavior? If it is, what would be the correct way to use this class in my case? (If not, is there any way to work around this problem?)
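
To illustrate the suspected mechanism (a reconstruction with plain Jackson, not code from Flink): if the cached ObjectNode is reused across records, a record with a null nested field leaves a NullNode in the cache, and the next record with a non-null nested field then trips over it:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public final class StaleNodeDemo {

    public static void main(final String[] args) {
        final ObjectMapper mapper = new ObjectMapper();
        // Stands in for the serializer's cached 'private transient ObjectNode node'.
        final ObjectNode cached = mapper.createObjectNode();

        // Record 1: "data" is a nested object, so an ObjectNode child is cached.
        cached.putObject("data").put("rotation", 250);

        // Record 2: "data" is null, so the cached child is replaced by a NullNode.
        cached.putNull("data");

        // Record 3: reusing the cached child as an object node fails with a
        // ClassCastException (NullNode cannot be cast to ObjectNode), which
        // would surface as "Could not serialize row ...".
        final ObjectNode reused = (ObjectNode) cached.get("data"); // throws here
        reused.put("rotation", 250);
    }
}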


Answer 1

Score: 1

This is a JsonRowSerializationSchema bug which has been fixed in more recent Flink versions; I believe this PR addresses the issue above.
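
If upgrading Flink is not an option right away, one conceivable workaround (a sketch, not part of the original answer) is to build a fresh JsonRowSerializationSchema per record, so no cached ObjectNode survives between rows:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

// Hypothetical workaround: a fresh delegate per record avoids the stale
// cached node, trading some throughput for correctness on Flink 1.9.x.
@SuppressWarnings("serial")
public class FreshJsonRowSerializationSchema implements SerializationSchema<Row> {

    private final TypeInformation<Row> typeInfo;

    public FreshJsonRowSerializationSchema(final TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public byte[] serialize(final Row row) {
        // Rebuilding the serializer here means its transient ObjectNode cache
        // never carries state from a previous row.
        return new JsonRowSerializationSchema.Builder(typeInfo).build().serialize(row);
    }
}

Allocating a new serializer per serialize() call is wasteful, so upgrading to a Flink version that contains the fix remains the better option.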
