Apache Spark:重新分区、排序和缓存对连接的影响。

huangapple go评论69阅读模式
英文:

Apache Spark: impact of repartitioning, sorting and caching on a join

问题

我正在探索Spark在将表与自身连接时的行为。我正在使用Databricks。

我的虚拟场景如下:

  1. 将外部表读取为DataFrame A(底层文件以delta格式存储)。

  2. 将DataFrame B定义为DataFrame A,仅选择特定列。

  3. 在列1和列2上连接DataFrame A和DataFrame B。

(是的,这没有太多意义,我只是在实验以理解Spark的底层机制)

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5"))))

b = a.select("column1", "column2", "columnA")

c = a.join(b, how="left", on=["column1", "column2"])

我的第一次尝试是按原样运行代码(尝试1)。然后我尝试重新分区和缓存(尝试2)。

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")).cache()

最后,我重新分区、排序并缓存。

a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

生成的相应DAG如附图所示。

我的问题是:

  1. 为什么在尝试1中,表似乎已被缓存,即使缓存没有明确指定。

  2. 为什么InMemoryTableScan总是后跟另一个相同类型的节点。

  3. 为什么在尝试3中,缓存似乎在两个阶段发生?

  4. 为什么在尝试3中,WholeStageCodegen只跟随一个(仅一个)InMemoryTableScan。

Apache Spark:重新分区、排序和缓存对连接的影响。

Apache Spark:重新分区、排序和缓存对连接的影响。

Apache Spark:重新分区、排序和缓存对连接的影响。

英文:

I am exploring Spark's behavior when joining a table to itself. I am using Databricks.

My dummy scenario is:

  1. Read an external table as dataframe A (underlying files are in delta format)

  2. Define dataframe B as dataframe A with only certain columns selected

  3. Join dataframes A and B on column1 and column2

(Yes, it doesn't make much sense, I'm just experimenting to understand Spark's underlying mechanics)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

My first attempt was to run the code as it is (attempt 1). I then tried to repartition and cache (attempt 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Finally, I repartitioned, sorted and cached

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

The respective dags generated are as attached.

My questions are:

  1. Why in attempt 1 the table appears to be cached even though caching has not been explicitly specified.

  2. Why InMemoreTableScan is always followed by another node of this type.

  3. Why in attempt 3 caching appears to take place on two stages?

  4. Why in attempt 3 WholeStageCodegen follows one (and only one) InMemoreTableScan.

Apache Spark:重新分区、排序和缓存对连接的影响。

Apache Spark:重新分区、排序和缓存对连接的影响。

Apache Spark:重新分区、排序和缓存对连接的影响。

答案1

得分: 5

以下是已翻译的部分:

在这三个计划中,您所观察到的是DataBricks运行时和Spark的混合。

首先,在运行Databricks runtime 3.3+时,所有parquet文件的缓存都会自动启用。
相应的配置为:
spark.databricks.io.cache.enabled true

对于您的第二个查询,<i>InMemoryTableScan</i> 发生两次,因为在调用join时,Spark尝试并行计算Dataset A和Dataset B。假设不同的执行器被分配了上述任务,它们都必须从(Databricks)缓存中扫描表格。

对于第三个查询,<i>InMemoryTableScan</i> 本身并不涉及缓存。它只是表示catalyst形成的计划涉及多次扫描缓存表格。

PS:我无法理解第4点的含义 Apache Spark:重新分区、排序和缓存对连接的影响。

英文:

What you are observing in these 3 plans is a mixture of DataBricks runtime and Spark.

First of all, while running Databricks runtime 3.3+, caching is automatically enabled for all parquet files.
Corresponding config for that:
spark.databricks.io.cache.enabled true

For your second query, <i>InMemoryTableScan</i> is happening twice because right when join was called, spark tried to compute Dataset A and Dataset B in parallel. Assuming different executors got assigned the above tasks, both will have to scan the table from (Databricks) cache.

For the third one, <i>InMemoryTableScan</i> does not refer to caching in itself. It just means that whatever plan catalyst formed involved scanning the cached table multiple times.

PS: I can't visualize the point 4 Apache Spark:重新分区、排序和缓存对连接的影响。

huangapple
  • 本文由 发表于 2020年1月3日 18:28:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/59576929.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定