英文:
The implementation of the MapFunction is not serializable Flink
问题
Main Class:
public class CEP {
private Integer streamsIdComp = 0;
final private Map<Integer, DataStream<?>> dataStreams = new HashMap<>();
final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream) {
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
KeyedStream<Tuple2<Integer, T>, Integer> keyedInputStream = inputStream
.map(new MapFunction<T, Tuple2<Integer, T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
return Tuple2.of(streamsIdComp, value);
}
})
.keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}
public <T1> void addInputStream(DataStream<T1> inputStream) {
TypeInformation<T1> streamType = inputStream.getType();
dataStreamsTypes.put(streamsIdComp, streamType);
dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
streamsIdComp++;
}
}
Test Class:
public class CEPTest {
@Test
public void addInputStreamTest() throws Exception {
//test if we can change keys in a keyedStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Record> input1 = env.fromElements(
new Record("1", 1, "a"),
new Record("2", 2, "b"),
new Record("3", 3, "c"))
.keyBy(Record::getBizName);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);
CEP cepObject = new CEP();
cepObject.addInputStream(input1);
cepObject.addInputStream(input2);
}
}
Error Message:
org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction
is not serializable. The implementation accesses fields of its enclosing class, which is a
common reason for non-serializability. A common solution is to make the function a proper
(non-inner) class, or a static inner class.
java.io.NotSerializableException: CEP
英文:
I am trying to implement a class that enables the user to manipulate N input Streams without having constraints on Types of input Streams.
For starter, I wanted to transform all input DataStreams into keyedStreams.
So, I mapped the input dataStream<T> into a Tuple<Integer, T> and after that, I applied KeyBy to convert it into keystream.
I always get a problem of serialization, I tried to follow this guide https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html and it didn't work.
What I do like to know is :
- What is Serialization/Deserialization in Java ? and what is used for.
- What are the problems that I can counter in Flink with Serialization
- What is the problem in my code( you may find below the code and the error message)
Thank you very much.
Main Class:
public class CEP {
private Integer streamsIdComp = 0;
final private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
return Tuple2.of(streamsIdComp, value);
}
}).
keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}
public <T1> void addInputStream(DataStream<T1> inputStream) {
TypeInformation<T1> streamType = inputStream.getType();
dataStreamsTypes.put(streamsIdComp, streamType);
dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
streamsIdComp++;
}
}
Test Class
public class CEPTest {
@Test
public void addInputStreamTest() throws Exception {
//test if we can change keys in a keyedStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Record> input1 = env.fromElements(
new Record("1", 1, "a"),
new Record("2", 2, "b"),
new Record("3", 3, "c"))
.keyBy(Record::getBizName);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);
CEP cepObject = new CEP();
cepObject.addInputStream(input1);
cepObject.addInputStream(input2);
}
}
Error Message
org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction
is not serializable. The implementation accesses fields of its enclosing class, which is a
common reason for non-serializability. A common solution is to make the function a proper
(non-inner) class, or a static inner class.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more
答案1
得分: 7
Flink是一个分布式框架。这意味着你的程序有可能在成千上万个节点上运行。这也意味着每个工作节点都必须接收要执行的代码以及所需的上下文。简单来说,系统中流动的事件和要执行的函数都必须是可序列化的 - 因为它们通过网络传输。这就是为什么序列化在分布式编程中很重要。
简而言之,序列化是将数据编码为字节表示的过程,可以在另一个节点(另一个JVM)上传输和恢复。
回到问题。这是你的原因:
Caused by: java.io.NotSerializableException: CEP
其中的问题是由这一行引起的:
return Tuple2.of(streamsIdComp, value);
你正在使用streamsIdComp变量,它是CEP类中的一个字段。这意味着,Flink必须对整个类进行序列化,以便在执行MapFunction时能够访问此字段。你可以通过在converttoKeyedStream函数中引入局部变量来解决这个问题:
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
// 请注意,此变量是局部的
int localStreamsIdComp = streamsIdComp;
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
// 并且在这里被使用
return Tuple2.of(localStreamsIdComp, value);
}
}).
keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}
这样,Flink只需要对这个单独的变量进行序列化,而不是整个类本身。
英文:
Flink is a distributed framework. That means, your program is potentially going to run on a thousands of nodes. This also means that each worker node has to receive code to be executed along with the required context. Simplifying a bit, both events flowing through the system and functions to be executed have to be serializable - as they are transfer via the wire. This is why serialization is important in distributed programming in general.
In short, serialization is a process of encoding data into byte representation that can be transferred and restored on another node (another JVM).
Back to the problem. Here is your cause:
Caused by: java.io.NotSerializableException: CEP
which is caused by line
return Tuple2.of(streamsIdComp, value);
You are using streamsIdComp variable which is a field in CEP class. That means, Flink has to serialize whole class to be able to access this field when executing MapFunction. You can overcome it by introducing local variable in converttoKeyedStream function:
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
// note this variable is local
int localStreamsIdComp = streamsIdComp;
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
// and is used here
return Tuple2.of(localStreamsIdComp, value);
}
}).
keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
return integerTTuple2.f0;
}
});
return keyedInputStream;
}
that way Flink has to serialize just this single variable, not the whole class itself.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论