Spark partition count is always 1?

Question

I'm running Spark in local mode and expect to see multiple partitions after a simple groupBy, but the partition count always comes out as 1:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Test App")
  .master("local[5]")
  .getOrCreate()
spark.sparkContext.setLogLevel("OFF")

import spark.implicits._

val simpleData = Seq(("James", "Sales", "NY", 90000, 34, 10000),
  ("Michael", "Sales", "NY", 86000, 56, 20000),
  ("Robert", "Sales", "CA", 81000, 30, 23000),
  ("Maria", "Finance", "CA", 90000, 24, 23000),
  ("Raman", "Finance", "CA", 99000, 40, 24000),
  ("Scott", "Finance", "NY", 83000, 36, 19000),
  ("Jen", "Finance", "NY", 79000, 53, 15000),
  ("Jeff", "Marketing", "CA", 80000, 25, 18000),
  ("Kumar", "Marketing", "NY", 91000, 50, 21000)
)
val df = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")

val df2 = df.groupBy("state").count()

println(df2.rdd.getNumPartitions) // always prints 1

The code above prints 1. Since the default shuffle partition count is 200, I expected more than one partition. I even set the shuffle partition count manually, but I still get 1:

spark.conf.set("spark.sql.shuffle.partitions", 500)

Is some setting incorrect, or is my understanding of shuffle partitions wrong?

Answer 1

Score: 1

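This is due to Adaptive Query Execution (AQE), which optimizes the query plan at runtime by looking at the actual data volume (among other things). With a dataset this small, AQE coalesces the shuffle partitions into one. AQE is enabled by default since Spark 3.2.0.

Turning it off as shown below gives the expected behaviour: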

...
import spark.implicits._
spark.conf.set("spark.sql.adaptive.enabled", false) // <-- this
val simpleData = Seq(("James", "Sales", "NY", 90000, 34, 10000),
...

You will then get 200, or whatever other value you explicitly set. For example:

200
import spark.implicits._
simpleData: Seq[(String, String, String, Int, Int, Int)] = List((James,Sales,NY,90000,34,10000), (Michael,Sales,NY,86000,56,20000), (Robert,Sales,CA,81000,30,23000), (Maria,Finance,CA,90000,24,23000), (Raman,Finance,CA,99000,40,24000), (Scott,Finance,NY,83000,36,19000), (Jen,Finance,NY,79000,53,15000), (Jeff,Marketing,CA,80000,25,18000), (Kumar,Marketing,NY,91000,50,21000))
df: org.apache.spark.sql.DataFrame = [employee_name: string, department: string ... 4 more fields]
df2: org.apache.spark.sql.DataFrame = [state: string, count: bigint]

Here is a link with more detail: https://sparkbyexamples.com/spark/spark-adaptive-query-execution/
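
To see the effect side by side, here is a minimal sketch that runs the same groupBy under three configurations: AQE on, AQE off, and AQE on with only partition coalescing turned off. The object name ShufflePartitionCheck, the helper partitionsAfterGroupBy, and the shuffle partition value of 8 are made up for illustration and are not part of the original question or answer. The configuration keys are standard Spark SQL settings, but the exact counts you see with AQE on may depend on your Spark version.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionCheck {

  // Hypothetical helper (not from the original post): applies the given
  // settings to the session, runs a small groupBy, and returns the number
  // of partitions of the result.
  def partitionsAfterGroupBy(spark: SparkSession, settings: Map[String, String]): Int = {
    import spark.implicits._
    settings.foreach { case (key, value) => spark.conf.set(key, value) }

    val df = Seq(
      ("James", "NY"), ("Michael", "NY"), ("Robert", "CA"),
      ("Maria", "CA"), ("Scott", "NY")
    ).toDF("employee_name", "state")

    df.groupBy("state").count().rdd.getNumPartitions
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Shuffle Partition Check")
      .master("local[5]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("OFF")

    // 8 is an arbitrary illustrative value, chosen only to keep the run fast.
    val base = Map("spark.sql.shuffle.partitions" -> "8")

    val aqeOn = base ++ Map(
      "spark.sql.adaptive.enabled" -> "true",
      "spark.sql.adaptive.coalescePartitions.enabled" -> "true")
    val aqeOff = base + ("spark.sql.adaptive.enabled" -> "false")
    val aqeOnNoCoalesce = base ++ Map(
      "spark.sql.adaptive.enabled" -> "true",
      "spark.sql.adaptive.coalescePartitions.enabled" -> "false")

    println(s"AQE on:                 ${partitionsAfterGroupBy(spark, aqeOn)}")
    println(s"AQE off:                ${partitionsAfterGroupBy(spark, aqeOff)}")
    println(s"AQE on, coalescing off: ${partitionsAfterGroupBy(spark, aqeOnNoCoalesce)}")

    spark.stop()
  }
}
```

With AQE off, the count should match spark.sql.shuffle.partitions. Disabling only spark.sql.adaptive.coalescePartitions.enabled is a narrower option if you want to keep the other AQE optimizations while preventing small shuffle partitions from being merged.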

huangapple
  • Posted on 2023-05-28 09:49:21
  • Source: https://go.coder-hub.com/76349659.html