Spark executor OOM while joining very small dataset (non-zero exit code 143)

huangapple go评论119阅读模式
英文:

Spark executor OOM while joining very small dataset (non-zero exit code 143)

问题

我在一个小数据集(总共41MB)上执行左外连接。我的执行器内存为10GB。当我在执行器上不使用任何内存持久性时,作业可以正常运行。但是当我使用Cache()persist(StorageLevel.MEMORY_AND_DISK())时,作业会持续失败,并显示以下异常(但当我使用persist(StorageLevel.DISK_ONLY())时可以正常工作):

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 42447"...
23/06/14 19:38:57 WARN TaskMemoryManager: Failed to allocate a page (64102862 bytes), try again.
23/06/14 19:38:57 INFO TaskMemoryManager: Memory used in task 4
23/06/14 19:38:57 INFO TaskMemoryManager: 64102862 bytes of memory were used by task 4 but are not associated with specific consumers
23/06/14 19:38:57 INFO TaskMemoryManager: 5500051918 bytes of memory are used for execution and 38002226 bytes of memory are used for storage
23/06/14 19:38:57 WARN BlockManager: Block rdd_27_0 could not be removed as it was not found on disk or in memory

并且执行器被杀死,并显示以下信息:

Diagnostics: [2023-05-28 16:24:44.080]Container killed on request. Exit code is 143\n[2023-05-28 16:24:44.080]Container exited with a non-zero exit code 143.

以下是作业正常工作时的截图。
Spark executor OOM while joining very small dataset (non-zero exit code 143)

以下是作业失败时的截图。
Spark executor OOM while joining very small dataset (non-zero exit code 143)

以下是存储的截图。
Spark executor OOM while joining very small dataset (non-zero exit code 143)

代码部分如下:

Dataset<Row> currentDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation");
currentDataSet.createOrReplaceTempView(CURRENT_USER_VIEW);
currentDataSet.cache() //<- this causes the issue
Dataset<Row> previousDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation2);
previousDataSet.createOrReplaceTempView(PREVIOUS_USER_VIEW);
previousDataSet.cache() //<- this causes the issue

String currentRunColumn = "c.".concat("userid");
String previousRunColumn = "p.".concat("userid");

Dataset<Row> addedRecordDataSets = sparkSession.sql("SELECT " + currentRunColumn + " FROM " +
        CURRENT_USER_VIEW + " AS c " +
        " LEFT JOIN " + PREVIOUS_USER_VIEW + " AS p " +
        " ON " + currentRunColumn + " == " + previousRunColumn +
        " WHERE " + previousRunColumn + " IS NULL ");

dataSet.coalesce(1).persist(StorageLevel.DISK_ONLY()).foreachPartition(persist());

根据我对上述日志的理解,在发生异常时,执行内存约为5GB,并且在尝试获取更多分配时失败,从而导致任务失败。但是,有人可以帮助我理解为什么41MB的数据会导致5GB的执行内存吗?有没有一种估算执行所需内存的方法?我已经查阅了两个优秀的资源。

更新
尽管数据大小非常小,但峰值执行内存为48.1 GB。但是,即使执行器大小只有3GB,作业也不会中断。但是,当我添加内存缓存时,即使非常小,作业也会因OOM而中断(即使执行器大小为10GB,因此执行大小约为5GB),我怀疑是否存在有关清除缓存的错误?我正在使用Spark的2.4.x版本。

英文:

I am doing left outer join on a small data set(total 41 mb). My executor memory is 10GB. When i am not using any in memory persistence on executor the job works fine. But when i use Cache() or persist(StorageLevel.MEMORY_AND_DISK()) the job consistently fails with below exception (but works when i use persist(StorageLevel.DISK_ONLY()))

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 42447"...
23/06/14 19:38:57 WARN TaskMemoryManager: Failed to allocate a page (64102862 bytes), try again.
23/06/14 19:38:57 INFO TaskMemoryManager: Memory used in task 4
23/06/14 19:38:57 INFO TaskMemoryManager: 64102862 bytes of memory were used by task 4 but are not associated with specific consumers
23/06/14 19:38:57 INFO TaskMemoryManager: 5500051918 bytes of memory are used for execution and 38002226 bytes of memory are used for storage
23/06/14 19:38:57 WARN BlockManager: Block rdd_27_0 could not be removed as it was not found on disk or in memory

and executor is killed with

Diagnostics: [2023-05-28 16:24:44.080]Container killed on request. Exit code is 143\n[2023-05-28 16:24:44.080]Container exited with a non-zero exit code 143.

Below is the screenshot when job works fine.
Spark executor OOM while joining very small dataset (non-zero exit code 143)
Below is the screenshot when job fails
Spark executor OOM while joining very small dataset (non-zero exit code 143)
below is the screenshot for storage
Spark executor OOM while joining very small dataset (non-zero exit code 143)

code

Dataset<Row> currentDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation");
currentDataSet.createOrReplaceTempView(CURRENT_USER_VIEW);
currentDataSet.cache() //<- this causes the issue
Dataset<Row> previousDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation2);
previousDataSet.createOrReplaceTempView(PREVIOUS_USER_VIEW);
previousDataSet.cache() //<- this causes the issue

String currentRunColumn = "c.".concat("userid");
String previousRunColumn = "p.".concat("userid");

Dataset<Row> addedRecordDataSets = sparkSession.sql("SELECT " + currentRunColumn + " FROM " +
        CURRENT_USER_VIEW + " AS c " +
        " LEFT JOIN " + PREVIOUS_USER_VIEW + " AS p " +
        " ON " + currentRunColumn + " == " + previousRunColumn +
        " WHERE " + previousRunColumn + " IS NULL ");

dataSet.coalesce(1).persist(StorageLevel.DISK_ONLY()).foreachPartition(persist());

As par my understanding of above logs, at the time of exception execution memory is around 5gb and while it is trying to get more allocation it fails and the task fails. But can someone please help me understand how 41mb data can result into 5gb of execution memory? Is there a way to estimate the memory required for execution? I have gone through this and this amazing resources.

Update
Even though the data size is very small, peak execution memory is 48.1 GB. But the job does not die even when executor size is only 3gb. but when i add inmemory cache even though very small, the job dies with OOM (even when executor size is 10gb, hence execution size ~5gb), i am suspecting some bug wrt evicting cache? i am using 2.4.x of spark.

答案1

得分: 1

问题的根本原因在我们的情况下是使用了.cache(),当我将.cache更改为.persist(Disk)或完全删除它时,我不再遇到这个问题。

在更深入的调查中,这个问题似乎与 https://issues.apache.org/jira/browse/SPARK-21492 有关。

所以,

a. 更新到 Spark 2.4.5 或 3.0.0 可能会修复这个问题。

b. 在数据集上移除 .cache()。

更新

我在 Spark 3.1.5 中进行了检查,不再遇到这个问题。

英文:

The root cause of the issue in our case was using .cache(), when i changed the .cache to .persist(Disk) or removed it all together, i did not face the issue.

Upon digging deeper this issues seems to be related to

https://issues.apache.org/jira/browse/SPARK-21492

So

a. updating to spark 2.4.5 or 3.0.0 could fix the issue.

b. remove .cache() on dataset.

UPDATE

i checked in spark 3.1.5, and no longer seeing the issue.

huangapple
  • 本文由 发表于 2023年6月15日 00:56:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/76475916.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定