PySpark custom UDF ModuleNotFoundError
Question
Running into the following error when using a custom UDF:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 603, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 449, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 251, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/local/lib/python3.8/dist-packages/pyspark/worker.py", line 71, in read_command
command = serializer._read_with_length(file)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'jobs'
The imports in the Spark script look something like this:
from jobs.lib_a import a
from jobs.udf import udf_function #This is a UDF
The script itself is located at jobs/scripts/test_script.py; the entire jobs folder is zipped and then added to Spark via pyFiles.
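For context, here is a minimal sketch of how such an archive might be built so that from jobs.udf import udf_function can resolve once the zip is on the Python path. The package layout follows the question; the zipping step itself is an assumption about how the archive is produced.

# build_archive.py - hedged sketch; assumes the working directory contains the
# 'jobs' package described in the question (jobs/udf.py, jobs/lib_a.py, ...).
import shutil

# The archive root must contain the 'jobs' directory itself, not just its
# contents, so that 'import jobs.udf' works once the zip is on sys.path.
shutil.make_archive("jobs", "zip", root_dir=".", base_dir="jobs")
# Produces jobs.zip, which is then shipped to Spark via pyFiles.

The point is that the worker's unpickler has to be able to resolve the module path jobs.udf when it rebuilds the UDF, so the archive layout matters as much as shipping the file.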
The weird thing is that the other imports from the jobs module work; only the UDF import fails.
I have tried the approach in this post: creating a separate zip file called udf.zip with the UDF at the top level and adding it to Spark via pyFiles, but I still run into ModuleNotFoundError when I try to import udf.
I have also tried sys.path.append(<the udf path>).
The only approach that works is copying udf_function into the Spark script test_script.py. This wouldn't work in reality, as udf_function can be shared by other Spark scripts.
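For illustration, a minimal sketch of that copy-paste workaround, with a placeholder body since the real udf_function is not shown here:

# jobs/scripts/test_script.py - hedged sketch of the inline workaround
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def udf_function(value):
    # placeholder body for illustration only
    return value.upper() if value is not None else None

spark = SparkSession.builder.getOrCreate()
to_upper = udf(udf_function, StringType())  # defined locally, so workers don't need the 'jobs' module to unpickle it
df = spark.createDataFrame([("a",), ("b",)], ["col"])
df.select(to_upper("col")).show()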
The underlying system is:
Python 3.8
Spark 3.2
Spark is running in Kubernetes
Answer 1
Score: 1
I was able to make it work.
Some more context: we are leveraging the Spark on K8s operator (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator), so we pass in the zip file via pyFiles.
This works the same as the following, if we set it up in the Spark script:
spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')
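As a point of reference, a hedged sketch of what that script-side setup might look like end to end (the zip path follows the answer; the import and return type are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark_session = SparkSession.builder.getOrCreate()
# Same effect as listing the archive under pyFiles in the operator spec.
spark_session.sparkContext.addPyFile('/opt/spark/pyfiles/python.zip')

from jobs.udf import udf_function          # resolves once the zip is on sys.path
my_udf = udf(udf_function, StringType())   # illustrative return type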
Initially, we passed it in as:
pyFiles:
- local:///opt/spark/pyfiles/spinner-python.zip
But local:/// points to the working directory; we had to change it to the following, with an extra slash (/), so that it points to an absolute path:
pyFiles:
- local:////opt/spark/pyfiles/spinner-python.zip
When PySpark serializes a UDF, it sends a copy of the UDF code to all the worker nodes, so we have to point pyFiles to an absolute path instead of a relative path.
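To confirm that on a running cluster, a small hedged probe can be executed inside an executor to check that the shipped zip is on its sys.path and that the packaged module is importable there (the module name 'jobs' follows the question; everything else is illustrative):

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def probe(_):
    import importlib.util
    found = importlib.util.find_spec("jobs") is not None   # can the executor see the package?
    zips = [p for p in sys.path if p.endswith(".zip")]     # which archives landed on its path?
    yield (found, zips)

print(spark.sparkContext.parallelize([0], numSlices=1).mapPartitions(probe).collect())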