In a Spark dataframe, add columns from one df to another without creating combinations of matching rows

Question

My dataframes are as follows:

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (1, "c")], ("col1", "col2"))

    +----+----+
    |col1|col2|
    +----+----+
    |   1|   a|
    |   1|   b|
    |   1|   c|
    +----+----+

df2 = spark.createDataFrame([(1, "k1"), (1, "k2"), (1, "k3"), (1, "k4")], ("col1", "col3"))

    +----+----+
    |col1|col3|
    +----+----+
    |   1|  k1|
    |   1|  k2|
    |   1|  k3|
    |   1|  k4|
    +----+----+

I want to generate:

df3 = spark.createDataFrame([(1, "a", "k1"), (1, "b", "k2"), (1, "c", "k3"), (1, None, "k4")], ("col1", "col2", "col3"))

i.e., desired output:

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   a|  k1|
    |   1|   b|  k2|
    |   1|   c|  k3|
    |   1|null|  k4|
    +----+----+----+

I tried df1.join(df2, on='col1', how='leftouter') and got:

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   a|  k4|
    |   1|   a|  k3|
    |   1|   a|  k2|
    |   1|   a|  k1|
    |   1|   b|  k4|
    |   1|   b|  k3|
    |   1|   b|  k2|
    |   1|   b|  k1|
    |   1|   c|  k4|
    |   1|   c|  k3|
    |   1|   c|  k2|
    |   1|   c|  k1|
    +----+----+----+

I looked into https://stackoverflow.com/questions/41049490/merge-rows-from-one-dataframe-that-do-not-match-specific-columns-in-another-data, which is very nearly what I want, but it uses pandas dataframes. I'm not sure that switching from a Spark dataframe to a pandas dataframe just for this one operation is a good idea. Is there a native PySpark way of doing this?

Need help with the transformation that gives the desired output.

Answer 1

Score: 0

You can use a window to add a rank column that captures the order of the rows within each col1 group of both dataframes, join the two dataframes on the first column plus this rank column, and finally drop the rank column. Joining on col1 alone pairs every row of df1 with every matching row of df2 (hence the 3 × 4 = 12 rows you got); adding the rank to the join key pairs the rows positionally instead, and a full outer join keeps df2's unmatched fourth row with null in col2:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Number rows 1, 2, 3, ... within each col1 group, ordered by the value column
window1 = Window.partitionBy("col1").orderBy("col2")
window2 = Window.partitionBy("col1").orderBy("col3")

ranked_df1 = df1.withColumn("rank", F.row_number().over(window1))
ranked_df2 = df2.withColumn("rank", F.row_number().over(window2))

# Join on the group key plus the positional rank; full_outer keeps unmatched rows
result_df = ranked_df1.join(
    ranked_df2,
    on=['col1', 'rank'],
    how='full_outer'
).drop('rank')

With the df1 and df2 dataframes defined as in your question, you get the following result_df dataframe:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |a   |k1  |
|1   |b   |k2  |
|1   |c   |k3  |
|1   |null|k4  |
+----+----+----+
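
A note on the design: the pairing comes from the sort order of col2 and col3 within each col1 group, so the result is deterministic but reflects those sorted values rather than the rows' original insertion order. For quick verification, here is a minimal self-contained sketch of the whole flow; the local SparkSession setup and the final orderBy are illustrative additions, not part of the original answer:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Illustrative local session; any existing SparkSession works the same way
spark = SparkSession.builder.master("local[*]").getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (1, "c")], ["col1", "col2"])
df2 = spark.createDataFrame([(1, "k1"), (1, "k2"), (1, "k3"), (1, "k4")], ["col1", "col3"])

# Rank rows within each col1 group, then pair them positionally by rank
result_df = (
    df1.withColumn("rank", F.row_number().over(Window.partitionBy("col1").orderBy("col2")))
    .join(
        df2.withColumn("rank", F.row_number().over(Window.partitionBy("col1").orderBy("col3"))),
        on=["col1", "rank"],
        how="full_outer",
    )
    .drop("rank")
)

result_df.orderBy("col1", "col3").show()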
