Why does repartition give a different number of partitions (than the default 200)?
Question
I tried to repartition a dataframe by column but it always returns a single partition.
Spark config:
I have tried local[*], i.e. with 16 cores.
After starting the Spark instance, I check the shuffle partition config by calling spark.conf.get('spark.sql.shuffle.partitions'), and it gives 200 as expected.
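For reference, a minimal sketch of that setup (the app name is an assumption; everything else is standard PySpark):

from pyspark.sql import SparkSession

# Local session using all available cores (16 on this machine).
spark = (SparkSession.builder
         .master("local[*]")
         .appName("repartition-test")  # hypothetical app name
         .getOrCreate())

# The shuffle partition count defaults to 200 unless overridden.
print(spark.conf.get("spark.sql.shuffle.partitions"))  # '200'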
Sample Dataframe
I create an arbitrary sample dataframe with:
simpleData = [("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000)
]
schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema=schema)
Partitions
I run the command df.rdd.getNumPartitions()
and it returns 16 as expected.
Repartition
Then I run df.repartition("department").rdd.getNumPartitions()
and it returns only 1.
No matter which combination of columns I pass to the repartition function, it always gives me a dataframe with a single partition.
Investigation
If the number of partitions is not given in the repartition call, it should use spark.sql.shuffle.partitions
by default (i.e. 200). So I run the command df.repartition(200, "department").rdd.getNumPartitions()
and the repartitioning happens as expected.
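To see how the rows are actually distributed across those 200 partitions, a quick check (a sketch; glom() is a standard RDD method, and the exact placement depends on hashing):

# Count the rows in each of the 200 partitions. glom() turns each
# partition into a list of its rows, so len() gives per-partition counts.
counts = df.repartition(200, "department").rdd.glom().map(len).collect()

print(len(counts))                   # 200 partitions in total
print([c for c in counts if c > 0])  # at most 3 non-empty (3 distinct departments)

With only three distinct department values, at most three of the 200 partitions contain any rows.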
Problem
Why does repartitioning by column not work when the number of partitions is not provided?
Answer 1
Score: 3
I think repartition is working fine, but AQE (Adaptive Query Execution) is coalescing the partitions after the shuffle because you have a really small number of records.
Sample code:
spark.conf.set("spark.sql.adaptive.enabled", True)
simpleData = [("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000),
("Kumar","Marketing","NY",91000,50,21000),
("Kumar","Marketing","NY",91000,50,21000)
]
schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
repartitioned = df.repartition("department")
repartitioned.show()
repartitioned.rdd.getNumPartitions()
This returns:
Out[10]: 1
But with a small change:
spark.conf.set("spark.sql.adaptive.enabled", False)
the number of partitions equals spark.sql.shuffle.partitions:
Out[11]: 200
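If you want to keep AQE for its other optimizations, a sketch (assuming Spark 3.x config names) that turns off only the post-shuffle partition coalescing:

# Keep AQE enabled, but disable just the step that merges
# small post-shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)

df.repartition("department").rdd.getNumPartitions()  # 200 again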
Here is the plan with AQE enabled:
== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
   * Project (5)
   +- AQEShuffleRead (4)
      +- ShuffleQueryStage (3), Statistics(sizeInBytes=912.0 B, rowCount=11, isRuntime=true)
         +- Exchange (2)
            +- * Scan ExistingRDD (1)
+- == Initial Plan ==
   CollectLimit (8)
   +- Project (7)
      +- Exchange (6)
         +- Scan ExistingRDD (1)
The initial plan is used when AQE is disabled.
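For reference, a plan in this shape can be printed with explain in formatted mode (available since Spark 3.0):

repartitioned.explain(mode="formatted")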