How to subsample windows of a DataSet in Spark?
Question
Let's say I have a DataSet that looks like this:
Name | Grade
---------------
Josh | 94
Josh | 87
Amanda | 96
Karen | 78
Amanda | 90
Josh | 88
I would like to create a new DataSet where each name has 3 rows, with the additional rows (if any) sampled from the rows of the same name (so Karen, for example, will have three identical rows).
How do I do that without looping through each name?
Answer 1
Score: 1
Data preparation:
val df = Seq(("Josh", 94), ("Josh", 87), ("Amanda", 96), ("Karen", 78), ("Amanda", 90), ("Josh", 88)).toDF("Name", "Grade")
Perform the following step only if your data is skewed for some Name: add a random number and keep at most 3 rows per Name, chosen by that random ordering:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Attach a random value to each row, then keep at most 3 rows per Name
// by taking the first 3 row numbers within each Name's random ordering.
val df2 = df.withColumn("random", round(rand() * 10))
val windowSpec = Window.partitionBy("Name").orderBy("random")
val df3 = df2.withColumn("row_number", row_number().over(windowSpec))
  .filter($"row_number" <= 3)
Now, aggregate the grades for each Name and repeat the resulting list 3 times, so that every Name has at least 3 values. Then take the first 3 values and explode them back into rows:
df3.groupBy("Name").agg(collect_list("Grade") as "grade_list")
  .withColumn("temp_list", slice(flatten(array_repeat($"grade_list", 3)), 1, 3))
  .select($"Name", explode($"temp_list") as "Grade").show
Notes:
- Since the above code will have a maximum of 3 values in grade_list, duplicating it 3 times won't harm.
- In case you don't use the Window step, you can use a combination of when(size($"grade_list") === n, ...).otherwise(...) to avoid unnecessary duplication; see the sketch after these notes.
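One way to fill that in (a minimal sketch assuming the Window step is skipped, Spark 2.4+ for slice/flatten/array_repeat, and n = 3; padded is a hypothetical name):

// Pad only the names that have fewer than 3 rows; names with 3 or more rows are simply truncated to 3.
val padded = df.groupBy("Name").agg(collect_list("Grade") as "grade_list")
  .withColumn("temp_list",
    when(size($"grade_list") >= 3, slice($"grade_list", 1, 3))
      .otherwise(slice(flatten(array_repeat($"grade_list", 3)), 1, 3)))
  .select($"Name", explode($"temp_list") as "Grade")

padded.show()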