Combine two PySpark dataframes (having different rows) such that the other dataframe gets added as new columns (null values in the columns for the extra rows)

Question

I have a use case where I need to combine two PySpark dataframes such that the second dataframe gets added to the original one as new columns. It is important to understand that they can have different numbers of rows.

For example, suppose I have the data below:

df1 schema: Name|ID|past_features_height|past_features_width|past_features_version

Data:

    A|123|34|42|version1
    A|123|30|45|version2
    A|123|33|47|version3

df2 schema: Name|ID|future_features_height|future_features_width|future_features_version

Data:

    A|123|32|45|version1
    A|123|35|42|version2
    A|123|37|43|version3
    A|123|39|46|version4
    A|123|40|32|version5

Result dataframe schema:
Name|ID|past_features_height|past_features_width|past_features_version|future_features_height|future_features_width|future_features_version

    A|123|34|42|version1|32|45|version1
    A|123|30|45|version2|35|42|version2
    A|123|33|47|version3|37|43|version3
    A|123|Null|Null|Null|39|46|version4
    A|123|Null|Null|Null|40|32|version5
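
For reference, a minimal sketch of how the two example dataframes could be constructed in PySpark (assuming an available SparkSession named spark, with the column names from the schemas above):

    # df1: the "past features", 3 versions
    df1 = spark.createDataFrame(
        [("A", 123, 34, 42, "version1"),
         ("A", 123, 30, 45, "version2"),
         ("A", 123, 33, 47, "version3")],
        ["Name", "ID", "past_features_height", "past_features_width", "past_features_version"])

    # df2: the "future features", 5 versions
    df2 = spark.createDataFrame(
        [("A", 123, 32, 45, "version1"),
         ("A", 123, 35, 42, "version2"),
         ("A", 123, 37, 43, "version3"),
         ("A", 123, 39, 46, "version4"),
         ("A", 123, 40, 32, "version5")],
        ["Name", "ID", "future_features_height", "future_features_width", "future_features_version"])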

Answer 1

Score: 1

If you can join by version directly, you can use:

data_old = [("A", 123, 34, 42, "version1"),
        ("A", 123, 30, 45, "version2"),
        ("A", 123, 33, 47, "version3")]

df_old = spark.createDataFrame(data_old, ["Name", "ID", "past_features_height", "past_features_width", "features_version"])\

data_new = [("A", 123, 32, 45, "version1"),
            ("A", 123, 35, 42, "version2"),
            ("A", 123, 37, 43, "version3"),
            ("A", 123, 39, 46, "version4"),
            ("A", 123, 40, 32, "version5")]
df_new = spark.createDataFrame(data_new, ["Name", "ID", "future_features_height", "future_features_width", "features_version"])\

df_old.join(df_new, ["Name", "ID", "features_version"], "full_outer").show()
    +----+---+----------------+--------------------+-------------------+----------------------+---------------------+
    |Name| ID|features_version|past_features_height|past_features_width|future_features_height|future_features_width|
    +----+---+----------------+--------------------+-------------------+----------------------+---------------------+
    |   A|123|        version1|                  34|                 42|                    32|                   45|
    |   A|123|        version2|                  30|                 45|                    35|                   42|
    |   A|123|        version3|                  33|                 47|                    37|                   43|
    |   A|123|        version4|                null|               null|                    39|                   46|
    |   A|123|        version5|                null|               null|                    40|                   32|
    +----+---+----------------+--------------------+-------------------+----------------------+---------------------+
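
Note that in the question the two version columns are named differently (past_features_version and future_features_version), so for this first approach they would need to be renamed to a common name before joining. A minimal sketch, assuming df_old and df_new were built with the question's original column names:

    # rename both version columns to a shared name so they can serve as a join key
    df_old_aligned = df_old.withColumnRenamed("past_features_version", "features_version")
    df_new_aligned = df_new.withColumnRenamed("future_features_version", "features_version")

    df_old_aligned.join(df_new_aligned, ["Name", "ID", "features_version"], "full_outer").show()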

But if you can't, then you need another identifier to align the versions between the two dataframes. In that case you can use row_number over a Window, like this:

    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window

    data_old = [("A", 123, 34, 42, "version1"),
                ("A", 123, 30, 45, "version2"),
                ("A", 123, 33, 47, "version3")]

    # number the old rows per (Name, ID), ordered by version, so they can be aligned with the new rows
    partition_old = Window.partitionBy("Name", "ID").orderBy("past_features_version")
    df_old = spark.createDataFrame(data_old, ["Name", "ID", "past_features_height", "past_features_width", "past_features_version"])\
        .withColumn("row_number", row_number().over(partition_old))

    data_new = [("A", 123, 32, 45, "version1"),
                ("A", 123, 35, 42, "version2"),
                ("A", 123, 37, 43, "version3"),
                ("A", 123, 39, 46, "version4"),
                ("A", 123, 40, 32, "version5")]

    # apply the same numbering to the new rows
    partition_new = Window.partitionBy("Name", "ID").orderBy("future_features_version")
    df_new = spark.createDataFrame(data_new, ["Name", "ID", "future_features_height", "future_features_width", "future_features_version"])\
        .withColumn("row_number", row_number().over(partition_new))

    # join on the generated row_number instead of the version columns
    df_old.join(df_new, ["Name", "ID", "row_number"], "full_outer").show()
    +----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
    |Name| ID|row_number|past_features_height|past_features_width|past_features_version|future_features_height|future_features_width|future_features_version|
    +----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
    |   A|123|         1|                  34|                 42|             version1|                    32|                   45|               version1|
    |   A|123|         2|                  30|                 45|             version2|                    35|                   42|               version2|
    |   A|123|         3|                  33|                 47|             version3|                    37|                   43|               version3|
    |   A|123|         4|                null|               null|                 null|                    39|                   46|               version4|
    |   A|123|         5|                null|               null|                 null|                    40|                   32|               version5|
    +----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
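
If the helper column should not appear in the final result, it can be dropped after the join; a small sketch of that last step:

    # row_number only exists to align rows across the two dataframes, so remove it once the join is done
    result = df_old.join(df_new, ["Name", "ID", "row_number"], "full_outer").drop("row_number")
    result.show()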
