Pyspark flatten rows with multiple values per ID into multiple columns

Question

I am using PySpark to transform and join several dataframes, but one of the dataframe columns contains multiple values per ID, resulting in multiple rows for those IDs. I need to flatten those rows into one row, with a unique column for each value. My data looks like this:

| ID | Text            |
| -- | --------------- |
| 1  | some text       |
| 1  | more text       |
| 1  | still more text |
| 2  | some text       |
| 2  | still more text |
| 3  | null            |

...and the final transformation should look like this:

| ID | Text_1    | Text_2          | Text_3          |
| -- | --------- | --------------- | --------------- |
| 1  | some text | more text       | still more text |
| 2  | some text | still more text | null            |
| 3  | null      | null            | null            |

How can I do this without using up too much memory?

I was able to group the data by ID to count how many columns it needs to be expanded to, but I'm unsure how to move forward.
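
A minimal sketch of that counting step, assuming the data is already in a DataFrame df with ID and Text columns:

from pyspark.sql import functions as F

# Count the non-null Text values per ID; the maximum count is the
# number of Text_N columns the flattened result will need.
counts = df.groupBy('ID').agg(F.count('Text').alias('cnt'))
n_cols = counts.agg(F.max('cnt')).first()[0]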

Answer 1

Score: 1

The way you started works:

1. Create the input dataframe:

spark = ...  # an existing SparkSession
df = spark.createDataFrame([
    [1, 'some text'],
    [1, 'more text'],
    [1, 'still more text'],
    [2, 'some text'],
    [2, 'still more text'],
    [3, None]
], ["ID", "Text"])

2. Group the dataframe by ID, collecting the Text values into a list and counting how many strings each ID has:

from pyspark.sql import functions as F

df2 = df.groupBy('ID').agg(F.collect_list('Text').alias('Text')) \
    .withColumn('cnt', F.size('Text')).cache()

The cache is important here as the df2 dataframe will be used twice.
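
If memory is a concern, one possible variation (a sketch of my own, not part of the original approach) is to persist the intermediate with an explicit storage level instead of the default cache, trading some speed for RAM:

from pyspark import StorageLevel

# Same aggregation as above, but keep the intermediate result on disk
# instead of in executor memory.
df2 = df.groupBy('ID').agg(F.collect_list('Text').alias('Text')) \
    .withColumn('cnt', F.size('Text')) \
    .persist(StorageLevel.DISK_ONLY)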

3. Get the maximum number of strings for a single ID:

max = df2.agg(F.max('cnt')).first()[0]  # note: this shadows Python's built-in max()

4. Select the correct element of the collected array for each column. For ID 3 the collected array is empty (collect_list skips nulls), and indexing past the end of an array returns null rather than failing, which is why every Text_N column for that ID comes out as null:

textcols = [F.col('Text')[i].alias(f'Text_{i+1}') for i in range(0, max)]
df2.select([F.col('ID')] + textcols).orderBy('ID').show()

Result:

+---+---------+---------------+---------------+
| ID|   Text_1|         Text_2|         Text_3|
+---+---------+---------------+---------------+
|  1|some text|      more text|still more text|
|  2|some text|still more text|           null|
|  3|     null|           null|           null|
+---+---------+---------------+---------------+
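
Not part of the answer above, but worth noting for the memory question: once the wide result has been shown or written out, the cached intermediate can be released:

# Evict df2 from the cache once it is no longer needed.
df2.unpersist()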
