pySpark performance issue - repeatedly querying raw data
Question
I'm using pySpark to query a large data set in an AWS S3 bucket with the code below. The raw-data query is very slow, so I'm trying to improve performance with caching and temp views, but it doesn't seem to work.

```python
baseName = "/local/rawdata"
df_data = spark.read.parquet(baseName)
df_data = df_data.filter(df_data["supplyUuid"] == "xxxxxxxxx...")
df_data = df_data.select(["subscriptionState", "dateTime"])
df_data.cache().createOrReplaceTempView("subscriptionHistory")
display(df_data)
```
Here I get a two-column table with 93 records.
Then I run the code below to get the start and end date-times of the different subscription states.
```python
df_data.groupBy("subscriptionState").agg(f.min("dateTime"), f.max("dateTime"))
```
I assumed that, since I had already cached the 93-record subset in a temp view, the groupBy would run against that subset. But the second part of the code actually takes hours, and it seems to query the whole raw data set again.
What is happening here, and how can I improve the performance?
# Answer 1
**Score**: 0
Just to explain: Spark uses a lazy execution model, which means no transformation is executed until an action is triggered at the end.
So calling `cache` only materializes the results when the first action after it executes, and you will only benefit from caching if you reuse the same DataFrame in another action.
Calling `cache`, then `createOrReplaceTempView`, then `groupBy` therefore doesn't execute anything immediately; the execution time you observe covers the whole pipeline, from reading the files through the group-by and aggregations.
Since I don't know your complete logic and environment, I'd suggest starting with `repartition`, so that you can spread the load across more workers.
You can also find more tuning tips in the official [documentation](https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalesce-hints-for-sql-queries).