Spark UDF反序列化错误来自示例Java程序

huangapple go评论86阅读模式
英文:

Spark UDF deserialization error from sample Java program

问题

这个示例直接来自于 Spark 示例代码,所以我有点不知道正在发生什么。

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class TestSpark {

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .appName("testspark")
                .master("spark://0.0.0.0:7077")
                 // 是的,这个文件存在,并且包含这个类
                .config("spark.jars", "out/artifacts/testspark_jar/testspark.jar")
                .getOrCreate();

        // 这个是可以工作的
        spark.sql("SELECT 5 + 1").show();

        spark.udf().register("plusOne", (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);

        // 这个会失败
        spark.sql("SELECT plusOne(5)").show();

    }

}

我在本地运行在一个 Spark Standalone 集群上的 localhost 上。

Worker 持续失败并显示:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    ...

我正在使用 Java 11,并且使用 Spark 3.0.1。

我在这个非常相似的问题中找到了可能是答案的内容:https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602

然而,尽管确保我的 TestSpark 编译成了一个供 SparkSession 使用的 JAR 文件,我仍然得到相同的错误。

任何帮助将不胜感激。似乎在 Java/Scala 边界上发生了一些问题,但我对 Scala 互操作性的了解还不足以进一步分析。

英文:

This example is taken directly from the Spark example code so I'm at a bit of a loss figuring out what's going on.

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class TestSpark {

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .appName(&quot;testspark&quot;)
                .master(&quot;spark://0.0.0.0:7077&quot;)
                 // Yes this file exists, and contains this class
                .config(&quot;spark.jars&quot;, &quot;out/artifacts/testspark_jar/testspark.jar&quot;)
                .getOrCreate();

        // This works
        spark.sql(&quot;SELECT 5 + 1&quot;).show();

        spark.udf().register(&quot;plusOne&quot;, (UDF1&lt;Integer, Integer&gt;) x -&gt; x + 1, DataTypes.IntegerType);

        // This fails
        spark.sql(&quot;SELECT plusOne(5)&quot;).show();

    }

}

I'm running this on a Spark Standalone cluster running on localhost.

The worker consistently fails with:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2450)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

I'm running on Java 11, using Spark 3.0.1.

I did find this very similar question which looks like it would be the answer: https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602

However, after ensuring that my TestSpark is compiled into a JAR that is supplied to the SparkSession, I still get the same error.

Any help would be greatly appreciated. It seems as if something is going on at the Java/Scala border, but I don't know enough about Scala interop to analyze further.

答案1

得分: 2

已链接的问题中的答案(https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602)是正确的。

由于我没有正确编译JAR文件(我认为与清单有关,但不能百分之百确定),所以又变得模糊不清。

在正确编译JAR文件(使用mvn package而不是我的IDE)并将其提供给spark.jars配置属性之后,代码将按预期工作。

英文:

The answer in the question I already linked (https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602) was correct.

It was obfuscated again because I wasn't compiling the JAR correctly (I think something with the manifest, but not 100% sure).

After compiling the JAR correctly (using mvn package instead of my IDE) and supplying that in the spark.jars config property, the code works as expected.

huangapple
  • 本文由 发表于 2020年10月11日 05:32:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/64298511.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定