How to call Spark Java UDF in PySpark without using SQL?
Question
Let's say I have implemented a UDF in Java.
package io.test;

import org.apache.spark.sql.api.java.UDF1;

public class TestUDF implements UDF1<Integer, Integer> {
    @Override
    public Integer call(Integer i) throws Exception {
        // Some operations; return the input unchanged as a placeholder
        return i;
    }
}
I am using PySpark, so I am able to register the UDF in my Python driver script as follows:
session.udf.registerJavaFunction("test_udf", "io.test.TestUDF", IntegerType())
This UDF is now available to be used in SQL queries in PySpark, e.g.:
df.createOrReplaceTempView("MyTable")
df2 = spark_session.sql("select test_udf(my_col) as mapped from MyTable")
However, I am wondering if there is a non-SQL way of achieving this in PySpark, e.g. something like this:
df.withColumn("mapped", functions.callUDF("test_udf", df.col("my_col")))
I checked and found that the callUDF() function is available in the Spark Java API but not in PySpark.
Answer 1
Score: 0
Update:
To use a UDF implemented in Java, call it with expr:
.withColumn("asStr", F.expr("asStr(Number)"))
If the function is written in Python instead, you can convert it to a UDF using functions.udf:
TestUDF = udf(TestUDFFunction, IntegerType())
and then use it like this:
myDF.withColumn("mapped", TestUDF(F.col("my_col")))
Example:
Input: a DataFrame df1 with a numeric column Number.
UDF:
def asStr(num):
    return "Number is " + str(num)

asStr_udf = F.udf(asStr, StringType())
df1.withColumn("asStr", asStr_udf(F.col("Number"))).show()
Output: a new asStr column containing strings of the form "Number is <value>".
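As an aside, because asStr is a plain Python function, its logic can be sanity-checked without a Spark session before wrapping it with F.udf:

```python
# The same function the example wraps with F.udf
def asStr(num):
    return "Number is " + str(num)

print(asStr(7))    # Number is 7
print(asStr(3.5))  # Number is 3.5
```

This quick local check is often the fastest way to debug UDF logic, since errors inside a Spark UDF only surface deep in executor stack traces.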