英文:
Select mismatched columns and values from two exactly same spark python dataframes
问题
我想选择两个来自不同数据源的完全相同的数据框中不匹配的列和它们的值。
我现在有的内容:
col_1 | key_col | col_2 | col_3 | col_4 |
---|---|---|---|---|
a | key1 | b | c | d |
w | key2 | x | y | z |
col_1 | key_col | col_2 | col_3 | col_4 |
---|---|---|---|---|
a | key1 | b | p | q |
w | key2 | x | y | z |
我有两个来自不同数据源的具有相同模式的数据框。
我想要的是:
使用"key_col"作为连接键,对这两个数据框进行内连接,并以以下格式输出:
对于连接后获得的表的每一行,返回以下行:
key_col | 不匹配的列名称 | 第一个数据框中的不匹配的值 | 第二个数据框中的不匹配的值 |
---|---|---|---|
key1 | [col_3, col_4] | [c,d] | [p,q] |
我正在寻找在pyspark中执行此操作的查询。
英文:
I want to select the mismatched columns and their values from two exactly same dataframes from different sources.
What I have now:
col_1 | key_col | col_2 | col_3 | col_4 |
---|---|---|---|---|
a | key1 | b | c | d |
w | key2 | x | y | z |
col_1 | key_col | col_2 | col_3 | col_4 |
---|---|---|---|---|
a | key1 | b | p | q |
w | key2 | x | y | z |
I have 2 dataframes with same schema from different data sources.
What I want:
Join (inner join) the 2 dataframes using the "key_col" as the join key and give the output in the following format:
For each row of the table obtained after the join, return the following row:
key_col | mismatched_column_names | mismatched_values_in_first_df | mismatched_values_in_second_df |
---|---|---|---|
key 1 | [col3, col4] | [c,d] | [p,q] |
I am looking for the query to do so in pyspark.
答案1
得分: 1
这部分是代码,不需要翻译。
以下是输出的部分:
输出:
英文:
This would work:
df1.alias("df1").join(df2.alias("df2"), F.col("df1.key_col")==F.col("df2.key_col"))\
.select("df1.key_col", *[F.when(F.col("df1."+col) != F.col("df2."+col), F.create_map("df1."+col,"df2."+col)).alias(col) for col in df1.schema.names if col!="key_col"])\
.withColumn("merged", F.map_concat(*[F.coalesce(F.col(col), F.create_map().cast("map<string,string>")) for col in df1.schema.names if col!="key_col"]))\
.withColumn("mismatched_column_names", F.array(*[F.when(F.col(col).isNotNull(), F.lit(col)) for col in df1.schema.names if col!="key_col"]))\
.withColumn("mismatched_column_names", F.expr('filter(mismatched_column_names, x -> x is not null)'))\
.withColumn("mismatched_values_in_first_df", F.map_keys("merged"))\
.withColumn("mismatched_values_in_second_df", F.map_values("merged"))\
.filter(F.size("mismatched_values_in_second_df") != 0)\
.select("df1.key_col", "mismatched_column_names", "mismatched_values_in_first_df", "mismatched_values_in_second_df")\
.show(truncate=False)
Input:
DF1:
DF2:
Output:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论