从两个完全相同的Spark Python数据框中选择不匹配的列和数值。

huangapple go评论65阅读模式
英文:

Select mismatched columns and values from two exactly same spark python dataframes

问题

我想选择两个来自不同数据源的完全相同的数据框中不匹配的列和它们的值。

我现在有的内容:

col_1 key_col col_2 col_3 col_4
a key1 b c d
w key2 x y z
col_1 key_col col_2 col_3 col_4
a key1 b p q
w key2 x y z

我有两个来自不同数据源的具有相同模式的数据框。

我想要的是:

使用"key_col"作为连接键,对这两个数据框进行内连接,并以以下格式输出:

对于连接后获得的表的每一行,返回以下行:

key_col 不匹配的列名称 第一个数据框中的不匹配的值 第二个数据框中的不匹配的值
key1 [col_3, col_4] [c,d] [p,q]

我正在寻找在pyspark中执行此操作的查询。

英文:

I want to select the mismatched columns and their values from two exactly same dataframes from different sources.

What I have now:

col_1 key_col col_2 col_3 col_4
a key1 b c d
w key2 x y z
col_1 key_col col_2 col_3 col_4
a key1 b p q
w key2 x y z

I have 2 dataframes with same schema from different data sources.

What I want:

Join (inner join) the 2 dataframes using the "key_col" as the join key and give the output in the following format:

For each row of the table obtained after the join, return the following row:

key_col mismatched_column_names mismatched_values_in_first_df mismatched_values_in_second_df
key 1 [col3, col4] [c,d] [p,q]

I am looking for the query to do so in pyspark.

答案1

得分: 1

这部分是代码,不需要翻译。

以下是输出的部分:

输出:

从两个完全相同的Spark Python数据框中选择不匹配的列和数值。

英文:

This would work:

df1.alias("df1").join(df2.alias("df2"), F.col("df1.key_col")==F.col("df2.key_col"))\
 .select("df1.key_col", *[F.when(F.col("df1."+col) != F.col("df2."+col), F.create_map("df1."+col,"df2."+col)).alias(col) for col in df1.schema.names if col!="key_col"])\
 .withColumn("merged", F.map_concat(*[F.coalesce(F.col(col), F.create_map().cast("map<string,string>")) for col in df1.schema.names if col!="key_col"]))\
 .withColumn("mismatched_column_names", F.array(*[F.when(F.col(col).isNotNull(), F.lit(col)) for col in df1.schema.names if col!="key_col"]))\
 .withColumn("mismatched_column_names", F.expr('filter(mismatched_column_names, x -> x is not null)'))\
 .withColumn("mismatched_values_in_first_df", F.map_keys("merged"))\
 .withColumn("mismatched_values_in_second_df", F.map_values("merged"))\
 .filter(F.size("mismatched_values_in_second_df") != 0)\
 .select("df1.key_col", "mismatched_column_names", "mismatched_values_in_first_df", "mismatched_values_in_second_df")\
 .show(truncate=False)

Input:

DF1:

从两个完全相同的Spark Python数据框中选择不匹配的列和数值。

DF2:

从两个完全相同的Spark Python数据框中选择不匹配的列和数值。

Output:

从两个完全相同的Spark Python数据框中选择不匹配的列和数值。

huangapple
  • 本文由 发表于 2023年2月24日 15:09:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/75553511.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定