Tuning while loops in pyspark (persisting or caching dataframes in a loop)
Question
I am writing a PySpark implementation of an algorithm that is iterative in nature. Part of the algorithm involves iterating a strategy until no more improvements can be made (i.e., a local maximum has been greedily reached).
The function `optimize` returns a three-column dataframe that looks as follows:
| id | current_value | best_value |
|----|---------------|------------|
| 0  | 1             | 1          |
| 1  | 0             | 1          |
This function is used in a while loop until `current_value` and `best_value` are identical (meaning that no more optimizations can be made).
```python
# Init while loop
iterate = True

# Start iterating until optimization yields same result as before
while iterate:
    # Create (or overwrite) `df`
    df = optimizeAll(df2)  # Uses `df2` as input
    df.persist().count()

    # Check stopping condition
    iterate = df.where('current_value != best_value').count() > 0

    # Update `df2` with latest results
    if iterate:
        df2 = df2.join(other=df, on='id', how='left')  # <- Should I persist this?
```
This function runs very quickly when I pass it the inputs manually. However, I have noticed that the time it takes for the function to run increases exponentially as it iterates. That is, the first iteration runs in milliseconds, the second one in seconds and eventually it takes up to 10 minutes per pass.
This question suggests that if `df` isn't cached, the while loop will start running from scratch on every iteration. Is this true?
If so, which objects should I persist? I know that persisting `df` will be triggered by the `count` when defining `iterate`. However, `df2` has no action on it, so even if I persist it, will the while loop still start from scratch every time? Likewise, should I unpersist either table at some point in the loop?
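For illustration, one way to confirm that the lineage keeps growing is to print the plan of `df2` at the end of each iteration and watch it accumulate the joins from every previous pass:

```python
# Diagnostic only: the printed plan grows with every pass if nothing is
# persisted or checkpointed between iterations.
df2.explain()
```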
Answer 1

Score: 1
Both `df` and `df2` should be persisted in your case, if your resources are sufficient.

In your example above, `df2` is not persisted, so only its lineage information is kept. Each time you call `df.persist().count()`, Spark has to rebuild `df2` by replaying every join all the way back to the `df2` of the first iteration, which is obviously not an efficient approach. Even if you can persist only one of `df` and `df2`, you should persist `df2` first; in the example you provided, that should give you a more consistent run time.
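As a rough sketch of what that could look like, assuming the same `optimizeAll` function and initial `df2` as in the question (the `unpersist` calls are an extra suggestion to release stale copies, not something required for correctness):

```python
iterate = True
while iterate:
    # Materialize the result of this pass
    df = optimizeAll(df2)
    df.persist().count()

    # Stopping condition
    iterate = df.where('current_value != best_value').count() > 0

    if iterate:
        previous_df2 = df2                             # keep a handle to the old copy
        df2 = df2.join(other=df, on='id', how='left')
        df2.persist().count()                          # materialize `df2` before the next pass
        previous_df2.unpersist()                       # release the stale copy
        df.unpersist()                                 # its result now lives in `df2`
```

The `count()` calls are only there to force materialization; any full action would do.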
Besides, I'm not sure which Spark version you are using. If you're on Spark >= 3.0.0, you should enable `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled`, since AQE will optimize the query plan (https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution and https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html). If you're on Spark < 3.0.0, check the join strategy and data skew in your logs when you want to optimize the joining and grouping strategy.
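For reference, a minimal sketch of where those settings could be applied when building the session (the application name is just a placeholder):

```python
from pyspark.sql import SparkSession

# Enable AQE and partition coalescing when creating the session (Spark >= 3.0.0).
spark = (
    SparkSession.builder
    .appName("iterative-optimization")  # placeholder name
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)
```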