Aggregation on a set of columns in a DataFrame using Spark and Scala (get the max non-null element of each column using selectExpr)
Question
I have a Dataframe as follows:
val df = Seq(
  ("GUID1", Some(1), Some(22), Some(30), Some(56)),
  ("GUID1", Some(4), None, Some(35), Some(52)),
  ("GUID1", None, Some(24), None, Some(58)),
  ("GUID2", Some(5), Some(21), Some(31), None)
).toDF("GUID", "A", "B", "C", "D")
df.show
+-----+----+----+----+----+
| GUID| A| B| C| D|
+-----+----+----+----+----+
|GUID1| 1| 22| 30| 56|
|GUID1| 4|null| 35| 52|
|GUID1|null| 24|null| 58|
|GUID2| 5| 21| 31|null|
+-----+----+----+----+----+
This is a simplified DataFrame (in reality, there are more than 30 columns).
The goal is to aggregate so that the min/max (or some custom value) of each column in a given set of columns is derived.
For example, I want to get the max non-null value of columns A and B and the min of C and D, using the arrays below.
val max_cols = Array("A", "B")
val min_cols = Array("C", "D")
val df1 = df.groupBy("GUID").agg(collect_list(struct(max_cols.head, max_cols: _*))).as("values")
.selectExpr("GUID", "array_max(filter(values, x-> x.c.isNotNull))[c] for (c <- values)")
This line is not working.
Expected output is:
+-----+---+---+---+----+
| GUID| A| B| C| D|
+-----+---+---+---+----+
|GUID1| 4| 24| 30| 52|
|GUID2| 5| 21| 31|null|
+-----+---+---+---+----+
I found a similar question for PySpark (https://stackoverflow.com/questions/70447738/pyspark-get-latest-non-null-element-of-every-column-in-one-row), but I am not able to get it working in Scala.
Any idea how to solve it?
Answer 1
Score: 0
You can create the min/max expressions and use them in agg. By default, min/max ignores nulls unless all values are null.
import org.apache.spark.sql.functions.{max, min}
// min/max skip nulls, so each aggregate returns the min/max non-null value of its column
val exprs = min_cols.map(x => min(x).as(x)) ++ max_cols.map(x => max(x).as(x))
df.groupBy("GUID").agg(exprs.head, exprs.tail: _*).show
Answer 2
Score: 0
Try this:
import org.apache.spark.sql.functions._
df.groupBy("GUID").agg(
max("A").alias("A"),
max("B").alias("B"),
min("C").alias("C"),
min("D").alias("D")
).show()
Output:
+-----+---+---+---+----+
| GUID| A| B| C| D|
+-----+---+---+---+----+
|GUID1| 4| 24| 30| 52|
|GUID2| 5| 21| 31|null|
+-----+---+---+---+----+