Comparing dates in Java Spark Dataframe

Question

I have the Spark DataFrame/Dataset shown below. Column_2 contains dates in string format.

    Column_1  Column_2
    A         2020-08-05
    B         2020-08-01
    B         2020-09-20
    B         2020-12-31
    C         2020-05-10

My expected output DataFrame should have only one row per value in Column_1. If there are multiple dates in Column_2 for the same key in Column_1, the next available date (the earliest date after the current date) should be picked; if there is only one row for a key, its date should be retained.

Expected Output:

    Column_1  Column_2
    A         2020-08-05
    B         2020-09-20
    C         2020-05-10

Is there a way to achieve this in Java Spark, possibly without using a UDF?
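
For reference, the sample data can be created as a Dataset&lt;Row&gt; in Java roughly as follows (a minimal sketch, assuming a local SparkSession; Column_2 is kept as a plain string, as in the question):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    SparkSession spark = SparkSession.builder()
            .appName("compare-dates")
            .master("local[*]")
            .getOrCreate();

    // the sample rows from the question, with the date kept as a string
    List<Row> rows = Arrays.asList(
            RowFactory.create("A", "2020-08-05"),
            RowFactory.create("B", "2020-08-01"),
            RowFactory.create("B", "2020-09-20"),
            RowFactory.create("B", "2020-12-31"),
            RowFactory.create("C", "2020-05-10"));

    StructType schema = new StructType()
            .add("Column_1", DataTypes.StringType)
            .add("Column_2", DataTypes.StringType);

    Dataset<Row> dataset = spark.createDataFrame(rows, schema);
    dataset.show(false);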

Answer 1

Score: 1

Perhaps this is helpful-

    // requires: import static org.apache.spark.sql.functions.*;
    //           import org.apache.spark.sql.expressions.Window;
    dataset.show(false);
    dataset.printSchema();
    /**
     * +--------+----------+
     * |Column_1|Column_2  |
     * +--------+----------+
     * |A       |2020-08-05|
     * |D       |2020-08-01|
     * |D       |2020-08-02|
     * |B       |2020-08-01|
     * |B       |2020-09-20|
     * |B       |2020-12-31|
     * |C       |2020-05-10|
     * +--------+----------+
     *
     * root
     *  |-- Column_1: string (nullable = true)
     *  |-- Column_2: string (nullable = true)
     */
    dataset.withColumn("Column_2", to_date(col("Column_2")))
            // how many rows each key has
            .withColumn("count", count("Column_2").over(Window.partitionBy("Column_1")))
            // "positive": dates after today (for single-row keys, the date itself)
            .withColumn("positive", when(col("count").gt(1),
                    when(col("Column_2").gt(current_date()), col("Column_2"))
            ).otherwise(col("Column_2")))
            // "negative": dates before today (for single-row keys, the date itself)
            .withColumn("negative", when(col("count").gt(1),
                    when(col("Column_2").lt(current_date()), col("Column_2"))
            ).otherwise(col("Column_2")))
            .groupBy("Column_1")
            // earliest upcoming date and latest past date per key
            .agg(min("positive").as("positive"), max("negative").as("negative"))
            // prefer the next upcoming date; otherwise fall back to the latest past date
            .selectExpr("Column_1", "coalesce(positive, negative) as Column_2")
            .show(false);
    /**
     * +--------+----------+
     * |Column_1|Column_2  |
     * +--------+----------+
     * |A       |2020-08-05|
     * |D       |2020-08-02|
     * |B       |2020-09-20|
     * |C       |2020-05-10|
     * +--------+----------+
     */

Answer 2

Score: 0

First, create the DataFrame (this answer uses PySpark):

    from pyspark.sql import functions as F
    from pyspark.sql import Window as W

    df_b = spark.createDataFrame(
        [("A", "2020-08-05"), ("B", "2020-08-01"), ("B", "2020-09-20"),
         ("B", "2020-12-31"), ("C", "2020-05-10")],
        ["col1", "col2"])

    # number the rows of each col1 group, ordered by the date column
    _w = W.partitionBy("col1").orderBy("col2")
    df_b = df_b.withColumn("rn", F.row_number().over(_w))

The logic here is to pick the second element of each group whenever a group has more than one row. To do that, we first assign a row number within every group, keep the only row of groups that have a single row, and keep the first two rows of groups that have more than one.

    # keep only the first two rows of each group
    case = F.expr("""
        CASE WHEN rn = 1 THEN 1
             WHEN rn = 2 THEN 1
        END""")
    df_b = df_b.withColumn('case_condition', case)
    df_b = df_b.filter(F.col("case_condition") == F.lit("1"))

Intermediate Output

    +----+----------+---+--------------+
    |col1|      col2| rn|case_condition|
    +----+----------+---+--------------+
    |   B|2020-08-01|  1|             1|
    |   B|2020-09-20|  2|             1|
    |   C|2020-05-10|  1|             1|
    |   A|2020-08-05|  1|             1|
    +----+----------+---+--------------+

Now, finally, just take the last element of every group:

    df = df_b.groupBy("col1").agg(F.last("col2").alias("col2")).orderBy("col1")
    df.show()

    +----+----------+
    |col1|      col2|
    +----+----------+
    |   A|2020-08-05|
    |   B|2020-09-20|
    |   C|2020-05-10|
    +----+----------+
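
The same row-number idea can also be written in Java, closer to what the question asks for. This is a rough sketch, assuming the static imports from org.apache.spark.sql.functions and a Dataset&lt;Row&gt; named dataset with columns Column_1 and Column_2; like the PySpark version, it picks the second-earliest date of a key rather than comparing against the current date.

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.expressions.Window;
    import org.apache.spark.sql.expressions.WindowSpec;

    // number the rows of each key by date (ISO date strings sort correctly as text)
    WindowSpec w = Window.partitionBy("Column_1").orderBy("Column_2");

    dataset.withColumn("rn", row_number().over(w))
            // keep at most the first two dates per key
            .filter(col("rn").leq(2))
            .groupBy("Column_1")
            // of the kept rows, the larger date is the second one when it exists,
            // otherwise the only one
            .agg(max("Column_2").as("Column_2"))
            .orderBy("Column_1")
            .show(false);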

Answer 3

Score: 0

SCALA: This will give the result.

    import org.apache.spark.sql.expressions.Window

    val w = Window.partitionBy("Column_1")

    df.withColumn("count", count("Column_2").over(w))
      // flag dates that are later than today
      .withColumn("later", expr("IF(Column_2 > date(current_timestamp), True, False)"))
      // keep single-date keys as they are; for multi-date keys keep only future dates
      .filter("count = 1 or (count != 1 and later = True)")
      .groupBy("Column_1")
      // the earliest remaining date per key
      .agg(min("Column_2").alias("Column_2"))
      .orderBy("Column_1")
      .show(false)

    +--------+----------+
    |Column_1|Column_2  |
    +--------+----------+
    |A       |2020-08-05|
    |B       |2020-09-20|
    |C       |2020-05-10|
    +--------+----------+

One caveat: if a Column_1 key has more than one date and none of them is after current_timestamp, no row will be returned for that key.
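
One way to cover that case is to compute a per-key fallback and coalesce it in. Below is a minimal sketch in Java (staying close to the question's language), assuming the static imports from org.apache.spark.sql.functions and that falling back to the latest available date is acceptable when a key has no future date:

    import static org.apache.spark.sql.functions.*;

    dataset.withColumn("Column_2", to_date(col("Column_2")))
            .groupBy("Column_1")
            // earliest date strictly after today, if the key has one
            .agg(min(when(col("Column_2").gt(current_date()), col("Column_2"))).as("next_date"),
                 // fallback: the latest date seen for the key
                 max("Column_2").as("latest_date"))
            .select(col("Column_1"),
                    coalesce(col("next_date"), col("latest_date")).as("Column_2"))
            .orderBy("Column_1")
            .show(false);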
