ClassCastException while Flink run : cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results
Question

Error when running the Flink job:

ClassCastException: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.ArrayList in instance of org.apache.flink.runtime.jobgraph.InputOutputFormatVertex

Below is the source code:
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        DataSet<String> text = env.readTextFile(params.get("input"));

        // Keep only the lines that start with "N"
        DataSet<String> filtered = text.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                return value.startsWith("N");
            }
        });

        DataSet<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());

        // Group by the value (field 0) and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> counts = tokenized.groupBy(0).sum(1);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
            env.execute("WordCount Example");
        }
    }

    public static final class Tokenizer implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) {
            return new Tuple2<>(value, 1);
        }
    }
}
```
Error:

(error screenshot)
Answer 1

Score: 1
You are right, David Anderson. It was a version mismatch on my local machine; I fixed it by upgrading my local Flink cluster to the latest version.
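
For anyone hitting the same exception: it typically appears when the Flink version the job jar was built against differs from the version the cluster is running. A minimal sketch of how to check which Flink version is actually on the job's classpath is below; it assumes `flink-runtime` is available and uses Flink's `EnvironmentInformation.getVersion()` utility. Compare the printed value with the version shown in the cluster's web UI.

```java
import org.apache.flink.runtime.util.EnvironmentInformation;

public class FlinkVersionCheck {
    public static void main(String[] args) {
        // Prints the Flink version bundled on the application's classpath.
        // If this does not match the version of the local cluster the job is
        // submitted to, errors like the ClassCastException above can occur.
        System.out.println("Flink version on classpath: " + EnvironmentInformation.getVersion());
    }
}
```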