PySpark: java.lang.StackOverflowError on df.write.csv
Question
I'm using PySpark version 3.1.1 on a Kubernetes cluster.
I'm trying to write a Spark DataFrame to HDFS in CSV format. The DataFrame has about 2.4 million rows and 130 columns, and has 5 partitions. If I reduce the amount of data, the code works fine.
Following is the code snippet:
engineered_df.cache()  # cache the DataFrame
engineered_df.count()  # force an action so the cache is materialized
engineered_df = engineered_df.repartition(5)  # repartition to 5 partitions
engineered_df.write.csv(CommonConstants.HDFS_ENGINEERED_DATA_PATH, mode="overwrite")  # write as CSV, overwriting existing output
And this is the stack trace:
py4j.protocol.Py4JJavaError: An error occurred while calling o3543180.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:477)
at sun.reflect.GeneratedMethodAccessor201.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
...
I have tried increasing the number of partitions.
Answer 1
Score: 1
It seems this problem occurs when your DAG grows quickly and there are too many levels of transformations in your code. Because the lineage becomes so long, the JVM can no longer serialize the accumulated operations when an action is finally performed, which is what produces the StackOverflowError during task serialization. Try checkpointing to truncate the lineage:
spark.sparkContext.setCheckpointDir("./checkpoint")
engineered_df.checkpoint()
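For reference, here is a minimal sketch of how the checkpoint might be wired into the original pipeline. The HDFS checkpoint path below is an assumption, not something from the question; note also that checkpoint() returns a new DataFrame whose lineage starts at the checkpointed data, so the result needs to be reassigned before the write:

# Assumed checkpoint location; use any HDFS path the job is allowed to write to.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

# checkpoint() is eager by default: it materializes the DataFrame and returns a
# new DataFrame with a truncated lineage, so the result must be reassigned.
engineered_df = engineered_df.checkpoint()

# The write now serializes a short plan instead of the full transformation DAG.
engineered_df = engineered_df.repartition(5)
engineered_df.write.csv(CommonConstants.HDFS_ENGINEERED_DATA_PATH, mode="overwrite")

If writing checkpoint data to HDFS is undesirable, localCheckpoint() keeps the materialized data on the executors instead, at the cost of losing it if an executor is lost.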