Spark executor OOM while joining very small dataset (non-zero exit code 143)
Question
I am doing a left outer join on a small dataset (41 MB total). My executor memory is 10 GB. When I do not use any in-memory persistence on the executors, the job works fine. But when I use cache() or persist(StorageLevel.MEMORY_AND_DISK()), the job consistently fails with the exception below (it works again when I use persist(StorageLevel.DISK_ONLY())):
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 42447"...
23/06/14 19:38:57 WARN TaskMemoryManager: Failed to allocate a page (64102862 bytes), try again.
23/06/14 19:38:57 INFO TaskMemoryManager: Memory used in task 4
23/06/14 19:38:57 INFO TaskMemoryManager: 64102862 bytes of memory were used by task 4 but are not associated with specific consumers
23/06/14 19:38:57 INFO TaskMemoryManager: 5500051918 bytes of memory are used for execution and 38002226 bytes of memory are used for storage
23/06/14 19:38:57 WARN BlockManager: Block rdd_27_0 could not be removed as it was not found on disk or in memory
and the executor is killed with:
Diagnostics: [2023-05-28 16:24:44.080] Container killed on request. Exit code is 143
[2023-05-28 16:24:44.080] Container exited with a non-zero exit code 143.
(The original post attached Spark UI screenshots of the successful run, the failing run, and the Storage tab; the images are not reproduced here.)
Code:
Dataset<Row> currentDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation");
currentDataSet.createOrReplaceTempView(CURRENT_USER_VIEW);
currentDataSet.cache(); // <- this causes the issue

Dataset<Row> previousDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation2");
previousDataSet.createOrReplaceTempView(PREVIOUS_USER_VIEW);
previousDataSet.cache(); // <- this causes the issue

String currentRunColumn = "c.".concat("userid");
String previousRunColumn = "p.".concat("userid");

// Left join current against previous and keep rows with no match,
// i.e. the user ids added since the previous run.
Dataset<Row> addedRecordDataSets = sparkSession.sql("SELECT " + currentRunColumn + " FROM " +
        CURRENT_USER_VIEW + " AS c " +
        " LEFT JOIN " + PREVIOUS_USER_VIEW + " AS p " +
        " ON " + currentRunColumn + " == " + previousRunColumn +
        " WHERE " + previousRunColumn + " IS NULL ");

addedRecordDataSets.coalesce(1).persist(StorageLevel.DISK_ONLY()).foreachPartition(persist());
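Incidentally, the LEFT JOIN plus `IS NULL` filter above is a left anti join (Spark exposes this directly as the `"left_anti"` join type on `Dataset.join`). Stripped of Spark, the query computes the current user ids that have no match in the previous run. A minimal plain-Java sketch of that semantics (the class name and sample ids are illustrative, not from the original post):

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java illustration of what the SQL above computes: a left anti join,
// i.e. the current user ids with no matching row in the previous run.
public class AntiJoinSketch {
    static List<String> addedRecords(List<String> current, List<String> previous) {
        Set<String> prev = new HashSet<>(previous);    // hash side of the join
        return current.stream()
                      .filter(id -> !prev.contains(id)) // keep ids absent from previous
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(addedRecords(
                Arrays.asList("u1", "u2", "u3"),
                Arrays.asList("u2")));                  // prints [u1, u3]
    }
}
```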
As per my understanding of the logs above, at the time of the exception execution memory is around 5 GB, and the task fails while trying to allocate more. But can someone please help me understand how 41 MB of data can result in 5 GB of execution memory? Is there a way to estimate the memory required for execution? I have gone through this and this (two amazing resources).
Update
Even though the data size is very small, peak execution memory is 48.1 GB. Yet the job does not die even when the executor size is only 3 GB. But when I add an in-memory cache, even a very small one, the job dies with OOM (even with a 10 GB executor, hence ~5 GB of execution memory). I am suspecting some bug with respect to evicting the cache? I am using Spark 2.4.x.
Answer 1
Score: 1
The root cause of the issue in our case was using .cache(); when I changed .cache() to .persist(StorageLevel.DISK_ONLY()) or removed it altogether, I did not face the issue.
Upon digging deeper, this issue seems to be related to
https://issues.apache.org/jira/browse/SPARK-21492
So:
a. Updating to Spark 2.4.5 or 3.0.0 could fix the issue.
b. Remove .cache() on the datasets.
UPDATE
I checked in Spark 3.1.5 and no longer see the issue.