Spark SQL left join with comparison in subquery

Question

Question updated on Feb 20th:

I have the following 2 dataframes:

df_a:

    id  date        code
    1   2021-06-27  A
    1   2021-12-27  A
    2   2021-12-27  A
    3   2022-03-21  A
    3   2022-08-01  A

df_b:

    id  date        code
    1   2021-05-19  A
    1   2021-05-31  B
    1   2021-08-27  C
    3   2021-11-06  X
    3   2022-02-15  Y
    3   2022-12-30  Z

expected result:

    id  date        code
    1   2021-06-27  B
    1   2021-12-27  C
    2   2021-12-27  A
    3   2022-03-21  Y
    3   2022-08-01  Y

I want to use df_b.code to update df_a.code under the following condition:

use the row from df_b whose date is the latest one prior to df_a.date.
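
For reference (not part of the original question), a minimal PySpark sketch that builds the two sample dataframes above, assuming an active SparkSession named spark:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # sample data copied from the tables above
    df_a = spark.createDataFrame(
        [(1, '2021-06-27', 'A'), (1, '2021-12-27', 'A'), (2, '2021-12-27', 'A'),
         (3, '2022-03-21', 'A'), (3, '2022-08-01', 'A')],
        ['id', 'date', 'code'],
    ).withColumn('date', F.col('date').cast('date'))

    df_b = spark.createDataFrame(
        [(1, '2021-05-19', 'A'), (1, '2021-05-31', 'B'), (1, '2021-08-27', 'C'),
         (3, '2021-11-06', 'X'), (3, '2022-02-15', 'Y'), (3, '2022-12-30', 'Z')],
        ['id', 'date', 'code'],
    ).withColumn('date', F.col('date').cast('date'))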

I tried:

    select a.id, b.code
    from df_a left outer join df_b
    on a.id = b.id
    and b.date = (select max(b.date) from df_b where id = a.id and date <= a.date)

but I'm getting the error 'Correlated scalar sub-queries can only be used in a Filter/Aggregate/Project and a few commands'.
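
One way to sidestep that error (an editorial sketch, not from the original post) is to move the "latest prior row" logic out of the join condition: left-join every earlier df_b row, then keep only the newest match per df_a row with a window function. This assumes df_a and df_b exist as dataframes and a SparkSession named spark is available:

    # register the dataframes so they can be queried with Spark SQL
    df_a.createOrReplaceTempView("df_a")
    df_b.createOrReplaceTempView("df_b")

    spark.sql("""
        SELECT id, date, code
        FROM (
            SELECT a.id,
                   a.date,
                   COALESCE(b.code, a.code) AS code,   -- fall back to df_a.code when no match
                   ROW_NUMBER() OVER (PARTITION BY a.id, a.date
                                      ORDER BY b.date DESC) AS rn
            FROM df_a a
            LEFT OUTER JOIN df_b b
              ON a.id = b.id
             AND b.date < a.date                       -- strictly prior to df_a.date
        ) t
        WHERE rn = 1
        ORDER BY id, date
    """).show()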

Answer 1

Score: 0

You can use a window function:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # rank df_b rows within each id, newest date first
    win = Window.partitionBy(df_a.id).orderBy(df_b.date.desc())

    (
        df_a
        .join(df_b, ['id'])
        .filter(df_a.date > df_b.date)              # keep only df_b rows strictly before df_a.date
        .withColumn("r", F.row_number().over(win))
        .filter(F.col("r") == 1)                    # the latest of those rows
        .select(df_a.id, df_a.date, df_b.code)
    ).show()

Output:

    +---+----------+----+
    | id|      date|code|
    +---+----------+----+
    |  1|2021-06-27|   B|
    +---+----------+----+
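
Note that .join(df_b, ['id']) is an inner join, so df_a rows that have no earlier df_b match (id 2 in the updated question) are dropped, and the window is partitioned by id only. A sketch (my addition, not part of this answer) that keeps unmatched rows with a left join, partitions per df_a row, and falls back to df_a.code:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # partition by id and date so every df_a row keeps its own best match
    win = Window.partitionBy(df_a.id, df_a.date).orderBy(df_b.date.desc_nulls_last())

    (
        df_a
        .join(df_b, (df_a.id == df_b.id) & (df_b.date < df_a.date), "left")
        .withColumn("r", F.row_number().over(win))
        .filter(F.col("r") == 1)
        .select(df_a.id, df_a.date,
                F.coalesce(df_b.code, df_a.code).alias("code"))   # keep original code when no match
    ).show()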

Answer 2

Score: 0

Another approach is to compute the lead (next) date for each df2 row first, and then join df1 against those date ranges with between.

    from pyspark.sql import functions as f
    from pyspark.sql.window import Window

    data1 = [[1, '2021-06-27', 'A']]
    data2 = [[1, '2021-05-19', 'A'], [1, '2021-05-31', 'B'], [1, '2021-08-27', 'C']]
    cols = ['id', 'date', 'code']

    df1 = spark.createDataFrame(data1, cols).withColumn('date', f.col('date').cast('date'))
    df2 = spark.createDataFrame(data2, cols).withColumn('date', f.col('date').cast('date'))

    # next date per id; the last row gets a far-future sentinel
    w = Window.partitionBy('id').orderBy('date')
    df3 = df2.withColumn('date_after', f.lead('date', 1, '2999-12-31').over(w))
    df3.show()

    df1.alias('a') \
        .join(df3.alias('b'),
              (f.col('a.id') == f.col('b.id'))
              & (f.col('a.date').between(f.col('b.date'), f.col('b.date_after'))),
              'left') \
        .withColumn('new_code', f.coalesce('b.code', 'a.code')) \
        .select('a.id', 'a.date', 'new_code').toDF('id', 'date', 'code') \
        .show()
    +---+----------+----+----------+
    | id|      date|code|date_after|
    +---+----------+----+----------+
    |  1|2021-05-19|   A|2021-05-31|
    |  1|2021-05-31|   B|2021-08-27|
    |  1|2021-08-27|   C|2999-12-31|
    +---+----------+----+----------+

    +---+----------+----+
    | id|      date|code|
    +---+----------+----+
    |  1|2021-06-27|   B|
    +---+----------+----+
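
One caveat worth noting (my note, not from the answer): between() is inclusive at both ends, so an a.date equal to b.date counts as "prior", and an a.date that lands exactly on an interval boundary matches two df3 rows. A hedged sketch, reusing df1 and df3 from above, that keeps the condition strictly prior with a half-open interval:

    (
        df1.alias('a')
        .join(
            df3.alias('b'),
            (f.col('a.id') == f.col('b.id'))
            & (f.col('b.date') < f.col('a.date'))           # strictly prior to a.date
            & (f.col('a.date') <= f.col('b.date_after')),    # up to the next df2 date
            'left',
        )
        .withColumn('new_code', f.coalesce('b.code', 'a.code'))
        .select('a.id', 'a.date', 'new_code')
        .toDF('id', 'date', 'code')
        .show()
    )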

Answer 3

Score: 0

I found the exact question, with the correct answer, here:
https://stackoverflow.com/questions/75294311/spark-sql-correlated-scalar-sub-queries-can-only-be-used-in-a-filter-aggregate-p

It worked for my case. Thanks.
