Unable to use .show() or perform further operations on a Spark DataFrame after applying a user-defined function to a column

I was trying to use a UDF in Spark. After applying the UDF to a column, df.show() no longer worked, and I could not apply any further operations to that DataFrame. So I ran the code given in the documentation (https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html) and got the same error.

The code was:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

slen = udf(lambda s: len(s), IntegerType())

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
```

Here is the error:

```
Py4JJavaError                             Traceback (most recent call last)
Cell In[13], line 14
     11     return x + 1
     13 df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
---> 14 df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()

File C:\Program Files\Python310\lib\site-packages\pyspark\sql\dataframe.py:606, in DataFrame.show(self, n, truncate, vertical)
    603     raise TypeError("Parameter 'vertical' must be a bool")
    605 if isinstance(truncate, bool) and truncate:
--> 606     print(self._jdf.showString(n, 20, vertical))
    607 else:
    608     try:

File C:\Program Files\Python310\lib\site-packages\py4j\java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File C:\Program Files\Python310\lib\site-packages\pyspark\sql\utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File C:\Program Files\Python310\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o134.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 2) (LAPTOP-SI50IG8L executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
    ... 38 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
    ... 38 more
```

I don't know what the problem is.

Versions:
Python: 3.10.10
PySpark: 3.3.2

I read one answer on Stack Overflow saying that UDFs are slow, but I think the code given in the documentation should work.

Answer 1

Score: 1

It's caused by the connection between PySpark and the Python interpreter. You can fix it either by setting the environment variables PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to point at your Python executable, or by setting the spark.pyspark.driver.python and spark.pyspark.python configuration properties when you submit with spark-submit.
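For example, a minimal sketch of the environment-variable approach. The interpreter path below is an assumption inferred from the site-packages paths in the traceback; substitute the path of your own Python executable:

```python
import os

# Point both the driver and the worker processes at the same interpreter.
# NOTE: this path is a guess based on the traceback above -- replace it with
# the output of `where python` on your machine.
python_path = r"C:\Program Files\Python310\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = python_path
os.environ["PYSPARK_PYTHON"] = python_path

# The variables must be set before the SparkSession (and its JVM) is created,
# since the launched Spark processes inherit the driver's environment.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
```

The equivalent spark-submit form would be to pass `--conf spark.pyspark.python=<path>` and `--conf spark.pyspark.driver.python=<path>` on the command line.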
