The implementation of the MapFunction is not serializable in Flink

Question

I am trying to implement a class that lets the user manipulate N input streams without constraints on the types of those streams.

For starters, I wanted to transform all input DataStreams into KeyedStreams. So I mapped each input DataStream<T> into a Tuple2<Integer, T> and then applied keyBy to convert it into a KeyedStream.

I always run into a serialization problem. I tried to follow this guide https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html and it didn't work.

What I would like to know is:

  1. What is serialization/deserialization in Java, and what is it used for?
  2. What serialization problems can I run into with Flink?
  3. What is the problem in my code? (You can find the code and the error message below.)

Thank you very much.

Main Class:

public class CEP {

    private Integer streamsIdComp = 0;
    final private Map<Integer, DataStream<?>> dataStreams = new HashMap<>();
    final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

    public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream) {

        Preconditions.checkNotNull(inputStream, "dataStream");
        TypeInformation<T> streamType = inputStream.getType();

        KeyedStream<Tuple2<Integer, T>, Integer> keyedInputStream = inputStream
                .map(new MapFunction<T, Tuple2<Integer, T>>() {
                    @Override
                    public Tuple2<Integer, T> map(T value) throws Exception {
                        return Tuple2.of(streamsIdComp, value);
                    }
                })
                .keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                        return integerTTuple2.f0;
                    }
                });
        return keyedInputStream;
    }

    public <T1> void addInputStream(DataStream<T1> inputStream) {

        TypeInformation<T1> streamType = inputStream.getType();

        dataStreamsTypes.put(streamsIdComp, streamType);
        dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
        streamsIdComp++;
    }
}

Test Class:

public class CEPTest {

    @Test
    public void addInputStreamTest() throws Exception {
        //test if we can change keys in a keyedStream
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Record> input1 = env.fromElements(
                new Record("1", 1, "a"),
                new Record("2", 2, "b"),
                new Record("3", 3, "c"))
                .keyBy(Record::getBizName);

        DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);

        CEP cepObject = new CEP();
        cepObject.addInputStream(input1);
        cepObject.addInputStream(input2);

    }
}

Error Message:

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a 
common reason for non-serializability. A common solution is to make the function a proper 
(non-inner) class, or a static inner class.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
...
Caused by: java.io.NotSerializableException: CEP

Answer 1

Score: 7

Flink is a distributed framework. That means your program may potentially run on thousands of nodes, and each worker node has to receive the code to execute along with the required context. Simplifying a bit, both the events flowing through the system and the functions to be executed have to be serializable, because they are transferred over the wire. This is why serialization matters in distributed programming in general.


In short, serialization is the process of encoding data into a byte representation that can be transferred and then restored on another node (another JVM).
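
As a minimal plain-Java sketch of that idea (independent of Flink; Payload and SerializationDemo are hypothetical names used only for illustration), an object can only be written and read back this way if its class implements Serializable:

import java.io.*;

// Hypothetical example class; it must implement Serializable so that
// ObjectOutputStream can encode it as bytes.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    final int id;

    Payload(int id) {
        this.id = id;
    }
}

public class SerializationDemo {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // Serialize: encode the object into a byte array (in a distributed system
        // these bytes would be sent over the network).
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Payload(42));
        }

        // Deserialize: restore the object from those bytes (possibly in another JVM).
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            Payload restored = (Payload) in.readObject();
            System.out.println(restored.id); // prints 42
        }
    }
}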


Back to the problem. Here is your cause:

Caused by: java.io.NotSerializableException: CEP

which is caused by this line:

return Tuple2.of(streamsIdComp, value);

You are using the streamsIdComp variable, which is a field of the CEP class. That means Flink has to serialize the whole CEP class to be able to access this field when executing the MapFunction. You can overcome this by introducing a local variable in the converttoKeyedStream function:

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();
    // note that this variable is local
    int localStreamsIdComp = streamsIdComp;

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                        // and it is used here
                    return Tuple2.of(localStreamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

This way, Flink only has to serialize this single variable, not the whole class itself.
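
Alternatively, as the error message itself suggests, you can turn the anonymous function into a static (or top-level) class that receives the id through its constructor. A minimal sketch, using the hypothetical name IdTagger (not part of the original code):

// A static nested class keeps no implicit reference to the enclosing CEP instance,
// so Flink only has to serialize this small object and its int field.
private static class IdTagger<T> implements MapFunction<T, Tuple2<Integer, T>> {

    private final int streamId;

    IdTagger(int streamId) {
        this.streamId = streamId;
    }

    @Override
    public Tuple2<Integer, T> map(T value) {
        return Tuple2.of(streamId, value);
    }
}

// usage inside converttoKeyedStream, replacing the anonymous MapFunction:
// inputStream.map(new IdTagger<>(streamsIdComp)).keyBy(...same KeySelector as above...);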
