Apache Spark: impact of repartitioning, sorting and caching on a join
Question
I am exploring Spark's behavior when joining a table to itself. I am using Databricks.
My dummy scenario is:
- Read an external table as dataframe A (the underlying files are in Delta format)
- Define dataframe B as dataframe A with only certain columns selected
- Join dataframes A and B on column1 and column2
(Yes, it doesn't make much sense, I'm just experimenting to understand Spark's underlying mechanics)
from pyspark.sql.functions import col, concat, lit, lower

# DataFrame A: the external Delta table plus a derived column
# (column5 is selected as well, since columnA is built from it)
a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5"))))
b = a.select("column1", "column2", "columnA")          # DataFrame B: a projection of A
c = a.join(b, how="left", on=["column1", "column2"])   # left self-join on the two key columns
My first attempt was to run the code as it is (attempt 1). I then tried to repartition and cache (attempt 2):
a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")) \
    .cache()
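(Side note: one quick way to confirm that the explicit cache actually materializes is to trigger an action and then inspect the DataFrame's storage level. A minimal sketch using standard PySpark APIs:)

a.count()              # an action forces the cached data to be materialized
print(a.storageLevel)  # reports a non-default storage level once the DataFrame is cached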
Finally, I repartitioned, sorted and cached:
a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4", "column5") \
    .withColumn("columnA", lower(concat(col("column4"), lit("_"), col("column5")))) \
    .repartition(col("column1"), col("column2")) \
    .sortWithinPartitions(col("column1"), col("column2")) \
    .cache()
The respective DAGs generated are attached.
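(For reference, the same plans can also be inspected as text instead of screenshots; a minimal sketch, assuming Spark 3.x so that explain accepts a mode argument:)

c.explain(mode="formatted")   # cached data shows up as InMemoryRelation / InMemoryTableScan nodes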
My questions are:
- Why, in attempt 1, does the table appear to be cached even though caching has not been explicitly specified?
- Why is InMemoryTableScan always followed by another node of the same type?
- Why, in attempt 3, does caching appear to take place in two stages?
- Why, in attempt 3, does WholeStageCodegen follow one (and only one) InMemoryTableScan?
Answer 1
Score: 5
What you are observing in these three plans is a mixture of the Databricks runtime and Spark itself.
First of all, when running Databricks Runtime 3.3+, caching is automatically enabled for all Parquet files.
Corresponding config for that:
spark.databricks.io.cache.enabled true
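(A minimal sketch of checking or overriding that flag from a notebook, assuming a Databricks cluster where this Databricks-specific setting exists:)

# Inspect the current value of the Databricks IO cache flag
print(spark.conf.get("spark.databricks.io.cache.enabled", "not set"))
# Turn it off for this session to compare the plans without the IO cache
spark.conf.set("spark.databricks.io.cache.enabled", "false")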
For your second query, InMemoryTableScan happens twice because, right when the join is called, Spark tries to compute Dataset A and Dataset B in parallel. Assuming different executors are assigned those tasks, both have to scan the table from the (Databricks) cache.
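(To see this without the UI, you can count the scan nodes in the textual plan; a minimal sketch, assuming c is the joined DataFrame from the question:)

import io, contextlib

buf = io.StringIO()
with contextlib.redirect_stdout(buf):   # explain() prints to stdout, so capture it
    c.explain()
print(buf.getvalue().count("InMemoryTableScan"))   # number of scans of the cached relation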
For the third one, InMemoryTableScan does not refer to caching in itself. It just means that whatever plan Catalyst formed involves scanning the cached table multiple times.
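(As an illustration: because b is derived from a, its plan already reads from a's in-memory relation once a has been cached as in attempts 2 and 3, even though cache() was never called on b; the InMemoryTableScan nodes mark reads of that one cached relation, not separate caches:)

b.explain()   # shows InMemoryTableScan over a's InMemoryRelation, although b itself was never cached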
PS: I can't quite visualize what point 4 refers to.
Comments