在PySpark中调优while循环(在循环中持久化或缓存数据框)。

huangapple go评论71阅读模式
英文:

Tuning while loops in pyspark (persisting or caching dataframes in a loop)

问题

我正在编写一个PySpark实现的迭代算法。算法的一部分涉及到迭代一个策略,直到不能再进行改进为止(即,贪婪地达到了局部最大值)。

optimize 函数返回一个包含三列的DataFrame,如下所示:

id current_value best_value
0 1 1
1 0 1

这个函数在一个while循环中使用,直到 current_valuebest_value 相同时(表示不能再进行优化为止)。

# 初始化while循环
iterate = True

# 开始迭代,直到优化不再改进
while iterate:

    # 创建(或覆盖)`df`
    df = optimizeAll(df2) # 使用 `df2` 作为输入
    df.persist().count()

    # 检查停止条件
    iterate = df.where('current_value != best_value').count() > 0

    # 使用最新的结果更新 `df2`
    if iterate:
        df2 = df2.join(other=df, on='id', how='left') #  <-- 我应该对此进行持久化吗?

当我手动传递输入时,这个函数运行得非常快。然而,我注意到随着迭代次数的增加,函数运行所需的时间呈指数增长。也就是说,第一次迭代在毫秒内完成,第二次在秒内完成,最终每次迭代需要高达10分钟的时间。

这个问题 暗示如果不对 df 进行缓存,while循环将在每次迭代时从头开始运行。这是真的吗?

如果是这样的话,我应该持久化哪些对象?我知道在定义 iterate 时,df 的持久化会被 count 触发。但是,df2 没有任何操作,所以即使我持久化它,是否会导致while循环每次都从头开始运行?同样地,我是否应该在循环中的某个时候取消持久化其中一个表格?

英文:

I am writing a PySpark implementation of an algorithm that is iterative in nature. Part of the algorithm involves iterating a strategy until no more improvements can be made (i.e., a local maximum has been greedily reached).

The function optimize returns a three-column dataframe that looks as follows:

id current_value best_value
0 1 1
1 0 1

This function is used in a while loop until current_value and best_value are identical (meaning that no more optimizations can be made).

# Init while loop
iterate = True

# Start iterating until optimization yields same result as before
while iterate:

    # Create (or overwrite) `df`
    df = optimizeAll(df2) # Uses `df2` as input
    df.persist().count()

    # Check stopping condition
    iterate = df.where(&#39;current_value != best_value&#39;).count() &gt; 0

    # Update `df2` with latest results
    if iterate:
        df2 = df2.join(other=df, on=&#39;id&#39;, how=&#39;left&#39;) #&#160;&lt;- Should I persist this?

This function runs very quickly when I pass it the inputs manually. However, I have noticed that the time it takes for the function to run increases exponentially as it iterates. That is, the first iteration runs in milliseconds, the second one in seconds and eventually it takes up to 10 minutes per pass.

This question suggests that if df isn't cached, the while loop will start running from scratch on every iteration. Is this true?

If so, which objects should I persist? I know that persisting df will be triggered by the count when defining iterate. However, df2 has no action, so even if I persist it, will it make the while loop start from scratch every time? Likewise, should I unpersist either table at some point in the loop?

答案1

得分: 1

dfdf2 在您的情况下都应该被持久化。在您上面的示例中,由于 df2 没有被持久化,每次调用 df.persist().count() 时,由于 df2 没有被持久化,只保留了谱系信息,它将从第一个迭代中的第一个 df2 开始连接,这显然不是一种高效的方法。即使您只能持久化 dfdf2 中的一个,您应该首先持久化 df2,这样在您提供的示例中会获得更一致的运行时间。

此外,不确定您使用的是哪个 Spark 版本,如果您使用的是 Spark >= 3.0.0,您应该启用 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled,因为 AQE 将优化查询计划(https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution 和 https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)。如果您使用的是 Spark < 3.0.0,请在要优化连接和分组策略时,检查日志中的连接策略和数据倾斜条件。

英文:

Both df and df2 should be persisted in your case if your resources are sufficient.

In your above example, as the df2 is not persisted, when you call df.persist().count() every time, since df2 is not persisted and only lineage information left, it will start joining from the first df2 in the 1st iteration, which is obviously not an efficient approach. Even you can persist either df or df2 only, you should persist df2 first in your example provided. It should give you a more consistent run time.

Beside, not sure which Spark version are you using, if you're using Spark >= 3.0.0, you should enable spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled since AQE will optimize the query plan (https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution and https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html). If you're using Spark < 3.0.0, you should check the joining strategy and data skew condition in your log when you want to optimize the joining and grouping strategy.

huangapple
  • 本文由 发表于 2023年6月8日 22:11:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76432752.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定