How to use PySpark UDF in a Java / Scala Spark project

Question

There are many questions about how to call Java code from PySpark, but none about calling Python code from a Java Spark project. This is useful for large legacy Java projects that need functionality already implemented in Python.

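Answer 1

Score: 2

I have also shared this answer on [Medium][1].

As you know, Apache Spark is written in Scala. PySpark is not a separate, pure-Python project. There is an `org.apache.spark.deploy.PythonRunner` class that:

  • creates a Py4J server
  • exports the Py4J server's host, port, and secret as environment variables
  • launches the provided Python script
  • waits for the script's process to finish

In turn, when a Spark context is created in the Python script, it connects to the Py4J server using the credentials from the environment variables. Py4J lets you use any JVM object via the Java Reflection API. In other words, PySpark is a wrapper around the Java Spark context.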
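The handshake described above can be inspected from the Python side. The following is a minimal sketch, not PySpark's own code: it only reads the two environment variables that, to my knowledge, PySpark looks for (`PYSPARK_GATEWAY_PORT` and `PYSPARK_GATEWAY_SECRET`). The helper name `read_gateway_settings` is hypothetical and exists only for illustration.

```python
import os


def read_gateway_settings(environ=None):
    """Return the Py4J connection settings exported by the JVM, or None.

    `read_gateway_settings` is a hypothetical helper, not part of PySpark.
    """
    env = os.environ if environ is None else environ
    port = env.get("PYSPARK_GATEWAY_PORT")
    secret = env.get("PYSPARK_GATEWAY_SECRET")
    if port is None or secret is None:
        # The script was not started by a JVM-side runner.
        return None
    return {"port": int(port), "secret": secret}


if __name__ == "__main__":
    print(read_gateway_settings())
```

Calling this at the top of a script is a quick way to check whether it was launched by the JVM or run on its own, which helps when debugging the setup described in this answer.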

Here is a simple Java application that uses Apache Spark's Python Runner:

package example.python;

import org.apache.spark.deploy.PythonRunner;
import org.apache.spark.sql.SparkSession;

public class Main {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Shared Spark Context Example")
                .master("local[*]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");

        // PythonRunner.main expects the Python file first, then the py-files list
        PythonRunner.main(new String[]{
                "src/main/python/example.py",
                "src/main/python/example.py"
        });

        spark.stop();
    }
}

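But if you try to initialize a Spark session in example.py, you get an exception: there can be only one SparkContext in a JVM process. So the first question is: how do you pass an existing Java SparkContext to PySpark? And the next question is: how do you share a DataFrame with PySpark?

To share an existing SparkContext, you need to connect to the JVM over the Py4J gateway, expose an instance of `org.apache.spark.api.java.JavaSparkContext` through a public static variable, and initialize `pyspark.conf.SparkConf` from `JavaSparkContext#getConf()`.

A DataFrame can be shared through Spark's temporary view functionality.

Here is the updated Java code: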

package example.python;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.deploy.PythonRunner;
import org.apache.spark.sql.SparkSession;

public class Main {

    public static JavaSparkContext jsc;

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Spark Python Runner")
                .master("local[*]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");
        jsc = new JavaSparkContext(spark.sparkContext());

        var df = spark.read().textFile("src/main/resources/dataset.txt");
        df.createOrReplaceTempView("tbl");

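        // Run the script; it replaces the "tbl" view with an extended DataFrame
        PythonRunner.main(new String[]{
                "src/main/python/example.py",
                "src/main/python/example.py"
        });

        // Show the view as updated by the Python script
        spark.sql("SELECT * FROM tbl").show();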

        spark.stop();
    }
}

And the Python code:

#!/usr/bin/env python
# coding: utf-8
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import length, udf
from pyspark.sql.types import StringType

if __name__ == "__main__":
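    # Connect to the Py4J server started by PythonRunner; the connection
    # settings come from the environment variables it exported.
    gateway = pyspark.java_gateway.launch_gateway()
    # Fetch the JavaSparkContext exposed by the public static field in Main.
    jsc = gateway.jvm.example.python.Main.jsc
    conf = pyspark.conf.SparkConf(True, gateway.jvm, jsc.getConf())

    # Wrap the existing JVM context instead of creating a new one.
    sc = pyspark.SparkContext(gateway=gateway, jsc=jsc, conf=conf)
    spark = SparkSession(sc)

    # Read the view registered on the Java side.
    df = spark.sql("SELECT * FROM tbl")

    # Add a column with the length of each line.
    df = df.withColumn("len", length("value"))

    # Replace the view so the Java side sees the new column.
    df.createOrReplaceTempView("tbl")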

    sys.exit(0)

Going further, you can register a Python UDF in PySpark and then call it from Java code.

Python:

# ...
py_concat_of2_udf = udf(lambda x, y: str(x) + str(y), StringType())
spark.udf.register("py_concat_of2", py_concat_of2_udf)
# ...

Java:

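// ...
// Requires: import static org.apache.spark.sql.functions.callUDF;
//           import static org.apache.spark.sql.functions.col;
spark.sql("SELECT * FROM tbl")
        .withColumn("pyfunc", callUDF("py_concat_of2", col("value"), col("len")))
        .show();
// ...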

The stdout of the code:

+----------+---+------------+
|     value|len|      pyfunc|
+----------+---+------------+
|       one|  3|        one3|
|       two|  3|        two3|
|three four| 10|three four10|
|      five|  4|       five4|
+----------+---+------------+
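As a sanity check, the per-row logic behind this table can be reproduced in plain Python, without Spark. This is only a sketch: the four input lines are assumed from the `value` column above (the actual contents of dataset.txt are not shown in the answer), and the named function `py_concat_of2` stands in for the registered lambda.

```python
def py_concat_of2(x, y):
    """Same logic as the lambda registered as the "py_concat_of2" UDF."""
    return str(x) + str(y)


# Assumed input lines, taken from the `value` column of the output table.
lines = ["one", "two", "three four", "five"]

# Each row mirrors the table: (value, len, pyfunc).
rows = [(value, len(value), py_concat_of2(value, len(value))) for value in lines]

for row in rows:
    print(row)
```

Running the logic outside Spark like this is a cheap way to test a UDF body before registering it.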

How does it work? There is an `org.apache.spark.sql.catalyst.expressions.PythonUDF` Scala class that contains an `org.apache.spark.api.python.PythonFunction` object. That object holds a `command: Seq[Byte]` field, which is actually the Python lambda serialized by Pickle.
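To illustrate the idea of a lambda travelling as bytes, here is a standard-library-only sketch. It is not PySpark's serializer: to my knowledge PySpark uses a cloudpickle-based serializer, while this example uses `marshal` on the function's code object purely as a stand-in. The variable name `command` is borrowed from the Scala field for readability.

```python
import marshal
import types

# The same lambda that the answer registers as a UDF.
py_concat_of2 = lambda x, y: str(x) + str(y)

# Serialize the lambda's compiled code to bytes (a stand-in for `command`).
command = marshal.dumps(py_concat_of2.__code__)

# Rebuild a callable from the bytes, as a Python worker process would.
restored = types.FunctionType(marshal.loads(command), globals())

print(type(command).__name__, restored("one", 3))
```

The JVM never interprets these bytes itself; it only stores them and hands them to a Python process, which is why a UDF registered from Python stays callable from Java as long as Python workers can be started.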

The downside of this approach is a stack trace in stdout for each action:

ERROR DAGScheduler: Failed to update accumulator 37 (org.apache.spark.api.python.PythonAccumulatorV2) for task 0
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.connect0(Native Method)
at java.base/sun.nio.ch.Net.connect(Net.java:579)
at java.base/sun.nio.ch.Net.connect(Net.java:568)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
at java.base/java.net.Socket.connect(Socket.java:633)
at java.base/java.net.Socket.connect(Socket.java:583)
at java.base/java.net.Socket.<init>(Socket.java:507)
at java.base/java.net.Socket.<init>(Socket.java:287)
at org.apache.spark.api.python.PythonAccumulatorV2.openSocket(PythonRDD.scala:701)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:723)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1610)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1601)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1749)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
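at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)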

The `org.apache.spark.api.python.PythonAccumulatorV2` object is created by `pyspark.SparkContext` and is used for Apache Spark metrics.

[1]: https://medium.com/@a.iatsuk/apache-spark-context-sharing-across-jvm-and-python-processes-aadde34872bc

huangapple
  • Published on July 31, 2023 at 18:52:31
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76802912.html