PySpark 自定义 UDF 模块未找到错误

huangapple go评论67阅读模式
英文:

PySpark custom UDF ModuleNotFoundError

问题

遇到使用自定义UDF时出现以下错误:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
    return this.loads(obj)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'

导入Spark脚本如下所示:

from jobs.lib_a import a
from jobs.udf import udf_function #这是一个UDF

脚本本身位于jobs/scripts/test_script.py,整个jobs文件夹被压缩,然后通过pyFiles添加到Spark中。

奇怪的是,来自jobs模块的其他导入工作正常,只有UDF失败。

我已经尝试了这个帖子中的方法,创建一个名为udf.zip的单独压缩文件,将UDF放在顶层,然后通过pyFiles添加到Spark,但仍然遇到ModuleNotFoundError,当我尝试导入udf时。

我还尝试过sys.path.append(<UDF路径>)

唯一有效的方法是将udf_function复制到Spark脚本test_script.py中。但在实际情况下,这不起作用,因为udf_function可以被其他Spark脚本共享。

底层系统信息:
Python 3.8
Spark 3.2
Spark在Kubernetes中运行

英文:

Running into the following error when use custom UDF

Traceback (most recent call last):
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/worker.py&quot;, line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/worker.py&quot;, line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/worker.py&quot;, line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/worker.py&quot;, line 71, in read_command
    command = serializer._read_with_length(file)
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py&quot;, line 160, in _read_with_length
    return self.loads(obj)
  File &quot;/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py&quot;, line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named &#39;jobs&#39;

The import spark scripts looks something like this

from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF

The scripts itself is located in
jobs/scripts/test_script.py, the entire jobs folder is zipped and then added to spark using pyFiles.

The weird thing is that the other import from jobs module works, only fail for udf.

I have tried approach in this post, creating a separate zip file called udf.zip, putting udf at top level and then add it to spark via pyFiles, but still run into ModuleNotFoundError when I try to import udf.

I have also tried sys.path.append(&lt;the udf path&gt;)

The only approach works is when I copy the udf_function into the spark script test_script.py. This wouldn't work in reality as the udf_function can be shared by other spark script.

The underlying system is:
Python 3.8
Spark 3.2
Spark is running in kubernetes

答案1

得分: 1

我能让它工作。

更多的背景是,我们正在利用K8s操作符上的Spark,所以我们通过pyFiles传递zip文件
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

这与以下操作相同

spark_session.sparkContext.addPyFile(&#39;/opt/spark/pyfiles/python.zip&#39;)

如果我们在Spark脚本中设置它

最初,我们传递如下

pyFiles:
- local:///opt/spark/pyfiles/spinner-python.zip

但local:///指向工作目录,我们必须将其更改为以下内容,带有额外的斜杠(/)以指向绝对路径。

pyFiles:
- local:////opt/spark/pyfiles/spinner-python.zip

当Pyspark序列化UDF时,它会将UDF代码的副本发送到所有工作节点,我们必须将PyFiles指向绝对路径,而不是相对路径。

英文:

I was able to make it work.

Some more context is that we are leveraging spark on k8s operator, so we pass in the zip file via pyFiles
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

This work the same as the following

spark_session.sparkContext.addPyFile(&#39;/opt/spark/pyfiles/python.zip&#39;)

if we set it up in spark script

Initially, we pass in as

pyFiles:
- local:///opt/spark/pyfiles/spinner-python.zip

But local:/// point to the working directory, we have to change it to the following with extra slash(/) to point to absolute.

pyFiles:
- local:////opt/spark/pyfiles/spinner-python.zip

When Pyspark serializes a UDF, it sends a copy of the UDF code to all the worker nodes, we have to point PyFiles to absolute path instead of relative path

huangapple
  • 本文由 发表于 2023年2月24日 10:07:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/75552025.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定