Aggregation on set of columns in Dataframe using Spark and Scala (get max non-null element of each column using selectExpr)


Question

I have a DataFrame as follows:

    val df = Seq(
      ("GUID1", Some(1), Some(22), Some(30), Some(56)),
      ("GUID1", Some(4), None, Some(35), Some(52)),
      ("GUID1", None, Some(24), None, Some(58)),
      ("GUID2", Some(5), Some(21), Some(31), None)
    ).toDF("GUID", "A", "B", "C", "D")

    df.show
    +-----+----+----+----+----+
    | GUID|   A|   B|   C|   D|
    +-----+----+----+----+----+
    |GUID1|   1|  22|  30|  56|
    |GUID1|   4|null|  35|  52|
    |GUID1|null|  24|null|  58|
    |GUID2|   5|  21|  31|null|
    +-----+----+----+----+----+

This is a simplified DataFrame; in reality there are more than 30 columns.

The goal is to aggregate so that the min, max, or some custom value is derived for each of a given set of columns. For example, I want the max non-null value of columns A and B and the min of columns C and D, driven by the arrays below.

    val max_cols = Array("A", "B")
    val min_cols = Array("C", "D")
    val df1 = df.groupBy("GUID")
      .agg(collect_list(struct(max_cols.head, max_cols: _*))).as("values")
      .selectExpr("GUID", "array_max(filter(values, x -> x.c.isNotNull))[c] for (c <- values)")

The selectExpr line does not work; as far as I can tell, the for-comprehension and the Scala-style x.c.isNotNull are not valid inside a SQL expression string.

Expected output is:

    +-----+---+---+---+----+
    | GUID|  A|  B|  C|   D|
    +-----+---+---+---+----+
    |GUID1|  4| 24| 30|  52|
    |GUID2|  5| 21| 31|null|
    +-----+---+---+---+----+

I found a similar question for PySpark (https://stackoverflow.com/questions/70447738/pyspark-get-latest-non-null-element-of-every-column-in-one-row), but I am not able to get it working in Scala.

Any idea how to solve it?


Answer 1

Score: 0

You can create the min/max expressions programmatically and use them in agg. By default, min and max ignore nulls, returning null only when all values in the group are null.

    val exprs = min_cols.map(x => min(x).as(x)) ++ max_cols.map(x => max(x).as(x))
    df.groupBy("GUID").agg(exprs.head, exprs.tail: _*).show
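
Note that the aggregated columns then come out in min-then-max order (GUID, C, D, A, B). If the original layout matters, a final select restores it:

    df.groupBy("GUID").agg(exprs.head, exprs.tail: _*)
      .select("GUID", "A", "B", "C", "D") // restore the original column order
      .show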

Answer 2

Score: 0

Try this:

    import org.apache.spark.sql.functions._

    df.groupBy("GUID").agg(
      max("A").alias("A"),
      max("B").alias("B"),
      min("C").alias("C"),
      min("D").alias("D")
    ).show()

Output:

    +-----+---+---+---+----+
    | GUID|  A|  B|  C|   D|
    +-----+---+---+---+----+
    |GUID1|  4| 24| 30|  52|
    |GUID2|  5| 21| 31|null|
    +-----+---+---+---+----+
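
This spells out each column by hand; with the 30+ columns mentioned in the question, you would build the expressions programmatically, as in the other answer. If you specifically want the collect_list/selectExpr route from the linked PySpark answer, here is a rough, untested sketch of a Scala translation (it assumes Spark 2.4+ for the SQL higher-order functions transform, filter, array_max and array_min, and reuses the max_cols/min_cols arrays from the question):

    import org.apache.spark.sql.functions.{collect_list, struct}

    // Collect each group's rows into an array of structs, then build one
    // SQL expression per column: extract the field, drop the nulls, and
    // take the max (or min) of what remains.
    val cols = max_cols ++ min_cols
    val aggExprs =
      max_cols.map(c => s"array_max(filter(transform(values, x -> x.$c), v -> v IS NOT NULL)) AS $c") ++
      min_cols.map(c => s"array_min(filter(transform(values, x -> x.$c), v -> v IS NOT NULL)) AS $c")

    val df1 = df
      .groupBy("GUID")
      .agg(collect_list(struct(cols.head, cols.tail: _*)).as("values"))
      .selectExpr(("GUID" +: aggExprs): _*)

Since array_max and array_min already skip null elements, the explicit filter is redundant, but it mirrors the "non-null" wording of the question.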
