TypeError in PySpark UDF functions

Question

I've got this function:

    def ead(lista):
        ind_mmff, isdebala, isfubala, k1, k2, ead = lista
        try:
            isdebala = float(isdebala)
            isfubala = float(isfubala)
            k1 = float(k1)
            k2 = float(k2)
            ead = float(ead)
        except ValueError:
            return 'Error: invalid input'
        min_deb = min(0, isdebala)
        min_fub = min(0, isfubala)
        if ind_mmff == '0':
            ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
        else:
            ead_dai = ead
        return ead_dai

Then I define a UDF as follows:

    ead_udf = udf(lambda z: ead(z), FloatType())

The aim is to create an ead_calc column in my df DataFrame like this:

    df = df.withColumn('ead_calc', ead_udf(array(df.ind_mmff, df.isdebala, df.isfubala, df.k1, df.k2, df.ead_final_motor)))

After executing df.select('ead_calc').show(), the following error is raised:

    Py4JJavaError: An error occurred while calling o3026.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 813.0 failed 4 times, most recent failure: Lost task 3.3 in stage 813.0 (TID 12054, csslncclowp0006.unix.aacc.corp, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
        process()
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
        for obj in iterator:
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
        for item in iterator:
      File "<string>", line 1, in <lambda>
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
        return lambda *a: f(*a)
      File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "<ipython-input-93-25e605cffdae>", line 1, in <lambda>
      File "<ipython-input-92-a1937fe32209>", line 12, in ead
    TypeError: _() takes 1 positional argument but 2 were given

The error is raised at min_deb = min(0, isdebala). I don't know how to solve this, since the min function clearly takes two arguments.
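A quick way to test that reasoning, sketched in plain Python: the built-in min does accept two arguments, so the message suggests the name min is bound to some other function. The helper describe_min below is hypothetical; it just reports which function a name refers to.

```python
import builtins


def describe_min(func):
    """Report which 'min' a name is bound to (hypothetical helper)."""
    if func is builtins.min:
        return "built-in min"
    # Functions defined in a module carry that module's name in __module__.
    return f"{getattr(func, '__module__', '?')}.{getattr(func, '__name__', '?')}"


print(builtins.min(0, -3.5))  # the built-in accepts two arguments: -3.5
print(describe_min(min))      # "built-in min" unless something has shadowed it
```

Running describe_min(min) in the same notebook where the UDF is defined should show the problem: if a star import from pyspark.sql.functions has replaced the built-in, the result would be expected to point into pyspark.sql.functions rather than report the built-in.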

Answer 1

Score: 1

I think you have imported the wrong min function. My guess is that you imported PySpark's min with from pyspark.sql.functions import *, which replaces Python's built-in min in your namespace. PySpark's min is an aggregate function that takes only one argument (a column), whereas the built-in min can take two arguments, as in min(0, isdebala).

Importing only the functions you need seems to work (I just added some random input):

    # Import only what is needed; a star import from pyspark.sql.functions
    # would replace Python's built-in min with PySpark's one-argument min.
    from pyspark.sql.functions import udf, array
    from pyspark.sql.types import StructField, StructType, FloatType

    def ead(lista):
        ind_mmff, isdebala, isfubala, k1, k2, ead = lista
        try:
            isdebala = float(isdebala)
            isfubala = float(isfubala)
            k1 = float(k1)
            k2 = float(k2)
            ead = float(ead)
        except ValueError:
            return 'Error: invalid input'
        min_deb = min(0, isdebala)  # Python's built-in min, no longer shadowed
        min_fub = min(0, isfubala)
        if ind_mmff == '0':
            ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
        else:
            ead_dai = ead
        return ead_dai

    ead_udf = udf(lambda z: ead(z), FloatType())

    # Random sample data; spark is the SparkSession provided by the pyspark shell or notebook
    schema = StructType([
        StructField('ind_mmff', FloatType(), True),
        StructField('isdebala', FloatType(), True),
        StructField('isfubala', FloatType(), True),
        StructField('k1', FloatType(), True),
        StructField('k2', FloatType(), True),
        StructField('ead_final_motor', FloatType(), True)
    ])
    df = spark.createDataFrame(data=[(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)], schema=schema)
    df = df.withColumn('ead_calc', ead_udf(array(df.ind_mmff, df.isdebala, df.isfubala, df.k1, df.k2, df.ead_final_motor)))
    df.show()

    +--------+--------+--------+---+---+---------------+--------+
    |ind_mmff|isdebala|isfubala| k1| k2|ead_final_motor|ead_calc|
    +--------+--------+--------+---+---+---------------+--------+
    |     1.0|     2.0|     3.0|4.0|5.0|            6.0|     6.0|
    +--------+--------+--------+---+---+---------------+--------+
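If you would rather keep the star import, another option that should work is to call the built-in explicitly through Python's builtins module, which a star import does not touch. The sketch below is plain Python (the name ead_row is hypothetical), so it can be unit-tested without Spark. Two assumptions differ from the original: it returns None instead of an error string, on the assumption that a null is what you want in a FloatType column, and it also catches TypeError so that null inputs (which reach a UDF as None) do not fail the task. It keeps the question's comparison ind_mmff == '0', which assumes the flag arrives as a string.

```python
import builtins


def ead_row(values):
    """Plain-Python version of the question's ead() logic, immune to a shadowed min."""
    ind_mmff, isdebala, isfubala, k1, k2, ead_value = values
    try:
        isdebala, isfubala, k1, k2, ead_value = (
            float(v) for v in (isdebala, isfubala, k1, k2, ead_value)
        )
    except (TypeError, ValueError):
        # None becomes a null in the FloatType column instead of an unusable string.
        return None
    # builtins.min always refers to Python's built-in, whatever "min" is bound to.
    min_deb = builtins.min(0, isdebala)
    min_fub = builtins.min(0, isfubala)
    if ind_mmff == '0':
        return abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    return ead_value


print(ead_row(['0', '-200', '-50', '10', '20', '99']))  # 30.0
print(ead_row(['1', '5', '5', '10', '20', '6']))        # 6.0
```

To use it in Spark, wrap it the same way as before, for example ead_udf = udf(ead_row, FloatType()); the extra lambda is not needed. Another common way to avoid the clash is import pyspark.sql.functions as F and then refer to F.min, F.udf, and so on.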

huangapple
  • This article was published on June 6, 2023, 01:13:50
  • If you repost it, please keep the link to this article: https://go.coder-hub.com/76408657.html