Spark SQL left join with comparison in subquery

Question

Question updated on Feb 20th:

I have the following two DataFrames:

df_a:

id date code
1 2021-06-27 A
1 2021-12-27 A
2 2021-12-27 A
3 2022-03-21 A
3 2022-08-01 A

df_b:

id date code
1 2021-05-19 A
1 2021-05-31 B
1 2021-08-27 C
3 2021-11-06 X
3 2022-02-15 Y
3 2022-12-30 Z

Expected result:

id date code
1 2021-06-27 B
1 2021-12-27 C
2 2021-12-27 A
3 2022-03-21 Y
3 2022-08-01 Y

I want to update df_a.code with df_b.code using the following rule:

use the code from the df_b row with the same id whose b.date is the latest date before df_a.date; if there is no such row (as for id 2), keep df_a.code.
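To make the rule concrete, here is a minimal plain-Python sketch (not Spark) of the lookup on the sample data above. It assumes "before" means strictly earlier, and the helper name latest_prior_code is hypothetical. ISO-8601 date strings sort chronologically, so they can be compared as strings.

```python
from bisect import bisect_left
from collections import defaultdict

df_a = [(1, '2021-06-27', 'A'), (1, '2021-12-27', 'A'), (2, '2021-12-27', 'A'),
        (3, '2022-03-21', 'A'), (3, '2022-08-01', 'A')]
df_b = [(1, '2021-05-19', 'A'), (1, '2021-05-31', 'B'), (1, '2021-08-27', 'C'),
        (3, '2021-11-06', 'X'), (3, '2022-02-15', 'Y'), (3, '2022-12-30', 'Z')]

# Index df_b by id with dates in ascending order, so each lookup is a binary search.
b_dates = defaultdict(list)
b_codes = defaultdict(list)
for id_, date, code in sorted(df_b):
    b_dates[id_].append(date)
    b_codes[id_].append(code)

def latest_prior_code(id_, date, default):
    """Hypothetical helper: code of the latest df_b row for id_ strictly before date, else default."""
    dates = b_dates.get(id_, [])
    i = bisect_left(dates, date)  # dates[:i] are all < date
    return b_codes[id_][i - 1] if i > 0 else default

result = [(id_, date, latest_prior_code(id_, date, code)) for id_, date, code in df_a]
for row in result:
    print(*row)
```

This reproduces the expected result. For the on-or-before semantics of the `<=` in the attempted SQL, bisect_right would be used instead of bisect_left.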

I tried:

select a.id, b.code
from df_a a
left outer join df_b b
  on a.id = b.id
  and b.date = (select max(b2.date) from df_b b2 where b2.id = a.id and b2.date <= a.date)

but I get this error: 'Correlated scalar sub-queries can only be used in a Filter/Aggregate/Project and a few commands'
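The error message lists where a correlated scalar subquery is allowed, and a join condition is not among them. One common workaround is to drop the subquery entirely: left join on `a.id = b.id AND b.date < a.date`, rank the matches for each df_a row with ROW_NUMBER(), and keep the first. The sketch below uses only standard SQL (non-equi left join, window function, COALESCE). To keep it runnable without a cluster, it is executed with Python's built-in sqlite3 module (window functions need SQLite 3.25 or later), which is a stand-in, not Spark. It also assumes (id, date) is unique in df_a, because rows are ranked per (id, date).

```python
import sqlite3

# sqlite3 is only a stand-in for Spark SQL so the query can run anywhere.
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE df_a (id INTEGER, date TEXT, code TEXT);
CREATE TABLE df_b (id INTEGER, date TEXT, code TEXT);
INSERT INTO df_a VALUES
  (1, '2021-06-27', 'A'), (1, '2021-12-27', 'A'), (2, '2021-12-27', 'A'),
  (3, '2022-03-21', 'A'), (3, '2022-08-01', 'A');
INSERT INTO df_b VALUES
  (1, '2021-05-19', 'A'), (1, '2021-05-31', 'B'), (1, '2021-08-27', 'C'),
  (3, '2021-11-06', 'X'), (3, '2022-02-15', 'Y'), (3, '2022-12-30', 'Z');
""")

# No correlated subquery: non-equi left join, then keep the newest earlier df_b row per df_a row.
query = """
SELECT id, date, code
FROM (
  SELECT a.id, a.date,
         COALESCE(b.code, a.code) AS code,  -- keep df_a.code when no earlier df_b row exists
         ROW_NUMBER() OVER (PARTITION BY a.id, a.date ORDER BY b.date DESC) AS rn
  FROM df_a a
  LEFT JOIN df_b b
    ON a.id = b.id AND b.date < a.date
) t
WHERE rn = 1
ORDER BY id, date
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(*row)
```

On Spark, registering the two DataFrames with createOrReplaceTempView('df_a') and createOrReplaceTempView('df_b') and passing the same string to spark.sql should work, but that is untested in this sketch.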

Answer 1

Score: 0

You can use a window function:

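from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per df_a row (id + date; assumes that pair is unique), newest df_b date first
win = Window.partitionBy(df_a.id, df_a.date).orderBy(df_b.date.desc())
(
    df_a
    # Left join on id and "df_b.date is earlier", so df_a rows with no match (e.g. id 2) survive
    .join(df_b, (df_a.id == df_b.id) & (df_a.date > df_b.date), 'left')
    .withColumn("r", F.row_number().over(win))
    .filter(F.col("r") == 1)
    # Fall back to df_a.code when no earlier df_b row exists
    .select(df_a.id, df_a.date, F.coalesce(df_b.code, df_a.code).alias("code"))
).show()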

Output, for a sample containing only the df_a row (1, 2021-06-27):

+---+----------+----+
| id|      date|code|
+---+----------+----+
|  1|2021-06-27|   B|
+---+----------+----+

Answer 2

Score: 0

Another approach: first use lead to attach to each df2 row the date of the next df2 row with the same id, then join each df1 row to the df2 row whose date range contains it.

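from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data1 = [[1, '2021-06-27', 'A']]
data2 = [[1, '2021-05-19', 'A'], [1, '2021-05-31', 'B'], [1, '2021-08-27', 'C']]
cols = ['id', 'date', 'code']

df1 = spark.createDataFrame(data1, cols).withColumn('date', f.col('date').cast('date'))
df2 = spark.createDataFrame(data2, cols).withColumn('date', f.col('date').cast('date'))

# For each df2 row, date_after = date of the next df2 row with the same id;
# the last row per id gets a far-future default.
w = Window.partitionBy('id').orderBy('date')
df3 = df2.withColumn('date_after', f.lead('date', 1, '2999-12-31').over(w))
df3.show()

# Match each df1 row to the df2 range [date, date_after) that contains it.
# between() is inclusive on both ends, so a df1 date equal to a df2 date would
# match two ranges; the half-open comparison avoids duplicate rows.
# The left join plus coalesce keeps df1's code when no range matches.
df1.alias('a') \
  .join(df3.alias('b'),
        (f.col('a.id') == f.col('b.id'))
        & (f.col('a.date') >= f.col('b.date'))
        & (f.col('a.date') < f.col('b.date_after')),
        'left') \
  .withColumn('new_code', f.coalesce('b.code', 'a.code')) \
  .select('a.id', 'a.date', 'new_code').toDF('id', 'date', 'code') \
  .show()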

+---+----------+----+----------+
| id|      date|code|date_after|
+---+----------+----+----------+
|  1|2021-05-19|   A|2021-05-31|
|  1|2021-05-31|   B|2021-08-27|
|  1|2021-08-27|   C|2999-12-31|
+---+----------+----+----------+

+---+----------+----+
| id|      date|code|
+---+----------+----+
|  1|2021-06-27|   B|
+---+----------+----+
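The lead-plus-range idea can be checked without Spark. The plain-Python sketch below (an illustration, not Spark code) rebuilds the df3 ranges from the answer's sample df2 and shows one edge case: because between is inclusive on both ends, a df1 date that equals a df2 date falls into two ranges, so the join would return two rows for it. A half-open test (date >= b.date and date < b.date_after) matches exactly one range. The helper name matches is hypothetical.

```python
from itertools import groupby

FAR_FUTURE = '2999-12-31'  # the same default the answer passes to lead()

df2 = [(1, '2021-05-19', 'A'), (1, '2021-05-31', 'B'), (1, '2021-08-27', 'C')]

# Emulate lead('date', 1, FAR_FUTURE) over a window partitioned by id, ordered by date.
ranges = []
for id_, group in groupby(sorted(df2), key=lambda r: r[0]):
    rows = list(group)
    next_dates = [r[1] for r in rows[1:]] + [FAR_FUTURE]
    for (_, date, code), date_after in zip(rows, next_dates):
        ranges.append((id_, date, date_after, code))

def matches(id_, date, inclusive_end):
    """Hypothetical helper: codes of ranges containing date; inclusive_end=True mimics between()."""
    return [code for rid, start, end, code in ranges
            if rid == id_ and start <= date and (date <= end if inclusive_end else date < end)]

print(ranges)
print(matches(1, '2021-06-27', inclusive_end=True))   # -> ['B']
print(matches(1, '2021-05-31', inclusive_end=True))   # -> ['A', 'B'] (boundary hits two ranges)
print(matches(1, '2021-05-31', inclusive_end=False))  # -> ['B']
```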

Answer 3

Score: 0

I found the exact same question, with the correct answer, here:
https://stackoverflow.com/questions/75294311/spark-sql-correlated-scalar-sub-queries-can-only-be-used-in-a-filter-aggregate-p

It worked for my case. Thanks.

huangapple
  • Posted on February 18, 2023 at 01:38:19
  • If you repost this article, please keep its link: https://go.coder-hub.com/75487567.html