Why does repartition give a different number of partitions (than the default 200)?

Question

I tried to repartition a dataframe by column but it always returns a single partition.

Spark config:

I have tried both local[*] i.e. with 16 cores

After starting the Spark instance, I check the partition config by calling spark.conf.get('spark.sql.shuffle.partitions'), and it returns 200 as expected.

Sample Dataframe
I create a small sample dataframe with:

simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)

Partitions

I run the command df.rdd.getNumPartitions() and it returns 16 as expected.

Repartition

Then I run df.repartition("department").rdd.getNumPartitions(), and it returns only 1.

No matter which combination of columns I pass to the repartition function, it always gives me a dataframe with a single partition.

Investigation

If the number of partitions is not passed to the repartition call, it uses spark.sql.shuffle.partitions by default (i.e. 200). So I ran df.repartition(200, "department").rdd.getNumPartitions(), and the repartitioning happens as expected.
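As a side note on what repartitioning by a column does, here is a rough pure-Python sketch of hash partitioning. It is not Spark's implementation: Spark uses its own hash function, and `zlib.crc32` and the helper `assign_partition` are stand-ins chosen only for illustration. The point is that rows with the same key always land in the same partition, so the three distinct departments in the sample data can occupy at most three of the 200 partitions.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 200  # value of spark.sql.shuffle.partitions reported in the question

# (employee_name, department) pairs taken from the sample data in the question
rows = [
    ("James", "Sales"), ("Michael", "Sales"), ("Robert", "Sales"),
    ("Maria", "Finance"), ("Raman", "Finance"), ("Scott", "Finance"),
    ("Jen", "Finance"), ("Jeff", "Marketing"), ("Kumar", "Marketing"),
]


def assign_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index (crc32 is a stand-in, not Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Count how many rows end up in each partition index
rows_per_partition = Counter(
    assign_partition(department, NUM_PARTITIONS) for _, department in rows
)
non_empty = len(rows_per_partition)
print(f"{non_empty} of {NUM_PARTITIONS} partitions hold data")
```

Almost all of the 200 partitions stay empty for data this small, whichever hash function is used.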

Problem

Why is the repartitioning by column without providing the number of partitions not working?

Answer 1

Score: 3

I think repartition is working fine, but AQE (Adaptive Query Execution) is coalescing the partitions after the shuffle because you have a very small number of records.

Sample code:

spark.conf.set("spark.sql.adaptive.enabled", True)

simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000),
    ("Kumar","Marketing","NY",91000,50,21000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
repartitioned = df.repartition("department")
repartitioned.show()
repartitioned.rdd.getNumPartitions()

This returns

Out[10]: 1

But with a small change:

spark.conf.set("spark.sql.adaptive.enabled", False)

The number of partitions equals spark.sql.shuffle.partitions:

Out[11]: 200
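A narrower variation on the switch above is to leave AQE on and toggle only its partition coalescing, via the Spark setting `spark.sql.adaptive.coalescePartitions.enabled`. The helper below is a hypothetical sketch (the function name is mine, not part of PySpark): it expects an active SparkSession and a DataFrame, compares the partition count under both values of that setting, and restores the original value afterwards. I have not run it against the asker's setup.

```python
COALESCE_KEY = "spark.sql.adaptive.coalescePartitions.enabled"


def partition_counts_by_coalesce_setting(spark, df, column):
    """Return {setting: partition count} for df.repartition(column).

    `spark` is an active SparkSession and `df` a DataFrame created from it.
    The original value of the setting is restored before returning.
    """
    previous = spark.conf.get(COALESCE_KEY)
    counts = {}
    try:
        for setting in ("true", "false"):
            spark.conf.set(COALESCE_KEY, setting)
            # repartition() builds a new plan, so it picks up the current setting
            counts[setting] = df.repartition(column).rdd.getNumPartitions()
    finally:
        spark.conf.set(COALESCE_KEY, previous)
    return counts
```

Usage would look like `partition_counts_by_coalesce_setting(spark, df, "department")`. With AQE enabled, the "true" entry should show the coalesced count and the "false" entry the value of spark.sql.shuffle.partitions. As the question already observed, passing an explicit count such as df.repartition(200, "department") also keeps the requested number of partitions.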

Here is the plan with AQE enabled:

== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
   * Project (5)
   +- AQEShuffleRead (4)
      +- ShuffleQueryStage (3), Statistics(sizeInBytes=912.0 B, rowCount=11, isRuntime=true)
         +- Exchange (2)
            +- * Scan ExistingRDD (1)
+- == Initial Plan ==
   CollectLimit (8)
   +- Project (7)
      +- Exchange (6)
         +- Scan ExistingRDD (1)

The initial plan is used when AQE is disabled.
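To make the coalescing step more concrete, here is a simplified pure-Python sketch of the idea: adjacent shuffle partitions are merged as long as the merged group stays under a target size. This is not Spark's actual algorithm. The function `coalesce_partitions`, the 64 MB target, and the way the 912 bytes from the plan are spread over three partitions are all assumptions made for illustration.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_bytes: int) -> List[List[int]]:
    """Greedily merge adjacent partitions while a group stays within target_bytes.

    Returns groups of original partition indices; each group becomes one
    output partition.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(sizes):
        # Start a new group once adding this partition would exceed the target
        if current and current_bytes + size > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups


# 200 shuffle partitions holding 912 bytes in total (the size shown in the plan).
# The distribution is assumed: three non-empty partitions, the rest empty.
sizes = [0] * 200
sizes[10], sizes[57], sizes[140] = 304, 304, 304

target = 64 * 1024 * 1024  # assumed target size of 64 MB per output partition
merged = coalesce_partitions(sizes, target)
print(len(merged))  # 1: all 200 partitions fit into a single group
```

With only 912 bytes of shuffle output, any realistic target size swallows every partition, which matches the single partition reported when AQE is enabled.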

huangapple
  • Published on 2023-05-14 01:54:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76244180.html