Comparing dates in Java Spark Dataframe
Question
I have the below Spark dataframe/dataset. Column_2 has dates in string format.
Column_1 Column_2
A 2020-08-05
B 2020-08-01
B 2020-09-20
B 2020-12-31
C 2020-05-10
My expected output dataframe should have only one row per value in Column_1. If there are multiple dates in Column_2 for the same key in Column_1, then the next available date should be picked; if there is only one row for a key, its date should be retained.
Expected Output:
Column_1 Column_2
A 2020-08-05
B 2020-09-20
C 2020-05-10
Is there a way to achieve this in Java Spark, possibly without using a UDF?
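For anyone who wants to try the answers below, here is a minimal sketch of how the sample data could be built in Java. It is only illustrative: it assumes an already-created SparkSession named spark, and the variable names are placeholders.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Sample data from the question; Column_2 is kept as a plain string here.
List<Row> rows = Arrays.asList(
        RowFactory.create("A", "2020-08-05"),
        RowFactory.create("B", "2020-08-01"),
        RowFactory.create("B", "2020-09-20"),
        RowFactory.create("B", "2020-12-31"),
        RowFactory.create("C", "2020-05-10"));

StructType schema = new StructType()
        .add("Column_1", DataTypes.StringType)
        .add("Column_2", DataTypes.StringType);

Dataset<Row> dataset = spark.createDataFrame(rows, schema);
dataset.show(false);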
Answer 1
Score: 1
Perhaps this is helpful -

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.Window;

dataset.show(false);
dataset.printSchema();
/**
*+--------+----------+
* |Column_1|Column_2 |
* +--------+----------+
* |A |2020-08-05|
* |D |2020-08-01|
* |D |2020-08-02|
* |B |2020-08-01|
* |B |2020-09-20|
* |B |2020-12-31|
* |C |2020-05-10|
* +--------+----------+
*
* root
* |-- Column_1: string (nullable = true)
* |-- Column_2: string (nullable = true)
*/
// parse Column_2 as a date and count how many rows each Column_1 key has
dataset.withColumn("Column_2", to_date(col("Column_2")))
    .withColumn("count", count("Column_2").over(Window.partitionBy("Column_1")))
    // "positive": for keys with more than one row, keep only the dates after today
    .withColumn("positive", when(col("count").gt(1),
            when(col("Column_2").gt(current_date()), col("Column_2"))
    ).otherwise(col("Column_2")))
    // "negative": for keys with more than one row, keep only the dates before today
    .withColumn("negative", when(col("count").gt(1),
            when(col("Column_2").lt(current_date()), col("Column_2"))
    ).otherwise(col("Column_2")))
    .groupBy("Column_1")
    // earliest upcoming date and latest past date per key
    .agg(min("positive").as("positive"), max("negative").as("negative"))
    // prefer the next upcoming date, otherwise fall back to the latest past date
    .selectExpr("Column_1", "coalesce(positive, negative) as Column_2")
    .show(false);
/**
* +--------+----------+
* |Column_1|Column_2 |
* +--------+----------+
* |A |2020-08-05|
* |D |2020-08-02|
* |B |2020-09-20|
* |C |2020-05-10|
* +--------+----------+
*/
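For comparison only (this is not part of the original answer): the same idea can also be written without the window and count columns, by taking per key the earliest date after current_date and falling back to the latest date when there is none. A sketch, assuming the same static functions import and the dataset above:

dataset.withColumn("Column_2", to_date(col("Column_2")))
    .groupBy("Column_1")
    .agg(
        // earliest date strictly after today; null when the key has no future date
        min(when(col("Column_2").gt(current_date()), col("Column_2"))).as("next_date"),
        // latest date overall, used as the fallback
        max(col("Column_2")).as("latest_date"))
    .selectExpr("Column_1", "coalesce(next_date, latest_date) as Column_2")
    .show(false);

It matches the result of the snippet above except in the edge case where a key has a date exactly equal to the current date.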
Answer 2
Score: 0
Create the DataFrame first (this answer uses PySpark):
from pyspark.sql import functions as F, Window as W  # assumes an existing SparkSession named spark

df_b = spark.createDataFrame([("A","2020-08-05"),("B","2020-08-01"),("B","2020-09-20"),("B","2020-12-31"),("C","2020-05-10")],["col1","col2"])
_w = W.partitionBy("col1").orderBy("col2")  # order within each group by date so the "second" date is well defined
df_b = df_b.withColumn("rn", F.row_number().over(_w))
The logic here is to pick the second element of each group when the group has more than one row. To do that, we first assign a row number within every group, then keep the single row of groups that have only one row and the first two rows of groups that have more than one.
case = F.expr("""
    CASE WHEN rn = 1 THEN 1
         WHEN rn = 2 THEN 1
    END""")
df_b = df_b.withColumn('case_condition', case)
df_b = df_b.filter(F.col("case_condition") == F.lit(1))
Intermediate Output
+----+----------+---+--------------+
|col1| col2| rn|case_condition|
+----+----------+---+--------------+
| B|2020-08-01| 1| 1|
| B|2020-09-20| 2| 1|
| C|2020-05-10| 1| 1|
| A|2020-08-05| 1| 1|
+----+----------+---+--------------+
Now, finally take the later of the (at most two) remaining dates in every group:
# take the later of the remaining dates for every key
df = df_b.groupBy("col1").agg(F.max("col2").alias("col2")).orderBy("col1")
df.show()
+----+----------+
|col1| col2|
+----+----------+
| A|2020-08-05|
| B|2020-09-20|
| C|2020-05-10|
+----+----------+
Answer 3
Score: 0
SCALA: This will give the result.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("Column_1")

df.withColumn("count", count("Column_2").over(w))
  // flag dates that are strictly after today
  .withColumn("later", expr("IF(Column_2 > date(current_timestamp), True, False)"))
  // keep single-date keys as they are; for multi-date keys keep only the future dates
  .filter("count = 1 or (count != 1 and later = True)")
  .groupBy("Column_1")
  .agg(min("Column_2").alias("Column_2"))
  .orderBy("Column_1")
  .show(false)
+--------+----------+
|Column_1|Column_2 |
+--------+----------+
|A |2020-08-05|
|B |2020-09-20|
|C |2020-05-10|
+--------+----------+
One caveat: if a Column_1 key has more than one date and none of them is after current_timestamp, the filter removes all of its rows, so that key does not appear in the result at all. For example, a key whose only dates are 2020-01-01 and 2020-02-01 would be dropped from the output.