Pyspark flatten rows with multiple values per ID into multiple columns
Question
I am using PySpark to transform and join several dataframes, but one of the dataframe columns contains multiple values per ID, resulting in multiple rows for those IDs. I need to flatten those rows into one row, with a unique column for each value. My data looks like this:
| ID | Text |
| --- | --- |
| 1 | some text |
| 1 | more text |
| 1 | still more text |
| 2 | some text |
| 2 | still more text |
| 3 | null |
...and the final transformation should look like this:
| ID | Text_1 | Text_2 | Text_3 |
| --- | --- | --- | --- |
| 1 | some text | more text | still more text |
| 2 | some text | still more text | null |
| 3 | null | null | null |
How can I do this without using up too much memory?
I was able to group the data by ID to count how many columns it needs to be expanded to, but I'm unsure how to move forward.
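For reference, a minimal sketch of that grouping step (assuming the dataframe is named `df`, with the `ID` and `Text` columns from the sample above):

```python
from pyspark.sql import functions as F

# Hypothetical sketch: count the Text values per ID, then take the overall
# maximum to know how many Text_N columns the wide result needs.
counts = df.groupBy('ID').agg(F.count('Text').alias('n_values'))
n_cols = counts.agg(F.max('n_values')).first()[0]
```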
Answer 1
Score: 1
The way you started works:
1. Create the input dataframe:

```python
spark = ...
df = spark.createDataFrame([
    [1, 'some text'],
    [1, 'more text'],
    [1, 'still more text'],
    [2, 'some text'],
    [2, 'still more text'],
    [3, None]
], ["ID", "Text"])
```
2. Group the dataframe by `ID` and count the number of strings for each `ID`:

```python
from pyspark.sql import functions as F

df2 = df.groupBy('ID').agg(F.collect_list('Text').alias('Text')) \
    .withColumn('cnt', F.size('Text')).cache()
```
The `cache` is important here as the `df2` dataframe will be used twice.
3. Get the maximum number of strings for a single `ID`:

```python
max = df2.agg(F.max('cnt')).first()[0]
```
4. Select the correct element of the collected array for each column:

```python
textcols = [F.col('Text')[i].alias(f'Text_{i+1}') for i in range(0, max)]
df2.select([F.col('ID')] + textcols).orderBy('ID').show()
```
Result:
```
+---+---------+---------------+---------------+
| ID|   Text_1|         Text_2|         Text_3|
+---+---------+---------------+---------------+
|  1|some text|      more text|still more text|
|  2|some text|still more text|           null|
|  3|     null|           null|           null|
+---+---------+---------------+---------------+
```
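Since the question also asks about memory, one optional follow-up (my own suggestion, not part of the original steps): once the wide result has been materialized, the cached `df2` can be released explicitly.

```python
# Release the cached groupBy result once the flattened dataframe has been
# written out or otherwise materialized (optional, frees cached storage).
df2.unpersist()
```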
Comments