Spark number of tasks not equal to number of partitions
Question
I have read that the number of partitions is related to the number of tasks. When I read the query plan of any job that is not the file-reading job (for instance, the merge job of a join), I see that it gets as many tasks as there are partitions in each table that comes into the job. It also follows the spark.conf.set('spark.sql.shuffle.partitions', X) setting.
But in file-reading jobs, like a parquet scan, it does not match. For example, I need a full read of a table that is made of 258 parquet files, but that reading job decided to use 8 tasks, which is not aligned with spark.conf.set('spark.sql.shuffle.partitions', X) (assuming X is not set to 8).
So it seems that in file-reading jobs Spark selects the number of tasks independently of the number of partitions it needs to read. For instance, I have a table where, when I run
df = spark.sql('select * from transaction')
df.rdd.getNumPartitions()
it says 57, and the table is made of 258 parquet files (I guess one parquet file does not equal one partition).
But then, when scanning this table for a join that needs all rows (because it groups by afterwards), it just uses 8 tasks.
So why, if the number of files is 258 and the number of partitions is 57, does Spark decide to go with 8 tasks regardless of what spark.sql.shuffle.partitions says?
Answer 1
Score: 1
From the Spark official docs, the meaning of spark.sql.shuffle.partitions is:
> Configures the number of partitions to use when shuffling data for joins or aggregations.
This means it only applies to data-shuffling stages, not to file reading, so it does not affect the number of partitions when you read the parquet files. On the other hand, spark.sql.files.maxPartitionBytes affects:
> The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
The other question you mentioned is that the number of tasks differs between a pure file scan and a file scan followed by a join and aggregation.
Setting aside whether AQE is enabled, I think one reason might be that your data is already partitioned by some columns, for example a date. Since parquet files carry statistical information about their columns, when the query is optimized (for the join) the query engine realizes that not all the data needs to be scanned, and therefore the number of partitions differs. You can set up a test case that reads the same table multiple times with no further action and a constant configuration; you should see a constant number of partitions and a consistent execution plan.