Does Spark read the same file twice, if two stages are using the same DataFrame?
Question
The following code reads the same CSV file twice, even though only one action is called.

End-to-end runnable example:
import pandas as pd
import numpy as np

# Write a small CSV file with a non-unique "index" column to group and join on.
df1 = pd.DataFrame(np.arange(1_000).reshape(-1, 1))
df1.index = np.random.choice(range(10), size=1000)
df1.to_csv("./df1.csv", index_label="index")

############################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

# Disable broadcast joins and adaptive query execution so the plan stays simple.
spark = (SparkSession.builder
         .config("spark.sql.autoBroadcastJoinThreshold", "-1")
         .config("spark.sql.adaptive.enabled", "false")
         .getOrCreate())

schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
df2 = df1.groupby("index").agg(F.mean("0"))  # per-index mean
df3 = df1.join(df2, on='index')              # join the aggregate back onto df1
df3.explain()
df3.count()                                  # the single action
The SQL tab in the web UI shows the following:

As you can see, the df1 file is read twice. Is this the expected behavior? Why is it happening? I have just one action, so the same part of the pipeline should not run multiple times.

I have already read the answer here. That question is almost the same, but it uses RDDs, while I am using DataFrames through the PySpark API. There it is suggested that the DataFrame API helps avoid multiple file scans, and that this is the reason the DataFrame API exists in the first place.

However, as it turns out, I am facing exactly the same issue with DataFrames as well. It seems rather odd for Spark, which is celebrated for its efficiency, to be this inefficient (most likely I am just missing something, and this is not a valid criticism :)).
Answer 1
Score: 2
Yes. This is typical for JOIN and UNION operations, for both DataFrames and RDDs, when reading from the same underlying source, unless a .cache() is used. The Spark UI then still shows the reads, but with a green dot indicating that caching has been applied. Nothing to see here, as they say.
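For illustration, a minimal sketch of what that looks like with the pipeline from the question (the only change is the .cache() call; the file name and column names are taken from the example above, and the exact UI rendering can vary between Spark versions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Same pipeline as in the question, with df1 marked for caching before it is reused.
df1 = spark.read.csv("./df1.csv", header=True)
df1.cache()

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on="index")
df3.count()   # the CSV should now be scanned once to fill the cache;
              # the reuse shows up with the green dot in the SQL tab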
Answer 2
Score: 2
The way I see this is in terms of stages. Each stage is a collection of identical tasks that run in parallel on different partitions of your data.
Your query has 4 stages:
So as you have noticed, Stage 0 and Stage 1 both read your CSV file. But that makes sense: Stage 0 and Stage 1 are completely independent of one another. They may be reading the same data, but they are doing different things.

In general, all the tasks in Stage 0 are executed before any task in Stage 1 starts to get computed. So if you want to avoid reading the data in twice, you would need one of the following:
- Have a stage that computes 2 outputs (in this case, the two inputs of Stage 2). This would significantly change Spark's architecture, since as it stands a stage always has exactly 1 output.

- Spark could (under the hood) indeed, like thebluephantom says, decide to .cache this dataset for you, but that would mean it is filling up your storage memory without you even having asked for it (and risks making further computations less performant). It would also make it hard to know how full your storage memory is, if underlying processes actually started caching your data (a sketch of caching explicitly instead follows below).
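A hedged sketch of that explicit alternative, reusing the file and column names from the question: cache df1 yourself, check in the plan that the cached relation is reused instead of a second file scan, and release the memory when you are done (the exact plan node names can differ between Spark versions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Cache explicitly, so the storage-memory cost is a visible, deliberate choice.
df1 = spark.read.csv("./df1.csv", header=True)
df1.cache()

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on="index")

# With the cache in place, the physical plan should reference the in-memory
# relation (typically an InMemoryTableScan) instead of scanning the file twice.
df3.explain()
df3.count()

# The trade-off from the second point above: df1 now occupies storage memory.
print(df1.storageLevel)   # storage level used by the cached data
df1.unpersist()           # release the memory once the result is computed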