Combine two PySpark DataFrames (having different rows) such that the other DataFrame gets added as new columns (null values in the columns for the extra rows)
Question
I have a use case where I need to combine two PySpark DataFrames such that the second DataFrame gets added to the original one as new columns. It is important to understand that they can have different numbers of rows.
For example, given the data below:
df1 schema: Name|ID|past_features_height|past_features_width|past_features_version
Data:
A|123|34|42|version1
A|123|30|45|version2
A|123|33|47|version3
df2 schema: Name|ID|future_features_height|future_features_width|future_features_version
Data:
A|123|32|45|version1
A|123|35|42|version2
A|123|37|43|version3
A|123|39|46|version4
A|123|40|32|version5
Result dataframe schema:
Name|ID|past_features_height|past_features_width|past_features_version|future_features_height|future_features_width|future_features_version
A|123|34|42|version1|32|45|version1
A|123|30|45|version2|35|42|version2
A|123|33|47|version3|37|43|version3
A|123|Null|Null|Null|39|46|version4
A|123|Null|Null|Null|40|32|version5
Answer 1
Score: 1
If you can join by version directly, then you can use:
data_old = [("A", 123, 34, 42, "version1"),
            ("A", 123, 30, 45, "version2"),
            ("A", 123, 33, 47, "version3")]
# Give the version column the same name in both DataFrames so it can serve as a join key
df_old = spark.createDataFrame(data_old, ["Name", "ID", "past_features_height", "past_features_width", "features_version"])

data_new = [("A", 123, 32, 45, "version1"),
            ("A", 123, 35, 42, "version2"),
            ("A", 123, 37, 43, "version3"),
            ("A", 123, 39, 46, "version4"),
            ("A", 123, 40, 32, "version5")]
df_new = spark.createDataFrame(data_new, ["Name", "ID", "future_features_height", "future_features_width", "features_version"])

# A full outer join keeps rows present in only one side, filling the other side with nulls
df_old.join(df_new, ["Name", "ID", "features_version"], "full_outer").show()
+----+---+----------------+--------------------+-------------------+----------------------+---------------------+
|Name| ID|features_version|past_features_height|past_features_width|future_features_height|future_features_width|
+----+---+----------------+--------------------+-------------------+----------------------+---------------------+
| A|123| version1| 34| 42| 32| 45|
| A|123| version2| 30| 45| 35| 42|
| A|123| version3| 33| 47| 37| 43|
| A|123| version4| null| null| 39| 46|
| A|123| version5| null| null| 40| 32|
+----+---+----------------+--------------------+-------------------+----------------------+---------------------+
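Note (my addition, not part of the original answer): the snippet above works because both DataFrames were created with the version column under the shared name features_version. If your DataFrames instead keep the question's original column names, a minimal sketch that aligns them before joining (assuming df_old/df_new were built with past_features_version and future_features_version respectively):

# Hedged sketch: rename the differently named version columns to a common join key
df_old_r = df_old.withColumnRenamed("past_features_version", "features_version")
df_new_r = df_new.withColumnRenamed("future_features_version", "features_version")
df_old_r.join(df_new_r, ["Name", "ID", "features_version"], "full_outer").show()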
But if you can't, then you need another identifier to order the versions across the two DataFrames; in that case you can use row_number over a Window function like this:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

data_old = [("A", 123, 34, 42, "version1"),
            ("A", 123, 30, 45, "version2"),
            ("A", 123, 33, 47, "version3")]
# Number the rows of each DataFrame independently, ordered by its own version column
partition_old = Window.partitionBy("Name", "ID").orderBy("past_features_version")
df_old = spark.createDataFrame(data_old, ["Name", "ID", "past_features_height", "past_features_width", "past_features_version"]) \
    .withColumn("row_number", row_number().over(partition_old))

data_new = [("A", 123, 32, 45, "version1"),
            ("A", 123, 35, 42, "version2"),
            ("A", 123, 37, 43, "version3"),
            ("A", 123, 39, 46, "version4"),
            ("A", 123, 40, 32, "version5")]
partition_new = Window.partitionBy("Name", "ID").orderBy("future_features_version")
df_new = spark.createDataFrame(data_new, ["Name", "ID", "future_features_height", "future_features_width", "future_features_version"]) \
    .withColumn("row_number", row_number().over(partition_new))

# Join on the synthetic row_number so rows pair up by position within each (Name, ID) group
df_old.join(df_new, ["Name", "ID", "row_number"], "full_outer").show()
+----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
|Name| ID|row_number|past_features_height|past_features_width|past_features_version|future_features_height|future_features_width|future_features_version|
+----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
| A|123| 1| 34| 42| version1| 32| 45| version1|
| A|123| 2| 30| 45| version2| 35| 42| version2|
| A|123| 3| 33| 47| version3| 37| 43| version3|
| A|123| 4| null| null| null| 39| 46| version4|
| A|123| 5| null| null| null| 40| 32| version5|
+----+---+----------+--------------------+-------------------+---------------------+----------------------+---------------------+-----------------------+
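One caveat worth adding (my note, not from the original answer): orderBy on the raw version string sorts lexicographically, so "version10" would sort before "version2". If version numbers can reach two digits, a minimal sketch that orders by the numeric suffix instead (the regex and column names are assumptions based on the sample data):

from pyspark.sql.functions import regexp_extract

# Hedged sketch: extract the digits from e.g. "version10" and order numerically,
# so versions keep pairing up correctly beyond "version9"
partition_old = Window.partitionBy("Name", "ID") \
    .orderBy(regexp_extract("past_features_version", r"(\d+)", 1).cast("int"))
partition_new = Window.partitionBy("Name", "ID") \
    .orderBy(regexp_extract("future_features_version", r"(\d+)", 1).cast("int"))

After the join you can also drop the helper column with .drop("row_number") if it should not appear in the result.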