How to call a Spark Java UDF in PySpark without using SQL?

Question

Let's say I have implemented a UDF in Java:

package io.test;

import org.apache.spark.sql.api.java.UDF1;

public class TestUDF implements UDF1<Integer, Integer> {

  @Override
  public Integer call(Integer i) throws Exception {
    // Some operations; return the (possibly transformed) input
    return i;
  }

}

I am using PySpark, so I can register the UDF in my Python driver script as follows:

spark_session.udf.registerJavaFunction("test_udf", "io.test.TestUDF", IntegerType())

This UDF is now available to me in SQL queries in PySpark, e.g.:

df.createOrReplaceTempView("MyTable")
df2 = spark_session.sql("select test_udf(my_col) as mapped from MyTable")

However, I am wondering whether there is a non-SQL way of achieving this in PySpark, e.g. something like:

df.withColumn("mapped", functions.callUDF("test_udf", df.col("my_col")))

I checked and found that the callUDF() function is available in the Spark Java API, but not in PySpark.


Answer 1

Score: 0

Update:

To use a UDF implemented in Java, call it through expr:

.withColumn("asStr", F.expr("asStr(Number)"))

Alternatively, if the function is written in Python, you can convert it to a UDF using functions.udf like this:

TestUDF = udf(TestUDFFunction, IntegerType())

and use it like this:

myDF.withColumn("mapped", TestUDF(F.col("my_col")))

Example:

Input:

(sample DataFrame with an integer Number column — the original table did not survive extraction)

UDF:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def asStr(num):
  return "Number is " + str(num)

asStr_udf = F.udf(asStr, StringType())

df1.withColumn("asStr", asStr_udf(F.col("Number"))).show()

Output:

(rows of the form "Number is <value>" in the new asStr column — the original table did not survive extraction)


huangapple
  • This article was published on February 27, 2023 at 14:25:12
  • Please retain this link when reposting: https://go.coder-hub.com/75577322.html