ClassCastException when running a Flink job: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results


Question

I get the following error when running a Flink job:

ClassCastException: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.ArrayList in instance of org.apache.flink.runtime.jobgraph.InputOutputFormatVertex


Below is the source code:

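```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

// The class name and main signature are assumed; the posted snippet omitted them.
public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ParameterTool params = ParameterTool.fromArgs(args);

        // Register the command-line parameters as global job parameters
        env.getConfig().setGlobalJobParameters(params);

        DataSet<String> text = env.readTextFile(params.get("input"));

        // Keep only the lines that start with "N"
        DataSet<String> filtered = text.filter(new FilterFunction<String>()
        {
            @Override
            public boolean filter(String value)
            {
                return value.startsWith("N");
            }
        });

        // Turn each line into a (line, 1) pair
        DataSet<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());

        // Group by field 0 (the line) and sum field 1 (the count)
        DataSet<Tuple2<String, Integer>> counts = tokenized.groupBy(0).sum(1);
        if (params.has("output"))
        {
            counts.writeAsText(params.get("output"));

            env.execute("WordCount Example");
        }
    }

    public static final class Tokenizer
            implements MapFunction<String, Tuple2<String, Integer>>
    {
        @Override
        public Tuple2<String, Integer> map(String value)
        {
            // Use the diamond operator instead of the raw Tuple2 type
            return new Tuple2<>(value, 1);
        }
    }
}
```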


Answer 1

Score: 1

You are right, David Anderson. It was a version mismatch on my local machine.
I fixed it by upgrading my local Flink cluster to the latest version.
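
One way to catch this kind of mismatch before submitting a job is to compare the Flink version the job was built against with the version the cluster reports. The following is only a minimal sketch under stated assumptions: it assumes the JSON returned by the cluster's REST `/config` endpoint contains a `flink-version` field, and that matching major.minor versions is a reasonable check. The class name `FlinkVersionCheck`, its method names, and the sample version strings are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkVersionCheck {

    // Matches "flink-version": "<value>" in a JSON string (assumed field name).
    private static final Pattern VERSION_FIELD =
            Pattern.compile("\"flink-version\"\\s*:\\s*\"([^\"]+)\"");

    /** Returns the value of the "flink-version" field, or null if it is absent. */
    public static String extractFlinkVersion(String json) {
        Matcher m = VERSION_FIELD.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Returns true when both versions share the same major.minor prefix. */
    public static boolean sameMajorMinor(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        return x.length >= 2 && y.length >= 2
                && x[0].equals(y[0]) && x[1].equals(y[1]);
    }

    public static void main(String[] args) {
        // Hypothetical values: the version from the build file, and a
        // response body as it might be fetched from the cluster's /config.
        String buildVersion = "1.17.0";
        String clusterConfigJson = "{\"refresh-interval\":3000,\"flink-version\":\"1.14.4\"}";

        String clusterVersion = extractFlinkVersion(clusterConfigJson);
        if (clusterVersion == null || !sameMajorMinor(buildVersion, clusterVersion)) {
            System.out.println("Version mismatch: job built against " + buildVersion
                    + ", cluster reports " + clusterVersion);
        } else {
            System.out.println("Versions match: " + clusterVersion);
        }
    }
}
```

In practice the JSON would come from an HTTP request to the running cluster, and the build version from the project's dependency configuration. Both are hard-coded here to keep the sketch self-contained.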

huangapple
  • Posted on May 22, 2023 at 15:27:49
  • Please keep this link when reposting: https://go.coder-hub.com/76303872.html