Is it possible to check if a Dataframe is a result of the application of only wide or narrow transformations in Spark?
Question
How could I check in the Scala interface of Apache Spark if a given DataFrame is the result of only narrow transformations? For example, consider the code snippet below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
// assumes a SparkSession named `spark` is in scope (e.g. in spark-shell);
// its implicits provide .toDF and the 'symbol column syntax
import spark.implicits._

val df: DataFrame = Seq(
  ("id-A", 1, 3),
  ("id-A", 5, 8),
  ("id-B", 1, 5),
  ("id-C", 3, 3),
  ("id-C", 2, 1)
).toDF("my_id", "col_1", "col_2")

// narrow transformations only: filter and withColumn operate within each partition
val dfWithNarrowTransformations: DataFrame = df
  .filter('my_id === "id-A")
  .withColumn("col_3", 'col_1 + 'col_2)

// wide transformation: groupBy/agg requires a shuffle across partitions
val dfWithWideTransformations: DataFrame = df
  .groupBy("my_id")
  .agg(sum('col_1) as "col_4")

def hasWideTransformations(df: DataFrame): Boolean = ???

assert(hasWideTransformations(dfWithNarrowTransformations) == false)
assert(hasWideTransformations(dfWithWideTransformations) == true)
Would it be possible to implement a function hasWideTransformations that checks whether wide transformations were applied, regardless of the number of transformations?
To give more context: I'm interested in this in order to offer a constrained user interface that supports only transformations that are stateless in streaming mode, so that they can be applied easily and consistently in both streaming and batch modes.
Answer 1
Score: 2
You can retrieve the physical plan and check whether it contains the string Exchange. If the plan contains an Exchange, it means there is a shuffle and therefore a wide transformation.
To retrieve the physical plan as a string, you can use the Dataset's internal field queryExecution and its method explainString. Your hasWideTransformations method can then be written as follows:
import org.apache.spark.sql.execution.ExplainMode

def hasWideTransformations(df: DataFrame): Boolean = df.queryExecution
  .explainString(ExplainMode.fromString("simple")) // render the plan as text, including the physical plan
  .contains("Exchange")                            // an Exchange operator marks a shuffle
However, the queryExecution field is part of Spark's internal API and is tagged as unstable, so it might change in a future version of Spark.
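An alternative sketch (not from the original answer) is to look for shuffle-inducing operators directly in the analyzed logical plan instead of matching a string in the explain output. This is only a heuristic, for example a broadcast join would be flagged even though it does not actually shuffle, and it still relies on internal Catalyst classes:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, Join, Sort}

// Heuristic: treat the DataFrame as "wide" if its analyzed logical plan contains
// operators that normally require a shuffle. This uses internal Catalyst classes,
// so it is no more stable than queryExecution itself.
def looksWide(df: DataFrame): Boolean =
  df.queryExecution.analyzed.collect {
    case _: Aggregate | _: Distinct | _: Join | _: Sort => ()
  }.nonEmpty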