Skewed partitions when setting spark.sql.files.maxPartitionBytes
Question
I work inside a PySpark Docker container.
I'm trying to read a Parquet file from the local filesystem into Spark, splitting it into several partitions at the same time:
- I download the NY Taxi dataset, which is 46 MB in size:
curl -Lo taxi.parquet 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'
- I start a Spark session with the following configuration (max partition size is 10 MB):
spark = (
SparkSession
.builder
.appName("Bucketing")
.master("local[4]")
.config("spark.sql.files.maxPartitionBytes", 10 * 1000 * 1024)
.getOrCreate()
)
- I read the Parquet file into a DataFrame:
taxi_df = spark.read.parquet("taxi.parquet")
- I trigger an action:
taxi_df.show()
- When I look into the Spark UI, I see that there are 4 partitions, but all the data went to a single partition and the other three partitions are empty (skewed dataset).
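The same distribution can also be checked programmatically rather than through the UI; here is a minimal PySpark sketch, assuming the taxi_df defined above:
from pyspark.sql.functions import spark_partition_id

# Count rows per input partition; a skewed read shows one large count
# while the other partitions are empty or missing entirely.
taxi_df.groupBy(spark_partition_id().alias("partition_id")).count().show()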
According to the documentation, spark.sql.files.maxPartitionBytes should take effect when reading files, but it apparently leads to skewed partitions.
Question: Is there a way to configure Spark to avoid skewed partitions?
Additional:
I tried two more Parquet files: one is 36 MB, the other one is 113 MB, and each time the correct number of partitions was created (3 and 13 respectively), but all the data went to a single partition while the others were left empty, so I assume it's not related to the local[4] config of my Spark session.
Answer 1
Score: 1
In this case, you are not really suffering from data skew. The NY Taxi Dataset is a file that has not been partitioned by Spark before, so you're actually reading in 1 partition only.
To demonstrate this, you can start up a spark-shell with the following command:
spark-shell --master "local[4]" --conf "spark.files.maxPartitionBytes=10485760"
Then, you can try the following:
scala> val df = spark.read.parquet("yellow_tripdata_2023-01.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.functions.spark_partition_id
scala> df.groupBy(spark_partition_id).count
res1: org.apache.spark.sql.DataFrame = [SPARK_PARTITION_ID(): int, count: bigint]
scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 1|3066766|
+--------------------+-------+
So it seems like there is only 1 partition!
Now, if you want to have this dataset partitioned on disk, you can .repartition it.
Running the following command in a spark-shell:
scala> df.repartition(4).write.parquet("test.parquet")
will write out a partitioned file. You can verify this in the command line:
$ tree yellow_tripdata_2023-01.parquet
yellow_tripdata_2023-01.parquet
$ tree test.parquet/
test.parquet/
├── part-00000-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00001-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00002-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00003-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
└── _SUCCESS
You can see that test.parquet has been partitioned on disk as well. Finally, you'll see that this .write.parquet action now runs with properly balanced tasks.
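For completeness, the equivalent in the OP's PySpark session would look roughly like this (a sketch, assuming the taxi_df DataFrame from the question and an example output path):
# Explicitly repartition the DataFrame into 4 partitions and write them out,
# producing one part file per partition on disk.
taxi_df.repartition(4).write.parquet("test.parquet")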
What does spark.sql.files.maxPartitionBytes do then?
The spark.sql.files.maxPartitionBytes configuration exists to prevent processing too many partitions in case there are more partitions than cores in your cluster.
You can imagine that each partition comes with a bit of overhead. By setting it to a certain value (128 MB by default), you're telling Spark: "You can group my input data if the partitions are too small, but not into anything larger than this limit." Again, this is only relevant if there are more partitions than cores in your cluster.
We can test this out with the code that we wrote above. We know we wrote our test.parquet file with 4 partitions of around 20 MB each. Let's read this file in with spark.files.maxPartitionBytes=52428800 (50 MB). This should group at least 2 input partitions into a single one.
We're going to do this test with 2 cluster sizes. Once with 4 cores:
spark-shell --master "local[4]" --conf "spark.files.maxPartitionBytes=52428800"
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> df.groupBy(spark_partition_id).count.show
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|766691|
| 3|766692|
| 2|766691|
| 0|766692|
+--------------------+------+
Here you see that the input partitions were not grouped together. This is because our cluster has 4 cores, so it's still more efficient to fully parallelize the partitions over the workers, even though their size is smaller than maxPartitionBytes.
And once with 2 cores:
spark-shell --master "local[2]" --conf "spark.files.maxPartitionBytes=52428800"
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 1|1533383|
| 0|1533383|
+--------------------+-------+
Here the input partitions were indeed grouped together! This is because the total number of CPUs in our cluster is smaller than the number of partitions in the input data, so it's worth grouping them together to reduce overhead.
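Why the core count matters can be sketched with a simplified version of how Spark chooses its target split size. This is only an illustration: the actual planning also factors in spark.sql.files.openCostInBytes and file boundaries, and the byte counts below are rough example values, not measurements.
def approx_max_split_bytes(total_bytes, max_partition_bytes, open_cost_bytes, parallelism):
    # Roughly: spread the input over the available cores, but never exceed
    # maxPartitionBytes and never go below the per-file open cost.
    bytes_per_core = total_bytes / parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

# 4 cores: ~80 MB of input over 4 cores -> ~20 MB splits, so the four ~20 MB files stay separate.
print(approx_max_split_bytes(80_000_000, 52_428_800, 4_194_304, 4))  # ~20 MB
# 2 cores: the same input over 2 cores -> ~40 MB splits, so two files fit into each partition.
print(approx_max_split_bytes(80_000_000, 52_428_800, 4_194_304, 2))  # ~40 MB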
I've explained how this input splitting is handled in a bit more detail in this SO answer, if you're interested.