spark sql left join with comparison in subquery

Question
question updated on Feb 20th:
I have the following 2 dataframes:
df_a:
id | date | code |
---|---|---|
1 | 2021-06-27 | A |
1 | 2021-12-27 | A |
2 | 2021-12-27 | A |
3 | 2022-03-21 | A |
3 | 2022-08-01 | A |
df_b:
id | date | code |
---|---|---|
1 | 2021-05-19 | A |
1 | 2021-05-31 | B |
1 | 2021-08-27 | C |
3 | 2021-11-06 | X |
3 | 2022-02-15 | Y |
3 | 2022-12-30 | Z |
expected result:
id | date | code |
---|---|---|
1 | 2021-06-27 | B |
1 | 2021-12-27 | C |
2 | 2021-12-27 | A |
3 | 2022-03-21 | Y |
3 | 2022-08-01 | Y |
I want to use df_b.code to update df_a.code under the following condition:
use the row from df_b whose date is the latest one prior to df_a.date.
I tried:
```sql
select a.id, b.code
from df_a left outer join df_b
on a.id = b.id
and b.date = (select max(b.date) from df_b where id = a.id and date <= a.date)
```
but I'm getting a 'Correlated scalar sub-queries can only be used in a Filter/Aggregate/Project and a few commands' error.
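For reference, here is a minimal sketch (not part of the original post) that builds the two sample DataFrames above so the answers below can be reproduced; it assumes a running SparkSession:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()  # or reuse an existing session

df_a = spark.createDataFrame(
    [(1, '2021-06-27', 'A'), (1, '2021-12-27', 'A'), (2, '2021-12-27', 'A'),
     (3, '2022-03-21', 'A'), (3, '2022-08-01', 'A')],
    ['id', 'date', 'code'],
).withColumn('date', F.col('date').cast('date'))

df_b = spark.createDataFrame(
    [(1, '2021-05-19', 'A'), (1, '2021-05-31', 'B'), (1, '2021-08-27', 'C'),
     (3, '2021-11-06', 'X'), (3, '2022-02-15', 'Y'), (3, '2022-12-30', 'Z')],
    ['id', 'date', 'code'],
).withColumn('date', F.col('date').cast('date'))

# Register temp views so the SQL attempt above can be run as written.
df_a.createOrReplaceTempView('df_a')
df_b.createOrReplaceTempView('df_b')
```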
Answer 1
Score: 0
You can use a window function:
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window.partitionBy(df_a.id).orderBy(df_b.date.desc())

(
    df_a
    .join(df_b, ['id'])
    .filter(df_a.date > df_b.date)
    .withColumn("r", F.row_number().over(win))
    .filter(F.col("r") == 1)
    .select(df_a.id, df_a.date, df_b.code)
).show()
```
Output:
```
+---+----------+----+
| id|      date|code|
+---+----------+----+
|  1|2021-06-27|   B|
+---+----------+----+
```
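As a possible follow-up (my sketch, not part of this answer): the updated question has several rows per id in df_a and one id (2) with no match in df_b, so the same row_number-over-window idea can be adapted by partitioning per df_a row and moving the date comparison into a left-join condition. This assumes the df_a/df_b DataFrames built earlier:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rename columns so the joined result is unambiguous.
a = df_a.select('id', F.col('date').alias('a_date'), F.col('code').alias('a_code'))
b = df_b.select(F.col('id').alias('b_id'),
                F.col('date').alias('b_date'),
                F.col('code').alias('b_code'))

# Keep the "strictly earlier than a_date" requirement in the join condition,
# so df_a rows with no qualifying df_b row still survive the left join.
joined = a.join(b, (a['id'] == b['b_id']) & (b['b_date'] < a['a_date']), 'left')

# Rank the matching df_b rows per df_a row (id + a_date), newest first.
win = Window.partitionBy('id', 'a_date').orderBy(F.col('b_date').desc())

(joined
 .withColumn('r', F.row_number().over(win))
 .filter(F.col('r') == 1)
 .select('id', F.col('a_date').alias('date'),
         F.coalesce('b_code', 'a_code').alias('code'))
 .show())
```

On the sample data this should give the five rows of the expected result, including id 2 with its original code A.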
Answer 2
Score: 0
Another approach is to first get the lead date on the second dataframe (df2) and then join df1 to it with a between condition.
```python
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Assumes an existing SparkSession named `spark`.
data1 = [[1, '2021-06-27', 'A']]
data2 = [[1, '2021-05-19', 'A'], [1, '2021-05-31', 'B'], [1, '2021-08-27', 'C']]
cols = ['id', 'date', 'code']
df1 = spark.createDataFrame(data1, cols).withColumn('date', f.col('date').cast('date'))
df2 = spark.createDataFrame(data2, cols).withColumn('date', f.col('date').cast('date'))

# For each df2 row, take the next date of the same id (open-ended for the last row).
w = Window.partitionBy('id').orderBy('date')
df3 = df2.withColumn('date_after', f.lead('date', 1, '2999-12-31').over(w))
df3.show()

df1.alias('a') \
    .join(df3.alias('b'),
          (f.col('a.id') == f.col('b.id')) &
          (f.col('a.date').between(f.col('b.date'), f.col('b.date_after'))),
          'left') \
    .withColumn('new_code', f.coalesce('b.code', 'a.code')) \
    .select('a.id', 'a.date', 'new_code').toDF('id', 'date', 'code') \
    .show()
```
```
+---+----------+----+----------+
| id|      date|code|date_after|
+---+----------+----+----------+
|  1|2021-05-19|   A|2021-05-31|
|  1|2021-05-31|   B|2021-08-27|
|  1|2021-08-27|   C|2999-12-31|
+---+----------+----+----------+

+---+----------+----+
| id|      date|code|
+---+----------+----+
|  1|2021-06-27|   B|
+---+----------+----+
```
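A note on the design (my reading, not stated in the original answer): the third argument to f.lead, '2999-12-31', is a far-future sentinel so that the last df2 row of each id stays eligible for any later df1 date, and f.coalesce('b.code', 'a.code') keeps the original code for df1 rows that find no match in the left join.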
Answer 3
Score: 0
I found the exact question, with the correct answer, here:
https://stackoverflow.com/questions/75294311/spark-sql-correlated-scalar-sub-queries-can-only-be-used-in-a-filter-aggregate-p
It worked for my case. Thanks.