Aggregation on set of columns in Dataframe using Spark and Scala (get max non-null element of each column using selectExpr)

Question

I have a DataFrame as follows:

val df = Seq(
    ("GUID1", Some(1), Some(22), Some(30), Some(56)),
    ("GUID1", Some(4), None, Some(35), Some(52)),
    ("GUID1", None, Some(24), None, Some(58)),
    ("GUID2", Some(5), Some(21), Some(31), None)
).toDF("GUID", "A", "B", "C", "D")
df.show
+-----+----+----+----+----+
| GUID|   A|   B|   C|   D|
+-----+----+----+----+----+
|GUID1|   1|  22|  30|  56|
|GUID1|   4|null|  35|  52|
|GUID1|null|  24|null|  58|
|GUID2|   5|  21|  31|null|
+-----+----+----+----+----+

This is a simplified DataFrame; in reality there are more than 30 columns. The goal is to aggregate per GUID so that the min, max, or some other custom value is derived for each set of columns.

For example, I want to get the maximum non-null value of columns A and B and the minimum of C and D, using the arrays below:

val max_cols = Array("A", "B")
val min_cols = Array("C", "D")

val df1 = df.groupBy("GUID")
    .agg(collect_list(struct(max_cols.head, max_cols: _*))).as("values")
    .selectExpr("GUID", "array_max(filter(values, x-> x.c.isNotNull))[c] for (c <- values)")

This line is not working.

Expected output is:

+-----+---+---+---+----+
| GUID|  A|  B|  C|   D|
+-----+---+---+---+----+
|GUID1|  4| 24| 30|  52|
|GUID2|  5| 21| 31|null|
+-----+---+---+---+----+

I found a similar question for PySpark (https://stackoverflow.com/questions/70447738/pyspark-get-latest-non-null-element-of-every-column-in-one-row), but I have not been able to get it working in Scala.

Any idea how to solve it?

Answer 1

Score: 0

You can build the min/max expressions up front and pass them to agg. By default, min and max ignore nulls; they return null only when all values in a group are null.

import org.apache.spark.sql.functions.{max, min}

// Build one aggregate expression per column, keeping the original column names.
val exprs = min_cols.map(c => min(c).as(c)) ++ max_cols.map(c => max(c).as(c))

df.groupBy("GUID").agg(exprs.head, exprs.tail: _*).show

Answer 2

Score: 0

Try this:

import org.apache.spark.sql.functions._

df.groupBy("GUID").agg(
    max("A").alias("A"),
    max("B").alias("B"),
    min("C").alias("C"),
    min("D").alias("D")
).show()

Output:

+-----+---+---+---+----+
| GUID|  A|  B|  C|   D|
+-----+---+---+---+----+
|GUID1|  4| 24| 30|  52|
|GUID2|  5| 21| 31|null|
+-----+---+---+---+----+