

Is it possible to check whether a DataFrame is the result of applying only wide or only narrow transformations in Spark?


How can I check, in Apache Spark's Scala API, whether a given DataFrame is the result of only narrow transformations? For example, consider the code snippet below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark (as in spark-shell); needed for toDF and 'col syntax

val df: DataFrame = Seq(
  ("id-A", 1, 3),
  ("id-A", 5, 8),
  ("id-B", 1, 5),
  ("id-C", 3, 3),
  ("id-C", 2, 1)
).toDF("my_id", "col_1", "col_2")

val dfWithNarrowTransformations: DataFrame = df
  .filter('my_id === "id-A")
  .withColumn("col_3", 'col_1 + 'col_2)

val dfWithWideTransformations: DataFrame = df
  .agg(sum('col_1) as "col_4")

def hasWideTransformations(df: DataFrame): Boolean = ???

assert(hasWideTransformations(dfWithNarrowTransformations) == false)
assert(hasWideTransformations(dfWithWideTransformations) == true)

Would it be possible to implement a function hasWideTransformations that checks whether any wide transformation was applied, regardless of the number of transformations?

To give more context: I want to build a constrained user interface that only supports transformations that are stateless in streaming mode, so that they can be applied easily and consistently in both streaming and batch modes.


Score: 2





You can retrieve the physical plan and check whether it contains the string "Exchange". If the plan contains an Exchange node, there is a shuffle, and therefore a wide transformation was applied.

To retrieve the physical plan as a string, you can use the Dataset's internal field queryExecution and its method explainString. Your method hasWideTransformations can then be written as follows:

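import org.apache.spark.sql.execution.SimpleMode // explain mode that prints only the physical plan (Spark 3.0+)
def hasWideTransformations(df: DataFrame): Boolean = df.queryExecution.explainString(SimpleMode).contains("Exchange")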
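Matching on the string "Exchange" also fires if, for example, a column or table name happens to contain that word. A minimal sketch that instead walks the physical plan and matches on node types is shown below. It assumes classic (non-Connect) Spark 3.x and relies on internal classes such as AdaptiveSparkPlanExec and ShuffleQueryStageExec; the object name PlanInspection is hypothetical. Like the string check, it also counts broadcast exchanges (e.g. from broadcast joins) as wide.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

// Hypothetical helper; depends on Spark 3.x internal (unstable) physical plan classes.
object PlanInspection {
  // Recursively looks for a node that moves data between partitions.
  private def containsExchange(plan: SparkPlan): Boolean = plan match {
    // With adaptive query execution (AQE) the root is a leaf wrapper; inspect its current plan.
    case aqe: AdaptiveSparkPlanExec => containsExchange(aqe.executedPlan)
    // Shuffle or broadcast exchange inserted by the planner.
    case _: Exchange => true
    // After AQE has run, exchanges are wrapped in query stages; reused exchanges are leaves.
    case _: ShuffleQueryStageExec | _: BroadcastQueryStageExec | _: ReusedExchangeExec => true
    case other => other.children.exists(containsExchange)
  }

  def hasWideTransformations(df: DataFrame): Boolean =
    containsExchange(df.queryExecution.executedPlan)
}
```

Because it inspects node types rather than text, the check does not depend on how plan nodes are rendered in explain output, although it is still tied to internal class names.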

However, the queryExecution field is part of Spark's internal API and is annotated as unstable, so it might change in future versions of Spark.
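If you would rather not call queryExecution directly, one possible workaround is to capture what the public df.explain() method prints. This sketch assumes classic Spark 3.x, where Dataset.explain() writes the physical plan with Scala's println, so Console.withOut can redirect it into a buffer. The format of that output is not a stable contract either, and the name ExplainCapture is hypothetical.

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.DataFrame

// Hypothetical helper built only on the public Dataset.explain() method.
object ExplainCapture {
  // Redirects what df.explain() prints on the current thread and returns it as a String.
  def physicalPlanString(df: DataFrame): String = {
    val buffer = new ByteArrayOutputStream()
    Console.withOut(buffer) {
      df.explain() // prints "== Physical Plan ==" followed by the plan
    }
    buffer.toString("UTF-8")
  }

  // Same heuristic as above: an Exchange node in the physical plan signals a shuffle.
  def hasWideTransformations(df: DataFrame): Boolean =
    physicalPlanString(df).contains("Exchange")
}
```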

  • Posted on June 1, 2023 04:41:50
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76377128.html



