How to subsample windows of a DataSet in Spark?

Question

Let's say I have a DataSet that looks like this:

Name    | Grade
---------------
Josh    | 94
Josh    | 87
Amanda  | 96
Karen   | 78
Amanda  | 90
Josh    | 88

I would like to create a new DataSet where each name has exactly 3 rows, with the additional rows (if any) sampled from that name's existing rows (so Karen, for example, will have three identical rows).

How do I do that without looping through each name?
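
For example, one valid result would be (Amanda's third row resampled from her two existing rows):

Name    | Grade
---------------
Josh    | 94
Josh    | 87
Josh    | 88
Amanda  | 96
Amanda  | 90
Amanda  | 96
Karen   | 78
Karen   | 78
Karen   | 78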

Answer 1

Score: 1

Data preparation:

import spark.implicits._  // already in scope in spark-shell; needed in a standalone app for toDF and $

val df = Seq(("Josh", 94), ("Josh", 87), ("Amanda", 96), ("Karen", 78), ("Amanda", 90), ("Josh", 88)).toDF("Name", "Grade")

Perform the following step only if your data is skewed for a name (i.e., a name may have more than 3 rows). Add a random number to each row, then keep the 3 rows with the smallest random values for each name, which gives a random sample of at most 3 rows per name:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// rand() gives every row a random sort key; round(... * 10) coarsens it to 0..10,
// and row_number breaks any ties arbitrarily
val df2 = df.withColumn("random", round(rand() * 10))

val windowSpec = Window.partitionBy("Name").orderBy("random")

val df3 = df2.withColumn("row_number", row_number().over(windowSpec))
             .filter($"row_number" <= 3)

Now, aggregate the grades for each name and duplicate the list 3 times to ensure there are at least 3 values per name. Finally, take the first 3 values and explode:

// Start from df instead of df3 if you skipped the Window step
df3.groupBy("Name").agg(collect_list("Grade") as "grade_list")
   .withColumn("temp_list", slice(flatten(array_repeat($"grade_list", 3)), 1, 3))
   .select($"Name", explode($"temp_list") as "Grade").show

Notes:

  • Since the code above leaves at most 3 values in grade_list, duplicating it 3 times does no harm; the slice keeps only the first 3.
  • In case you don't use the Window step, grade_list can be arbitrarily long, so you can wrap the duplication in a when(size($"grade_list") === n, ...).otherwise(...) to avoid unnecessary duplication, as in the sketch below.
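
A minimal sketch of that idea, assuming the Window step was skipped (the threshold of 3 mirrors the target row count; adjust as needed):

// Pad only the names that have fewer than 3 grades; otherwise just take the first 3
df.groupBy("Name").agg(collect_list("Grade") as "grade_list")
  .withColumn("temp_list",
    when(size($"grade_list") >= 3, slice($"grade_list", 1, 3))
      .otherwise(slice(flatten(array_repeat($"grade_list", 3)), 1, 3)))
  .select($"Name", explode($"temp_list") as "Grade").show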
