Join two PySpark dataframes on a calculated value in the condition


Question

I have two dataframes, df1 and df2. For a given key (k1) I need to find the minimum date, where the minimum date cannot be 01-01-2020 or later, and where such a row is found, replace its dates (start and end) with the ones from df2. Example:

from pyspark.sql.types import StructType, StructField, StringType

data = [('A', 'a', '03-05-2010', '02-02-2019'),
        ('B', 'a', '02-12-2010', '01-02-2011'),
        ('B', 'b', '02-12-2010', '01-02-2011'),
        ('B', 'c', '02-12-2010', '01-02-2011'),
        ('B', 'd', '03-01-2013', '01-03-2015'),
        ('B', 'e', '04-01-2014', '01-01-2020'),
        ('C', 'a', '01-01-2020', '01-01-2020')
        ]

schema = StructType([
    StructField("k1", StringType(), True),
    StructField("k2", StringType(), True),
    StructField("start", StringType(), True),
    StructField("end", StringType(), True),
])

df1 = spark.createDataFrame(data=data, schema=schema)
df1.show()

+---+---+----------+----------+
| k1| k2|     start|       end|
+---+---+----------+----------+
|  A|  a|03-05-2010|02-02-2019|
|  B|  a|02-12-2010|01-02-2011|
|  B|  b|02-12-2010|01-02-2011|
|  B|  c|02-12-2010|01-02-2011|
|  B|  d|03-01-2013|01-03-2015|
|  B|  e|04-01-2014|01-01-2020|
|  C|  a|01-01-2020|01-01-2020|
+---+---+----------+----------+

# df2

+---+---+----------+----------+
| k1| k2|     start|       end|
+---+---+----------+----------+
|  A|  a|03-05-2010|02-02-2019|
|  B|  a|01-01-2008|01-02-2008|
|  B|  b|01-11-2009|01-12-2009|
|  B|  c|02-01-2010|01-02-2010|
|  B|  e|04-01-2014|01-01-2020|
|  D|  a|01-01-2000|01-01-2001|
+---+---+----------+----------+
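
For completeness, df2 can be built the same way, reusing the schema above (a sketch; the rows are copied straight from the table):

data2 = [('A', 'a', '03-05-2010', '02-02-2019'),
         ('B', 'a', '01-01-2008', '01-02-2008'),
         ('B', 'b', '01-11-2009', '01-12-2009'),
         ('B', 'c', '02-01-2010', '01-02-2010'),
         ('B', 'e', '04-01-2014', '01-01-2020'),
         ('D', 'a', '01-01-2000', '01-01-2001')
         ]

# Both frames share the same columns, so the df1 schema is reused here
df2 = spark.createDataFrame(data=data2, schema=schema)
df2.show()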

The result should look like this:

+---+---+----------+----------+
| k1| k2|     start|       end|
+---+---+----------+----------+
|  A|  a|03-05-2010|02-02-2019| # no change, as it matches by key
|  B|  a|01-01-2008|01-02-2008| # replaced with df2's start and end, because the date in df1 for this k1 was the minimal value, 01-02-2011
|  B|  b|01-11-2009|01-12-2009| # replaced with df2's start and end, because the date in df1 for this k1 was the minimal value, 01-02-2011
|  B|  c|02-01-2010|01-02-2010| # replaced with df2's start and end, because the date in df1 for this k1 was the minimal value, 01-02-2011
|  B|  d|03-01-2013|01-03-2015| # no change, as it is not the minimal value for this k1
|  B|  e|04-01-2014|01-01-2020| # no change, as it is not the minimal value for this k1, and it is over the limit
|  C|  a|01-01-2020|01-01-2020| # no change, as no matching row was found in df2
+---+---+----------+----------+

Thanks!!


Answer 1

Score: 1


Maybe this will be correct:

from pyspark.sql.types import *

data = [('A', 'a', '03-05-2010', '02-02-2019'),
        ('B', 'a', '02-12-2010', '01-02-2011'),
        ('B', 'b', '02-12-2010', '01-02-2011'),
        ('B', 'c', '02-12-2010', '01-02-2011'),
        ('B', 'd', '03-01-2013', '01-03-2015'),
        ('B', 'e', '04-01-2014', '01-01-2020'),
        ('C', 'a', '01-01-2020', '01-01-2020')
        ]
schema = StructType([
    StructField("k1", StringType(), True),
    StructField("k2", StringType(), True),
    StructField("start", StringType(), True),
    StructField("end", StringType(), True),
])
df1 = spark.createDataFrame(data=data, schema=schema).alias("df1")
df1.show()

data2 = [('A', 'a', '03-05-2010', '02-02-2019'),
         ('B', 'a', '01-01-2008', '01-02-2008'),
         ('B', 'b', '01-11-2009', '01-12-2009'),
         ('B', 'c', '02-01-2010', '01-02-2010'),
         ('B', 'e', '04-01-2014', '01-01-2020'),
         ('D', 'a', '01-01-2000', '01-01-2001')
         ]
df2 = spark.createDataFrame(data=data2, schema=schema).alias("df2")
df2.show()

Join:

import pyspark.sql.functions as F

df1.join(df2, (df1.k1 == df2.k1) & (df1.k2 == df2.k2), how='left') \
   .select(df1.k1,
           df1.k2,
           F.expr("""Coalesce(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.start, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                        TO_DATE(CAST(UNIX_TIMESTAMP(df1.start, 'MM-dd-yyyy') AS TIMESTAMP))
                                   THEN df1.start ELSE df2.start END,
                              df1.start)""").alias("start"),
           F.expr("""Coalesce(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.end, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                        TO_DATE(CAST(UNIX_TIMESTAMP(df1.end, 'MM-dd-yyyy') AS TIMESTAMP))
                                   THEN df1.end ELSE df2.end END,
                              df1.end)""").alias("end")) \
   .sort(df1.k1.asc(), df1.k2.asc()).show()

Result:

+---+---+----------+----------+
| k1| k2|     start|       end|
+---+---+----------+----------+
|  A|  a|03-05-2010|02-02-2019|
|  B|  a|01-01-2008|01-02-2008|
|  B|  b|01-11-2009|01-12-2009|
|  B|  c|02-01-2010|01-02-2010|
|  B|  d|03-01-2013|01-03-2015|
|  B|  e|04-01-2014|01-01-2020|
|  C|  a|01-01-2020|01-01-2020|
+---+---+----------+----------+
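
The same comparison can also be written with the DataFrame API instead of a SQL string. A sketch with equivalent logic (F.to_date replaces the UNIX_TIMESTAMP/CAST chain; this is a rewrite, not part of the original answer):

import pyspark.sql.functions as F

def parse(c):
    # Parse the MM-dd-yyyy strings into proper dates for comparison
    return F.to_date(c, 'MM-dd-yyyy')

df1.join(df2, (df1.k1 == df2.k1) & (df1.k2 == df2.k2), how='left') \
   .select(df1.k1,
           df1.k2,
           # Keep df1's date when it is the earlier one, otherwise take df2's;
           # coalesce falls back to df1 when the left join found no match (nulls)
           F.coalesce(F.when(parse(df2.start) > parse(df1.start), df1.start)
                       .otherwise(df2.start),
                      df1.start).alias("start"),
           F.coalesce(F.when(parse(df2.end) > parse(df1.end), df1.end)
                       .otherwise(df2.end),
                      df1.end).alias("end")) \
   .sort(df1.k1.asc(), df1.k2.asc()).show()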
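
Note that this answer replaces a date whenever df2's date is the earlier one. The rule as stated in the question (replace only the rows holding the minimum date per k1, and only when that minimum is before 01-01-2020) can also be expressed directly with a window function. A sketch, assuming "minimum date" refers to the end column, as in the question's example:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Flag the rows whose end date is the minimum within their k1 group
# and falls before the 01-01-2020 cut-off
w = Window.partitionBy("k1")
flagged = df1.withColumn("end_d", F.to_date("end", 'MM-dd-yyyy')) \
             .withColumn("is_min",
                         (F.col("end_d") == F.min("end_d").over(w)) &
                         (F.col("end_d") < F.lit("2020-01-01").cast("date")))

# Replace start/end only on flagged rows that also have a match in df2
flagged.join(df2.select("k1", "k2",
                        F.col("start").alias("start2"),
                        F.col("end").alias("end2")),
             on=["k1", "k2"], how="left") \
       .withColumn("start", F.when(F.col("is_min") & F.col("start2").isNotNull(),
                                   F.col("start2")).otherwise(F.col("start"))) \
       .withColumn("end", F.when(F.col("is_min") & F.col("end2").isNotNull(),
                                 F.col("end2")).otherwise(F.col("end"))) \
       .select("k1", "k2", "start", "end") \
       .sort("k1", "k2").show()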
