Join two pyspark dataframes on calculated value in condition
Question
I have two dataframes, df1 and df2. For each key (k1) I need to find the rows with the minimum date, where that minimum date must not be 01-01-2020 or later, and, where a match is found in df2, replace the dates (start and end) with the values from df2 (a short sketch of this condition in code follows the expected result below). Example:
from pyspark.sql.types import StructType, StructField, StringType

data = [('A', 'a', '03-05-2010', '02-02-2019'),
        ('B', 'a', '02-12-2010', '01-02-2011'),
        ('B', 'b', '02-12-2010', '01-02-2011'),
        ('B', 'c', '02-12-2010', '01-02-2011'),
        ('B', 'd', '03-01-2013', '01-03-2015'),
        ('B', 'e', '04-01-2014', '01-01-2020'),
        ('C', 'a', '01-01-2020', '01-01-2020')
        ]
schema = StructType([
    StructField("k1", StringType(), True),
    StructField("k2", StringType(), True),
    StructField("start", StringType(), True),
    StructField("end", StringType(), True),
])
df1 = spark.createDataFrame(data=data,schema=schema)
df1.show()
+---+---+----------+----------+
| k1| k2| start| end|
+---+---+----------+----------+
| A| a|03-05-2010|02-02-2019|
| B| a|02-12-2010|01-02-2011|
| B| b|02-12-2010|01-02-2011|
| B| c|02-12-2010|01-02-2011|
| B| d|03-01-2013|01-03-2015|
| B| e|04-01-2014|01-01-2020|
| C| a|01-01-2020|01-01-2020|
+---+---+----------+----------+
#df2
+---+---+----------+----------+
| k1| k2| start| end|
+---+---+----------+----------+
| A| a|03-05-2010|02-02-2019|
| B| a|01-01-2008|01-02-2008|
| B| b|01-11-2009|01-12-2009|
| B| c|02-01-2010|01-02-2010|
| B| e|04-01-2014|01-01-2020|
| D| a|01-01-2000|01-01-2001|
+---+---+----------+----------+
and the result should look like this:
+---+---+----------+----------+
| k1| k2| start| end|
+---+---+----------+----------+
| A| a|03-05-2010|02-02-2019| #no change as it matches by key
| B| a|01-01-2008|01-02-2008| #replaced with df2's start and end because the date in df1 for k1 was the minimal value 01-02-2011
| B| b|01-11-2009|01-12-2009| #replaced with df2's start and end because the date in df1 for k1 was the minimal value 01-02-2011
| B| c|02-01-2010|01-02-2010| #replaced with df2's start and end because the date in df1 for k1 was the minimal value 01-02-2011
| B| d|03-01-2013|01-03-2015| #no change as it's not the minimal value for k1
| B| e|04-01-2014|01-01-2020| #no change as it's not the minimal value for k1 and it's over the limit
| C| a|01-01-2020|01-01-2020| #no change as no matching value found in df2
+---+---+----------+----------+
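To make the condition concrete, here is a minimal sketch of how it could be expressed, assuming the minimum is taken over the end column and the dates are MM-dd-yyyy strings (end_d, min_end_d and replace_candidate are just illustrative names); only the flag is computed here, not the replacement itself:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Per-k1 minimum end date; a row is a replacement candidate only if it
# carries that minimum and the minimum is strictly before 01-01-2020.
w = Window.partitionBy("k1")
df1_flagged = (df1
    .withColumn("end_d", F.to_date("end", "MM-dd-yyyy"))
    .withColumn("min_end_d", F.min("end_d").over(w))
    .withColumn("replace_candidate",
                (F.col("end_d") == F.col("min_end_d")) &
                (F.col("min_end_d") < F.lit("2020-01-01").cast("date"))))
df1_flagged.show()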
thanks!!!
Answer 1
Score: 1
Maybe this will be correct:
from pyspark.sql.types import *
data = [('A','a','03-05-2010','02-02-2019'),
('B','a','02-12-2010','01-02-2011'),
('B','b','02-12-2010','01-02-2011'),
('B','c','02-12-2010','01-02-2011'),
('B','d','03-01-2013','01-03-2015'),
('B','e','04-01-2014','01-01-2020'),
('C','a','01-01-2020','01-01-2020')
]
schema = StructType([ \
StructField("k1",StringType(),True), \
StructField("k2",StringType(),True), \
StructField("start",StringType(),True), \
StructField("end",StringType(),True), \
])
df1 = spark.createDataFrame(data=data,schema=schema).alias("df1")
df1.show()
data2 = [('A','a','03-05-2010','02-02-2019'),
('B','a','01-01-2008','01-02-2008'),
('B','b','01-11-2009','01-12-2009'),
('B','c','02-01-2010','01-02-2010'),
('B','e','04-01-2014','01-01-2020'),
('D','a','01-01-2000','01-01-2001')
]
df2 = spark.createDataFrame(data=data2,schema=schema).alias("df2")
df2.show()
Join:
import pyspark.sql.functions as F
df1.join(df2, (df1.k1 == df2.k1) & (df1.k2 == df2.k2), how='left') \
   .select(df1.k1,
           df1.k2,
           F.expr("""COALESCE(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.start, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                        TO_DATE(CAST(UNIX_TIMESTAMP(df1.start, 'MM-dd-yyyy') AS TIMESTAMP))
                                   THEN df1.start ELSE df2.start END,
                              df1.start)""").alias("start"),
           F.expr("""COALESCE(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.end, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                        TO_DATE(CAST(UNIX_TIMESTAMP(df1.end, 'MM-dd-yyyy') AS TIMESTAMP))
                                   THEN df1.end ELSE df2.end END,
                              df1.end)""").alias("end")) \
   .sort(df1.k1.asc(), df1.k2.asc()).show()
Result:
+---+---+----------+----------+
| k1| k2| start| end|
+---+---+----------+----------+
| A| a|03-05-2010|02-02-2019|
| B| a|01-01-2008|01-02-2008|
| B| b|01-11-2009|01-12-2009|
| B| c|02-01-2010|01-02-2010|
| B| d|03-01-2013|01-03-2015|
| B| e|04-01-2014|01-01-2020|
| C| a|01-01-2020|01-01-2020|
+---+---+----------+----------+
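Note that this expression keeps the earlier of the two dates for every (k1, k2) pair that also exists in df2; it does not check the per-k1 minimum or the 01-01-2020 cutoff explicitly, although it happens to produce the expected output for this data. A variant that enforces the stated condition directly might look like the following sketch (assuming the minimum is taken over the end column; df1_f, is_min and the l/r aliases are illustrative names, not part of the original code):

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Flag the df1 rows that carry the per-k1 minimum end date, with that
# minimum strictly before 01-01-2020.
w = Window.partitionBy("k1")
df1_f = (df1
    .withColumn("end_d", F.to_date("end", "MM-dd-yyyy"))
    .withColumn("is_min", (F.col("end_d") == F.min("end_d").over(w)) &
                          (F.col("end_d") < F.lit("2020-01-01").cast("date"))))

# Left-join on both keys and replace start/end only for flagged rows that
# actually have a match in df2; everything else keeps df1's values.
(df1_f.alias("l")
    .join(df2.alias("r"), ["k1", "k2"], "left")
    .select("k1",
            "k2",
            F.when(F.col("is_min") & F.col("r.start").isNotNull(), F.col("r.start"))
             .otherwise(F.col("l.start")).alias("start"),
            F.when(F.col("is_min") & F.col("r.end").isNotNull(), F.col("r.end"))
             .otherwise(F.col("l.end")).alias("end"))
    .sort("k1", "k2")
    .show())

On the sample data this should yield the same table as above.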