Why (in "cluster" mode) is my UDF executed locally (in driver) instead on worker(s)

huangapple go评论73阅读模式
英文:

Why (in "cluster" mode) is my UDF executed locally (in driver) instead on worker(s)

问题

以下是您提供的代码段的翻译:

两个 Spark worker 正在运行代码如下JUnit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.Test;

public class UdfTest {

    @Test
    public void simpleUdf() {
        SparkConf conf = new SparkConf()
                .set("spark.driver.host", "localhost")
                .setMaster("spark://host1:7077")
                .set("spark.jars", "/home/.../myjar.jar")
                .set("spark.submit.deployMode", "cluster")
                .setAppName("RESTWS ML");

        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        List<Row> rows = new ArrayList<>();
        for (long i = 0; i < 10; i++) {
            rows.add(RowFactory.create("cr" + i));
        }

        Dataset<Row> textAsDataset = sparkSession.createDataFrame(rows,
            new StructType(new StructField[] { new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));

        sparkSession.udf().register("myUdf",
            (UDF1<String, String>)(col1) -> myUdf(col1), DataTypes.StringType);

        Dataset<Row> rowDataset = textAsDataset.withColumn("text", functions.callUDF("myUdf",
            textAsDataset.col("contentRepositoryUUID")
        ));
        rowDataset.show();
    }

    private String myUdf(String col1) {
        new Exception().printStackTrace();
        return col1 + " changed";
    }
}

创建了一个数据集(Dataset),我预期 Java 函数 myUdf() 将从 worker Java 进程中被调用,但实际上它是从驱动程序线程中调用的,堆栈跟踪始于 rowDataset.show() 行:

java.lang.Exception
    at UdfTest.myUdf(UdfTest.java:53)
    at UdfTest.lambda$simpleUdf$45ca9450$1(UdfTest.java:44)
    ...

Spark 是如何决定是否可以从 worker 调用 UDF 的呢?

奇怪的是,它曾经可以工作,但现在我试图复现这个“分布式 UDF”场景,某些东西已经改变,以至于我不能再复现它。查看 Spark 的 DEBUG 日志对我没有帮助,不幸的是。


<details>
<summary>英文:</summary>
Two spark workers are running, the code is as follows (JUnit :
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.Test;
public class UdfTest {
@Test
public void simpleUdf() {
SparkConf conf = new SparkConf()
.set(&quot;spark.driver.host&quot;, &quot;localhost&quot;)
.setMaster(&quot;spark://host1:7077&quot;)
.set(&quot;spark.jars&quot;, &quot;/home/.../myjar.jar&quot;)
.set(&quot;spark.submit.deployMode&quot;, &quot;cluster&quot;)
.setAppName(&quot;RESTWS ML&quot;);
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
List&lt;Row&gt; rows = new ArrayList&lt;&gt;();
for (long i = 0; i &lt; 10; i++) {
rows.add(RowFactory.create(&quot;cr&quot; + i));
}
Dataset&lt;Row&gt; textAsDataset = sparkSession.createDataFrame(rows,
new StructType(new StructField[] { new StructField(&quot;contentRepositoryUUID&quot;, DataTypes.StringType, false, Metadata.empty()) }));
sparkSession.udf().register(&quot;myUdf&quot;,
(UDF1&lt;String, String&gt;)(col1) -&gt; myUdf(col1), DataTypes.StringType);
Dataset&lt;Row&gt; rowDataset = textAsDataset.withColumn(&quot;text&quot;, functions.callUDF(&quot;myUdf&quot;,
textAsDataset.col(&quot;contentRepositoryUUID&quot;)
));
rowDataset.show();
}
private String myUdf(String col1) {
new Exception().printStackTrace();
return col1 + &quot; changed&quot;;
}
}
A dataset is created and I expect the java function ```myUdf()``` to be called from the worker java processes, but it is called from the driver thread instead, the stacktrace originates from the ```rowDataset.show()``` line:
java.lang.Exception
at UdfTest.myUdf(UdfTest.java:53)
at UdfTest.lambda$simpleUdf$45ca9450$1(UdfTest.java:44)
at org.apache.spark.sql.UDFRegistration$$anonfun$259.apply(UDFRegistration.scala:759)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:108)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:248)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
at UdfTest.simpleUdf(UdfTest.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
at org.testng.TestNG.runSuites(TestNG.java:1028)
at org.testng.TestNG.run(TestNG.java:996)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
How does Spark decide if the UDF can be called from workers? 
The strange thing is, that it already worked once but now as I tried to reproduce this &quot;distributed UDF&quot; scenario something has changed so that I cannot. Looking at Spark DEBUG logs didn&#39;t help me unfortunately.
</details>
# 答案1
**得分**: 6
尽管堆栈跟踪确实源自`show()`调用,但关键实际上是...
...
HERE --> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...
你仍然处于查询优化阶段,这由驱动程序中的Catalyst完成。
这是Spark的一个文档不足的特殊性质,即使用`SparkSession.createDataFrame()`(在Scala中是`SparkSession.createDatset()`/`Seq.toDF()`)从本地集合创建的数据集仅在驱动程序内部是本地关系,而不是真正的分布式关系:
```scala
scala> val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]
scala> df.isLocal
res46: Boolean = true

与从RDD创建的数据集不同:

scala> val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]

scala> df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]

scala> df_from_rdd.isLocal
res48: Boolean = false

诸如Dataset.withColumn()之类的操作实际上由驱动程序本身执行,作为优化查询计划的延迟评估的一部分,从不执行到执行阶段:

scala> val df_foo = df.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]

scala> df_foo.queryExecution.optimizedPlan
java.lang.Exception
    at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$anonfun$1.apply(<console>:26)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$anonfun$1.apply(<console>:26)
    ...
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
    ...
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
    ...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// 注意:投影已经消失,合并到了本地关系中

scala> df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// 注意:这次没有堆栈跟踪

与从RDD创建的数据集不同

```scala
scala> val df_from_rdd_foo = df_from_rdd.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
   +- ExternalRDD [obj#111]

这在执行器的stderr中不产生堆栈跟踪,即不调用UDF。另一方面:

scala> df_from_rdd_foo.show()
+-----+---------+
|value|      foo|
+-----+---------+
|    0|0 changed|
|    1|1 changed|
|    2|2 changed|
|    3|3 changed|
|    4|4 changed|
|    5|5 changed|
+-----+---------+

在执行器的stderr中生成以下堆栈跟踪:

java.lang.Exception
    at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$anonfun$1.apply(<console>:26)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$anonfun$1.apply(<console>:26)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$13$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:255)
    ...
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Spark将本地关系视为字面值,这也可以从它们在SQL中的表示方式中看到(代码来自[这里](https://

英文:

Although the stack trace indeed originates from the show() call, the key is acutally...

...
HERE --&gt; at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...

You are still in the query optimisation phase, which is done by Catalyst in the driver.

The reason for this is a poorly documented peculiarity of Spark, namely that datasets created from local collections using SparkSession.createDataFrame() (SparkSession.createDatset()/Seq.toDF() in Scala) are merely local relations inside the driver and not really distributed:

scala&gt; val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala&gt; df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]
scala&gt; df.isLocal
res46: Boolean = true

unlike datasets created from RDDs:

scala&gt; val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]
scala&gt; df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]
scala&gt; df_from_rdd.isLocal
res48: Boolean = false

Operations such as Dataset.withColumn() are actually executed by the driver itself as part of the lazy evaluation of the optimised query plan and never make it to the execution stage:

scala&gt; val df_foo = df.withColumn(&quot;foo&quot;, functions.callUDF(&quot;myUdf&quot;, $&quot;value&quot;))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala&gt; df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]
scala&gt; df_foo.queryExecution.optimizedPlan
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(&lt;console&gt;:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(&lt;console&gt;:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(&lt;console&gt;:26)
...
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
...
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.&lt;init&gt;(&lt;console&gt;:27)
...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// Notice: the projection is gone, merged into the local relation
scala&gt; df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// Notice: no stack trace this time

unlike when dealing with a dataset created from an RDD:

scala&gt; val df_from_rdd_foo = df_from_rdd.withColumn(&quot;foo&quot;, functions.callUDF(&quot;myUdf&quot;, $&quot;value&quot;))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala&gt; df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]

which produces no stack trace in the executor's stderr, i.e., the UDF isn't called. On the other hand:

scala&gt; df_from_rdd_foo.show()
+-----+---------+
|value|      foo|
+-----+---------+
|    0|0 changed|
|    1|1 changed|
|    2|2 changed|
|    3|3 changed|
|    4|4 changed|
|    5|5 changed|
+-----+---------+

produces the following stack trace in the executor's stderr:

java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(&lt;console&gt;:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(&lt;console&gt;:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(&lt;console&gt;:26)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Spark treats local relations as literals, which can also be seen in the way they are represented in SQL (code adapted from here):

scala&gt; df.queryExecution.analyzed.collect { case r: LocalRelation =&gt; r }.head.toSQL(&quot;bar&quot;)
res55: String = VALUES (0), (1), (2), (3), (4), (5) AS bar(value)
scala&gt; df_foo.queryExecution.optimizedPlan.collect { case r: LocalRelation =&gt; r }.head.toSQL(&quot;bar&quot;)
res56: String = VALUES (0, &#39;0 changed&#39;), (1, &#39;1 changed&#39;), (2, &#39;2 changed&#39;), (3, &#39;3 changed&#39;), (4, &#39;4 changed&#39;), (5, &#39;5 changed&#39;) AS bar(value, foo)

or alternatively as code:

scala&gt; df.queryExecution.analyzed.asCode
res57: String = LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
scala&gt; df_foo.queryExecution.analyzed.asCode
res58: String = Project(
List(value#107, UDF:myUdf(cast(value#107 as string)) AS foo#163),
LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
)
scala&gt; df_foo.queryExecution.optimizedPlan.asCode
res59: String = LocalRelation(
List(value#107, foo#163),
Vector([0,0 changed], [1,1 changed], [2,2 changed], [3,3 changed], [4,4 changed], [5,5 changed]),
false
)

Think of what's happening as the equivalent of your Java compiler replacing code such as int a = 2 * 3; with int a = 6; with the actual computation being done by the compiler.

huangapple
  • 本文由 发表于 2020年4月7日 00:06:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/61064044.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定