Join two pyspark dataframes on calculated value in condition

Question


I have two dataframes, df1 and df2. For each key (k1) I need to find the minimum date, where that minimum cannot be 01-01-2020 or later, and for the rows where it is found, replace the dates (start and end) with the values from df2. An example:

    from pyspark.sql.types import StructType, StructField, StringType

    data = [('A', 'a', '03-05-2010', '02-02-2019'),
            ('B', 'a', '02-12-2010', '01-02-2011'),
            ('B', 'b', '02-12-2010', '01-02-2011'),
            ('B', 'c', '02-12-2010', '01-02-2011'),
            ('B', 'd', '03-01-2013', '01-03-2015'),
            ('B', 'e', '04-01-2014', '01-01-2020'),
            ('C', 'a', '01-01-2020', '01-01-2020')]
    schema = StructType([
        StructField("k1", StringType(), True),
        StructField("k2", StringType(), True),
        StructField("start", StringType(), True),
        StructField("end", StringType(), True),
    ])
    df1 = spark.createDataFrame(data=data, schema=schema)
    df1.show()

    +---+---+----------+----------+
    | k1| k2|     start|       end|
    +---+---+----------+----------+
    |  A|  a|03-05-2010|02-02-2019|
    |  B|  a|02-12-2010|01-02-2011|
    |  B|  b|02-12-2010|01-02-2011|
    |  B|  c|02-12-2010|01-02-2011|
    |  B|  d|03-01-2013|01-03-2015|
    |  B|  e|04-01-2014|01-01-2020|
    |  C|  a|01-01-2020|01-01-2020|
    +---+---+----------+----------+

    # df2
    +---+---+----------+----------+
    | k1| k2|     start|       end|
    +---+---+----------+----------+
    |  A|  a|03-05-2010|02-02-2019|
    |  B|  a|01-01-2008|01-02-2008|
    |  B|  b|01-11-2009|01-12-2009|
    |  B|  c|02-01-2010|01-02-2010|
    |  B|  e|04-01-2014|01-01-2020|
    |  D|  a|01-01-2000|01-01-2001|
    +---+---+----------+----------+

and the result should look like this:

    +---+---+----------+----------+
    | k1| k2|     start|       end|
    +---+---+----------+----------+
    |  A|  a|03-05-2010|02-02-2019|  # no change, as it matches by key
    |  B|  a|01-01-2008|01-02-2008|  # replaced with df2's start and end, because this row's date in df1 was the minimal value for k1 (01-02-2011)
    |  B|  b|01-11-2009|01-12-2009|  # replaced with df2's start and end, because this row's date in df1 was the minimal value for k1 (01-02-2011)
    |  B|  c|02-01-2010|01-02-2010|  # replaced with df2's start and end, because this row's date in df1 was the minimal value for k1 (01-02-2011)
    |  B|  d|03-01-2013|01-03-2015|  # no change, as it is not the minimal value for k1
    |  B|  e|04-01-2014|01-01-2020|  # no change, as it is not the minimal value for k1, and it is over the limit
    |  C|  a|01-01-2020|01-01-2020|  # no change, as no matching value was found in df2
    +---+---+----------+----------+
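
One detail worth pinning down: the date strings are ambiguous between MM-dd-yyyy and dd-MM-yyyy. On this sample data the per-k1 minimum comes out the same under either reading, but the choice matters for real data. A quick way to inspect the per-key minimum under one assumption (MM-dd-yyyy, the format the answer below uses), given the df1 created above:

    from pyspark.sql import functions as F

    # Parsing with MM-dd-yyyy, the per-k1 minimum end for B comes out as
    # 2011-01-02, matching the rows marked as replaceable above.
    df1.select("k1", F.to_date("end", "MM-dd-yyyy").alias("end_d")) \
       .groupBy("k1").agg(F.min("end_d").alias("min_end")).show()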

thanks!!!

Answer 1

Score: 1


Maybe this will be correct:

    from pyspark.sql.types import *

    data = [('A', 'a', '03-05-2010', '02-02-2019'),
            ('B', 'a', '02-12-2010', '01-02-2011'),
            ('B', 'b', '02-12-2010', '01-02-2011'),
            ('B', 'c', '02-12-2010', '01-02-2011'),
            ('B', 'd', '03-01-2013', '01-03-2015'),
            ('B', 'e', '04-01-2014', '01-01-2020'),
            ('C', 'a', '01-01-2020', '01-01-2020')]
    schema = StructType([
        StructField("k1", StringType(), True),
        StructField("k2", StringType(), True),
        StructField("start", StringType(), True),
        StructField("end", StringType(), True),
    ])
    df1 = spark.createDataFrame(data=data, schema=schema).alias("df1")
    df1.show()

    data2 = [('A', 'a', '03-05-2010', '02-02-2019'),
             ('B', 'a', '01-01-2008', '01-02-2008'),
             ('B', 'b', '01-11-2009', '01-12-2009'),
             ('B', 'c', '02-01-2010', '01-02-2010'),
             ('B', 'e', '04-01-2014', '01-01-2020'),
             ('D', 'a', '01-01-2000', '01-01-2001')]
    df2 = spark.createDataFrame(data=data2, schema=schema).alias("df2")
    df2.show()

Join:

    import pyspark.sql.functions as F

    df1.join(df2, (df1.k1 == df2.k1) & (df1.k2 == df2.k2), how='left') \
       .select(df1.k1,
               df1.k2,
               F.expr("""Coalesce(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.start, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                            TO_DATE(CAST(UNIX_TIMESTAMP(df1.start, 'MM-dd-yyyy') AS TIMESTAMP))
                                       THEN df1.start ELSE df2.start END,
                                  df1.start)""").alias("start"),
               F.expr("""Coalesce(CASE WHEN TO_DATE(CAST(UNIX_TIMESTAMP(df2.end, 'MM-dd-yyyy') AS TIMESTAMP)) >
                                            TO_DATE(CAST(UNIX_TIMESTAMP(df1.end, 'MM-dd-yyyy') AS TIMESTAMP))
                                       THEN df1.end ELSE df2.end END,
                                  df1.end)""").alias("end")) \
       .sort(df1.k1.asc(), df1.k2.asc()).show()
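
Each of the two expressions takes the earlier of the matched pair of dates and falls back to df1's value when the left join finds no match (the NULL comparison falls into the ELSE branch, and Coalesce then restores df1's value). For readers who prefer the DataFrame API over SQL strings, a roughly equivalent sketch, not part of the original answer:

    from pyspark.sql import functions as F

    # Parse both sides; to_date yields NULL when the left join found no df2 row.
    s1 = F.to_date(df1["start"], "MM-dd-yyyy")
    s2 = F.to_date(df2["start"], "MM-dd-yyyy")
    e1 = F.to_date(df1["end"], "MM-dd-yyyy")
    e2 = F.to_date(df2["end"], "MM-dd-yyyy")

    # If df2's date is later keep df1's, otherwise take df2's; a NULL comparison
    # falls through to otherwise(), and coalesce() restores df1's original value.
    df1.join(df2, (df1.k1 == df2.k1) & (df1.k2 == df2.k2), how='left') \
       .select(df1.k1, df1.k2,
               F.coalesce(F.when(s2 > s1, df1["start"]).otherwise(df2["start"]),
                          df1["start"]).alias("start"),
               F.coalesce(F.when(e2 > e1, df1["end"]).otherwise(df2["end"]),
                          df1["end"]).alias("end")) \
       .sort(df1.k1.asc(), df1.k2.asc()).show()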

Result:

    +---+---+----------+----------+
    | k1| k2|     start|       end|
    +---+---+----------+----------+
    |  A|  a|03-05-2010|02-02-2019|
    |  B|  a|01-01-2008|01-02-2008|
    |  B|  b|01-11-2009|01-12-2009|
    |  B|  c|02-01-2010|01-02-2010|
    |  B|  d|03-01-2013|01-03-2015|
    |  B|  e|04-01-2014|01-01-2020|
    |  C|  a|01-01-2020|01-01-2020|
    +---+---+----------+----------+
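
For completeness: the answer takes the pairwise earlier date for every matched row, which reproduces the expected output on this data. The rule exactly as worded in the question (replace only the rows that hold the per-k1 minimum end date, and only when that minimum is before 01-01-2020) could also be spelled out with a window function. A sketch under the same MM-dd-yyyy assumption, not part of the original answer:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Parse the end dates so comparisons are chronological, not lexicographic.
    d1 = df1.withColumn("end_d", F.to_date("end", "MM-dd-yyyy"))

    # Per-k1 minimum end date, and the 01-01-2020 cutoff from the question.
    d1 = d1.withColumn("min_end", F.min("end_d").over(Window.partitionBy("k1")))
    cutoff = F.to_date(F.lit("01-01-2020"), "MM-dd-yyyy")

    # A row is replaceable when it holds its key's minimum end date and that
    # minimum falls before the cutoff.
    d1 = d1.withColumn("replaceable",
                       (F.col("end_d") == F.col("min_end")) & (F.col("min_end") < cutoff))

    # Pull df2's dates in under distinct names, then substitute only where the
    # row is replaceable and a df2 match actually exists.
    result = (d1.join(df2.select("k1", "k2",
                                 F.col("start").alias("start2"),
                                 F.col("end").alias("end2")),
                      on=["k1", "k2"], how="left")
                .withColumn("start", F.when(F.col("replaceable") & F.col("start2").isNotNull(),
                                            F.col("start2")).otherwise(F.col("start")))
                .withColumn("end", F.when(F.col("replaceable") & F.col("end2").isNotNull(),
                                          F.col("end2")).otherwise(F.col("end")))
                .select("k1", "k2", "start", "end")
                .sort("k1", "k2"))
    result.show()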
