How Does Spark Handle Partitions and Shuffles
Question
Diving deep into Spark (PySpark) for the first time, and I'm already flooded with under-the-hood curiosity.
From the docs, I understand that when creating an RDD, Spark splits the data evenly across the available partitions. For example...
rdd = spark.sparkContext.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)])
So my understanding is that if there are 4 partitions, then the following is true...
P1: ("A", 1), ("A", 2)
P2: ("A", 3), ("B", 1)
P3: ("B", 2), ("C", 1)
P4: ("C", 2), ("C", 3)
QUESTION 1: I get that each partition will always get 2 elements, but will they always get the SAME two elements? i.e., if I run the rdd code above 1 million times, will the elements belong to the same partitions every time?
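For reference, glom() collects each partition into a list, so the layout can be inspected directly (assuming the rdd from the snippet above on a 4-partition local run):

print(rdd.getNumPartitions())  # 4
# glom() returns one list per partition, making the layout visible
print(rdd.glom().collect())
# [[('A', 1), ('A', 2)], [('A', 3), ('B', 1)], [('B', 2), ('C', 1)], [('C', 2), ('C', 3)]]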
My next question has to do with groupByKey(). Say we are finding the sum for each key...
sums = rdd.groupByKey().mapValues(sum)
Here is where Spark loses me. I know there are still 4 partitions, but I'm not sure how the data is distributed after the shuffle caused by the grouping.
QUESTION 2: How does Spark decide where to send records? And what do the partitions look like after the shuffle?
Here are a few different possible scenarios I can think of after the grouping and sum transformations...
SCENARIO 1 (EVENLY)
P1: ("A", 6)
P2: ("B", 3)
P3: ("C", 6)
P4: EMPTY
SCENARIO 2 (SKEWED)
P1: ("A", 6), ("B", 3)
P2: ("C", 6)
P3: EMPTY
P4: EMPTY
SCENARIO 3 (REALLY SKEWED AND OUT OF ORDER)
P1: EMPTY
P2: EMPTY
P3: ("A", 6), ("B", 3), ("C", 6)
P4: EMPTY
I'm hoping Spark has a process it always follows for distributing data after shuffles. If anyone has any answers I'd love to hear them.
Thanks!
Answer 1
Score: 1
The number of partitions can be specified by a parameter of parallelize(). Otherwise, it is defaultParallelism. For example, if you are using local mode, then local[4] will result in 4 partitions.
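A quick way to verify this, as a minimal sketch assuming a fresh local session:

from pyspark.sql import SparkSession

# local[4] gives defaultParallelism = 4, which parallelize() falls back to
# when no numSlices argument is passed
spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext
print(sc.defaultParallelism)                           # 4
print(sc.parallelize(range(8)).getNumPartitions())     # 4 (from defaultParallelism)
print(sc.parallelize(range(8), 2).getNumPartitions())  # 2 (explicit numSlices)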
Question 1
> I get that each partition will always get 2 elements, but will they always get the SAME two elements?
When the number of partitions is fixed, how elements are assigned to partitions is determined by different partitioning strategies, depending on the type of the elements. By default, the data is split roughly evenly, implemented simply from the sequence's indices. Although the code is written in Scala, you should have no trouble understanding it:
// computes the (start, end) index range of each slice;
// taken from ParallelCollectionRDD in Spark's source
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}
Therefore, given 8 elements and 4 partitions, each partition gets 2 elements. No matter how many times you run the code, the way elements are partitioned is deterministic.
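The same boundary arithmetic can be reproduced in plain Python to predict the layout (a sketch of the logic above, not Spark's actual code path):

def positions(length, num_slices):
    # Same start/end arithmetic as the Scala snippet above
    for i in range(num_slices):
        start = (i * length) // num_slices
        end = ((i + 1) * length) // num_slices
        yield start, end

data = [("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)]
for i, (start, end) in enumerate(positions(len(data), 4)):
    print(f"P{i + 1}: {data[start:end]}")
# P1: [('A', 1), ('A', 2)]
# P2: [('A', 3), ('B', 1)]
# P3: [('B', 2), ('C', 1)]
# P4: [('C', 2), ('C', 3)]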
> I know there are still 4 partitions, but I'm not sure how the data is distributed after the shuffle caused by the grouping.
The signature of groupByKey() is:
def groupByKey(self: RDD[tuple[K, V]],
               numPartitions: int | None = None,
               partitionFunc: Callable[[K], int] = portable_hash
               ) -> RDD[tuple[K, Iterable[V]]]
It hash-partitions the resulting RDD into numPartitions (4 in this case) partitions. Given an element (k, v), it will be shuffled to partition hash(k) % 4.
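You can evaluate the default partitionFunc by hand. One caveat: portable_hash requires the PYTHONHASHSEED environment variable to be set (Spark pins it on its workers); set it yourself, e.g. PYTHONHASHSEED=0, when experimenting locally so string hashes stay stable across runs:

# Run with e.g. PYTHONHASHSEED=0 so string hashes are stable across runs
from pyspark.rdd import portable_hash

num_partitions = 4
for key in ["A", "B", "C"]:
    # portable_hash is the default partitionFunc of groupByKey()
    print(key, "->", portable_hash(key) % num_partitions)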
Question 2
I think the answer above also covers Question 2. Note that developers can also specify a user-defined Partitioner, typically hash- or range-based.
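For instance, in PySpark a custom partitionFunc can be passed straight to groupByKey(). This is only an illustrative sketch: the key_to_partition mapping is made up, and rdd is the 4-partition RDD from the question:

# Hypothetical explicit routing: pin each key to a fixed partition
key_to_partition = {"A": 0, "B": 1, "C": 2}

def explicit_partitioner(key):
    return key_to_partition[key]

sums = rdd.groupByKey(numPartitions=4, partitionFunc=explicit_partitioner).mapValues(sum)
print(sums.glom().collect())
# [[('A', 6)], [('B', 3)], [('C', 6)], []]  (the last partition stays empty)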
Answer 2
Score: 1
Given an RDD with 4 partitions like this:
val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)), 4)
You might always get the same result, irrespective of how many times you run the same line, because you're using the parallelize method with a small sample of data. How data gets partitioned when it is read depends on many factors, such as the underlying file system, the type of files being read, the number of executors, the number of driver cores, etc.
So to answer your question 1: no, the partition structure will not necessarily remain the same every time you read the data.
For question 2: whenever you use key-based operations, a shuffle is introduced, which moves records with the same key into the same partition.
For example, the following code:
val sumRdd = rdd.groupByKey.mapValues(_.sum)
would still return an RDD with 4 partitions. The data is moved based on the default partitioning scheme, hash partitioning, which picks the target partition using the logic:
object.hashCode % numPartitions
So all objects with the same hashCode will move to the same partition, and you will have an underlying structure like:
sumRdd.mapPartitionsWithIndex{ (idx, itr) => itr.toList.map( c => c+" -> partition#"+idx ).iterator }.collect
// Array[String] = Array((A,6) -> partition#1, (B,3) -> partition#2, (C,6) -> partition#3)
sumRdd.getNumPartitions will still return 4, but since one partition is empty, no tasks will be invoked for that partition.
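Since the question is about PySpark, the equivalent inspection there looks roughly like this, assuming the pair RDD from the question (the index each key lands in can differ from the Scala output above, because PySpark hashes keys with portable_hash rather than the JVM hashCode):

sum_rdd = rdd.groupByKey(4).mapValues(sum)

def tag_with_partition(idx, records):
    # Label each record with the index of the partition holding it
    return ((rec, f"partition#{idx}") for rec in records)

print(sum_rdd.mapPartitionsWithIndex(tag_with_partition).collect())
print(sum_rdd.getNumPartitions())  # still 4, even if some partitions are empty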