[PySpark][java.lang.StackOverflowError on df.write.csv]


Question


I'm using PySpark version 3.1.1 on a Kubernetes cluster.
I'm trying to write a Spark dataframe to HDFS in CSV format. The dataframe has about 2.4 million rows and 130 columns, and has 5 partitions. If I reduce the amount of data, the code works fine.
Following is the code snippet:

engineered_df.cache()
engineered_df.count()  # action that materializes the cached DataFrame
engineered_df = engineered_df.repartition(5)
engineered_df.write.csv(CommonConstants.HDFS_ENGINEERED_DATA_PATH, mode="overwrite")

And this is the stack trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o3543180.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:477)
	at sun.reflect.GeneratedMethodAccessor201.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	...

I have tried increasing the number of partitions.

Answer 1

Score: 1

It seems this problem occurs when your DAG grows quickly and there are too many levels of transformations in your code. Because the DAG is so long, the JVM cannot hold all of the accumulated operations for lazy execution when an action is finally performed, and serializing the task overflows the stack. Try checkpointing:

spark.sparkContext.setCheckpointDir("./checkpoint")
engineered_df = engineered_df.checkpoint()  # checkpoint() returns a new DataFrame with a truncated lineage, so reassign it
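
A minimal, self-contained sketch of how checkpointing could be wired in before the CSV write. The checkpoint directory, the stand-in DataFrame, and the output path below are assumptions for illustration; in the original code you would keep engineered_df and CommonConstants.HDFS_ENGINEERED_DATA_PATH:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-before-write").getOrCreate()

# On a cluster the checkpoint directory should live on a shared filesystem
# such as HDFS (this path is an assumption, not from the original post).
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

# Stand-in for the post's engineered_df, created only so this sketch runs end to end.
engineered_df = spark.range(1_000_000).withColumnRenamed("id", "feature_0")

# checkpoint() materializes the data and truncates the lineage, so the write task
# no longer has to serialize the long chain of transformations that overflowed the stack.
engineered_df = engineered_df.checkpoint()

# Same write call as in the question; substitute CommonConstants.HDFS_ENGINEERED_DATA_PATH here.
engineered_df.write.csv("hdfs:///tmp/engineered_data_csv", mode="overwrite")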
