pyspark refer a different dataframe

Question

I have two dataframes, df1 and df2, with one common column (id). I want to add a new column to df1 (say A): using the common column id, look up a column (say 'B') in df2 and store that B value from df2 in column A of df1. df1 has more than 5 columns, and df2 has only 2 columns.

I tried a left join...but it results in a few duplicate rows (from df1, the "left" dataframe). Is there a way (e.g., a UDF) to take the id from df1 and, using that id, look up df2 and populate a new column in df1?

I do not want to do a group-by to eliminate duplicates. I want to keep all the rows in df1 and just use the id column to fetch the corresponding value from df2 and populate it in df1. I am new to PySpark...and do not know how to do this.

UPDATE: Here is the full schema of df1 and df2.

df1: <date, brand, hex_num, text, topic_idx>
df2: <topic_idx, cities>

Based on topic_idx, I want the respective cities populated in a separate column in df1. In the final resulting df, I want the same schema as df1 plus a new column "cities". The number of rows of the final df must be the same as that of df1, and each row in the final df (excluding the "cities" column) must be identical to its row in df1.

In pure Python, I could perhaps have done this with an apply function and a dict. I was looking for something similar....
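
For reference, the dict-plus-apply idea would look roughly like this in pandas (a minimal sketch; df1_pd and df2_pd are hypothetical pandas versions of the two dataframes):

import pandas as pd

# Build a lookup dict from df2, then map each topic_idx in df1 through it
mapping = dict(zip(df2_pd['topic_idx'], df2_pd['cities']))
df1_pd['cities'] = df1_pd['topic_idx'].map(mapping)  # NaN where no match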

UPDATE 2: The cities column is a list of strings, e.g. ['austin', 'chicago', 'boston'], and this list is unique for each topic_idx. In essence, in df2 the topic_idx -> city_list mapping is unique, and no two rows of df2 are identical.

Answer 1

Score: 1

From your question, it seems like df2 has duplicates in the id column, which is why you are getting additional rows after the join.

My suggestion would be as follows:

  1. If the situation permits, remove the duplicate rows in df2 before the join. Or
  2. Write a window function on df2: generate a row_number or rank partitioned by the id column and ordered by some other column, then filter the records based on their row_number/rank within each partition; call the result df3. I would prefer this, as it gives you much more control over which rows you keep. Then join df1 and df3 (see the sketch after this list).
  3. After the join, you can create the new column.
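
A minimal sketch of options 1 and 2, assuming df2 has the columns id and B, with B standing in as the ordering column (replace it with whatever tie-breaker fits your data):

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Option 1: keep one arbitrary row per id
df3 = df2.dropDuplicates(['id'])

# Option 2: keep one row per id, with explicit control over which one survives
w = Window.partitionBy('id').orderBy('B')
df3 = (df2.withColumn('rn', row_number().over(w))
          .filter('rn = 1')
          .drop('rn'))

final_df = df1.join(df3, ['id'], how='left')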

If you can edit the question to provide the schema of the dataframes, I can give you the code as well. Let me know if this solves the issue.

UPDATE

As per the details shared:

  1. Run a group-by on df2 on topic_idx and collect the cities as a list:

df2.show(truncate=False)
+---------+--------+
|topic_idx|cities  |
+---------+--------+
|1        |New York|
|1        |London  |
|2        |Paris   |
|2        |Berlin  |
|2        |Madrid  |
+---------+--------+

from pyspark.sql.functions import collect_list

df3 = df2.groupBy('topic_idx').agg(collect_list('cities').alias('city_list'))
df3.show(truncate=False)

+---------+-----------------------+
|topic_idx|city_list              |
+---------+-----------------------+
|1        |[New York, London]     |
|2        |[Paris, Berlin, Madrid]|
+---------+-----------------------+

  2. Join df3 with df1 on topic_idx:

df1.show(truncate=False)
+----------+-----+-------+---------------------+---------+
|date      |brand|hex_num|text                 |topic_idx|
+----------+-----+-------+---------------------+---------+
|2023-06-01|A    |#FF0000|Sample text 1        |1        |
|2023-06-02|B    |#00FF00|Sample text 2        |2        |
|2023-06-03|C    |#0000FF|Sample text 3        |3        |
+----------+-----+-------+---------------------+---------+

final_df = df1.join(df3, ['topic_idx'], how='left')
final_df.show(truncate=False)

+---------+----------+-----+-------+---------------------+-----------------------+
|topic_idx|date      |brand|hex_num|text                 |city_list              |
+---------+----------+-----+-------+---------------------+-----------------------+
|1        |2023-06-01|A    |#FF0000|Sample text 1        |[New York, London]     |
|2        |2023-06-02|B    |#00FF00|Sample text 2        |[Paris, Berlin, Madrid]|
|3        |2023-06-03|C    |#0000FF|Sample text 3        |null                   |
+---------+----------+-----+-------+---------------------+-----------------------+
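
Since df3 is small (one row per topic_idx), a broadcast hint can keep this join cheap, and a quick count check verifies the row-count requirement. A minimal sketch, assuming df1 and df3 as built above:

from pyspark.sql.functions import broadcast

# Broadcasting the small side avoids shuffling df1
final_df = df1.join(broadcast(df3), ['topic_idx'], how='left')

# Sanity check: left-joining against one-row-per-key df3 preserves df1's row count
assert final_df.count() == df1.count()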
