Pandas to Pyspark conversion (repeat/explode)

Question

I’m trying to take a notebook that I’ve written in Python/Pandas and modify/convert it to use Pyspark. The dataset I’m working with is (as real-world datasets often are) complete and utter garbage, and so some of the things I have to do to it are potentially a little non-standard as far as built-in Pyspark functions are concerned.

So, the part of the conversion I’m getting hung up on is this (this is what I’ve got in Pandas):

# Unstack all the columns individually

exploded = [
    df[['User_Name', 'cert_len']].loc[df.index.repeat(df['cert_len'])].reset_index(drop=True)['User_Name'],
    df['Certification'].str.split(',').explode().reset_index(drop=True),
    df['Provider'].str.split(',').explode().reset_index(drop=True),
    df['Credential_ID'].str.split(',').explode().reset_index(drop=True),
    …
]

# Concat unstacked columns back together

df_final = pd.concat(exploded, axis=1)

'User_Name' values are actually numbers, e.g. 105432. The 'cert_len' values are a count of the items in the 'Certification' column. Values in the remaining columns are concatenated strings, joined by commas. For instance, if 'cert_len' were 5, the value in 'Certification' would be something like 'Certified Scrum Master,AWS Cloud Practitioner,TensorFlow Developer,CompTIA Security+,AWS Developer'. That is to say, each row has a single 'User_Name' value, and each subsequent column value contains all the info about that user's certs separated by commas. The desired end format is one row per certification.
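To make that concrete, here is a small made-up illustration of the input shape and the output I'm after (the values are just placeholders, not the real data):

import pandas as pd

# Toy stand-in for the real data: one row per user, with all cert details
# packed into comma-separated strings
df = pd.DataFrame({
    'User_Name': [105432, 105433],
    'cert_len': [2, 1],
    'Certification': ['Certified Scrum Master,AWS Cloud Practitioner', 'AWS Developer'],
    'Provider': ['Scrum Alliance,Amazon', 'Amazon'],
    'Credential_ID': ['111,222', '333'],
})

# Desired shape: one row per certification, with User_Name repeated
#    User_Name           Certification        Provider Credential_ID
# 0     105432  Certified Scrum Master  Scrum Alliance           111
# 1     105432  AWS Cloud Practitioner          Amazon           222
# 2     105433           AWS Developer          Amazon           333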

So to my specific issue: you'll notice in the first line of the 'exploded' list that I'm handling the 'User_Name' column slightly differently than the rest of the columns. What I'm doing there is taking the value in the User_Name column and repeating it as new rows, so that the number of rows is equal to the number in the 'cert_len' column. Then, when all the other columns are exploded, everything matches up. Hope that makes sense.
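For anyone less familiar with that pandas idiom, here is a minimal sketch (again with toy values) of what the index.repeat trick produces:

import pandas as pd

df = pd.DataFrame({'User_Name': [105432, 105433], 'cert_len': [2, 3]})

# Repeat each index label cert_len times; .loc then duplicates the rows
repeated = df.loc[df.index.repeat(df['cert_len'])].reset_index(drop=True)
print(repeated['User_Name'].tolist())
# [105432, 105432, 105433, 105433, 105433]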

The only working solution I’ve been able to come up with involves a UDF, which I presume is the sort of thing you’d want to avoid in Spark, since the whole point is big data and a UDF would run row by row? That strategy actually involved modifying the value in the User_Name column so that it matched the format of the other columns (e.g. if User_Name was 105432 and cert_len was 5, 'User_Name' becomes 105432,105432,105432,105432,105432) and then exploding it like the rest. Exploding all the other columns isn't giving me trouble, just the 'User_Name' column.
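For reference, the UDF version I described would look roughly like this (just a sketch, and exactly the sort of thing I'd prefer to avoid):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Turn e.g. User_Name 105432 with cert_len 5 into
# '105432,105432,105432,105432,105432'
repeat_name = F.udf(lambda name, n: ','.join([str(name)] * int(n)), StringType())

# After this, 'User_Name' can be split and exploded like the other columns
df = df.withColumn('User_Name', repeat_name('User_Name', 'cert_len'))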

Basically, what I’m wondering is if there’s a way to do that without a UDF or if there’s some other strategy worth pursuing that anyone can think of that would accomplish the same as all the above. Please of course ask for clarification if I’ve been vague or left anything out. Thanks a bunch!

Answer 1

Score: 1

Setup

df.show()

+---------+--------+-------------+--------+
|User_Name|cert_len|Certification|Provider|
+---------+--------+-------------+--------+
|   105432|       2|          A,B|     P,Q|
|   105433|       3|        C,D,E|   R,S,T|
|   105434|       1|            F|       U|
+---------+--------+-------------+--------+
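For completeness, the sample frame above can be recreated with something like this (assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [(105432, 2, 'A,B', 'P,Q'),
     (105433, 3, 'C,D,E', 'R,S,T'),
     (105434, 1, 'F', 'U')],
    ['User_Name', 'cert_len', 'Certification', 'Provider'],
)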

Pyspark Solution

from pyspark.sql import functions as F

# Define the id columns
ids = ['User_name', 'cert_len']

# Define the columns which you want to split and explode
cols = ['Certification', 'Provider']

# Like in pandas, split the comma-separated strings into arrays
arr = [F.split(c, ',').alias(c) for c in cols]

# Zip the split arrays element-wise in each row and explode,
# producing one row per certification
df1 = df.select(*ids, F.explode(F.arrays_zip(*arr)).alias('temp'))

# For each column in cols, extract the corresponding field from the
# 'temp' struct and alias it back to its original name
df1 = df1.select(*ids, *[F.col('temp')[c].alias(c) for c in cols])

Result

df1.show()

+---------+--------+-------------+--------+
|User_name|cert_len|Certification|Provider|
+---------+--------+-------------+--------+
|   105432|       2|            A|       P|
|   105432|       2|            B|       Q|
|   105433|       3|            C|       R|
|   105433|       3|            D|       S|
|   105433|       3|            E|       T|
|   105434|       1|            F|       U|
+---------+--------+-------------+--------+
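Note that because the id columns are simply carried through the select, explode duplicates User_name and cert_len for every zipped element, which gives exactly the repeat behaviour the question builds by hand in pandas, with no UDF involved. As a quick sanity check on messy data, you can confirm that each user ends up with cert_len rows:

# Each user should have exactly cert_len exploded rows
df1.groupBy('User_name', 'cert_len').count().show()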
