可以检查一个DataFrame是否是在Spark中仅应用了宽变换或窄变换的结果吗?

huangapple go评论61阅读模式
英文:

Is it possible to check if a Dataframe is a result of the application of only wide or narrow transformations in Spark?

问题

如何在 Apache Spark 的 Scala 接口中检查给定的 DataFrame 是否仅是窄转换的结果?例如,考虑下面的代码片段:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val df: DataFrame = Seq(
  ("id-A", 1, 3),
  ("id-A", 5, 8),
  ("id-B", 1, 5),
  ("id-C", 3, 3),
  ("id-C", 2, 1)
)
.toDF("my_id", "col_1", "col_2")

val dfWithNarrowTransformations: DataFrame = df
.filter('my_id === "id-A")
.withColumn("col_3", 'col_1 + 'col_2)

val dfWithWideTransformations: DataFrame = df
.groupBy("my_id")
.agg(sum('col_1) as "col_4")

def hasWideTransformations(df: DataFrame): Boolean = ???

assert(hasWideTransformations(dfWithNarrowTransformations) == false)
assert(hasWideTransformations(dfWithWideTransformations) == true)

是否可以实现一个名为 hasWideTransformations 的函数,用于检查是否应用了广泛转换,无论转换的数量如何?

为了提供更多背景信息,我对此感兴趣是为了启用一个受限用户界面,仅支持在流模式中没有状态的转换的执行,以便可以在流和批处理模式下轻松一致地应用它们。

英文:

How could I check in the Scala interface of Apache Spark if a given DataFrame is the result of only narrow transformations? For example, consider the code snippet below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val df: DataFrame = Seq(
  ("id-A", 1, 3),
  ("id-A", 5, 8),
  ("id-B", 1, 5),
  ("id-C", 3, 3),
  ("id-C", 2, 1),
)
.toDF("my_id", "col_1", "col_2")

val dfWithNarrowTransformations: DataFrame = df
.filter('my_id === "id-A")
.withColumn("col_3", 'col_1 + 'col_2)

val dfWithWideTransformations: DataFrame = df
.groupBy("my_id")
.agg(sum('col_1) as "col_4")

def hasWideTransformations(df: DataFrame): Boolean = ???

assert(hasWideTransformations(dfWithNarrowTransformations) == false)
assert(hasWideTransformations(dfWithWideTransformations) == true)

Would it be possible to implement a function hasWideTransformations that checks if wide transformations were applied regardless of the number of transformations?

To give more context, I'm interested in doing this to enable a constrained user interface supporting only the execution of transformations that have no state in the streaming mode such that they can be easily and consistently applied in both streaming and batch modes.

答案1

得分: 2

你可以获取物理执行计划并检查该计划是否包含字符串“Exchange”。如果计划中包含Exchange,意味着存在洗牌操作,因此你有一个宽依赖转换。

要将物理执行计划作为字符串检索,可以使用内部数据集类字段queryExecution,以及其方法explainString。因此,你的hasWideTransformations方法可以编写如下:

def hasWideTransformations(df: DataFrame): Boolean = df.queryExecution
  .explainString(ExplainMode.fromString("simple"))
  .contains("Exchange")

然而,queryExecution字段是内部的Spark API,并被标记为不稳定,因此在将来的Spark版本中可能会更改。

英文:

You can retrieve the physical plan and check whether the plan contains the string Exchange. If the plan contains exchange, it means that there is a shuffle thus you have a wide transformation.

To retrieve physical plan as a string, you can use inner dataset class field queryExecution, with its method explainString. Thus your method hasWideTransformations can be written as follows:

def hasWideTransformations(df: DataFrame): Boolean = df.queryExecution
  .explainString(ExplainMode.fromString("simple"))
  .contains("Exchange")

However, queryExecution field is internal spark's API and is tagged as unstable, so it might change in future version of Spark.

huangapple
  • 本文由 发表于 2023年6月1日 04:41:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/76377128.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定