get a string between each / within string
Question
I have a column Values that needs to be split after every \ occurrence. I need to fetch each word between the \ occurrences and create new columns. How do I do this in PySpark (Databricks)? Any help is appreciated.
Answer 1

Score: 1
From your input, I assume this is your DataFrame:
+------+------+------+---------------------+
|FieldA|FieldB|FieldC|Values |
+------+------+------+---------------------+
|1 |a |hello |\abc\def\ghi\jk-l\mno|
|2 |b |you |\I\like\to\Code |
|3 |b |there |\Th-at\works |
+------+------+------+---------------------+
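For reference, here is a minimal sketch of how this sample DataFrame can be built for testing; it mirrors the setup used in the second answer below, with raw strings keeping the backslashes literal:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available on Databricks

data = [(1, 'a', 'hello', r'\abc\def\ghi\jk-l\mno'),
        (2, 'b', 'you', r'\I\like\to\Code'),
        (3, 'b', 'there', r'\Th-at\works')]
df = spark.createDataFrame(data, ['FieldA', 'FieldB', 'FieldC', 'Values'])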
- Replace all the unwanted characters in the Values column with |:
from pyspark.sql.functions import regexp_replace, split

# replace literal backslashes with |; the extra patterns cover characters that
# may already have been interpreted as escapes (bell, tab, whitespace, newline)
df = df \
    .withColumn("Values", regexp_replace("Values", "\\\\", "|")) \
    .withColumn("Values", regexp_replace("Values", "\\a", "|a")) \
    .withColumn("Values", regexp_replace("Values", "\\t", "|t")) \
    .withColumn("Values", regexp_replace("Values", "\\s", "|s")) \
    .withColumn("Values", regexp_replace("Values", "\\n", "|n"))
- Now split the Values column using | as the delimiter:
df = df.withColumn("Values", split("Values", "\|"))
- Extract the records as a list of dictionaries:
records = df.rdd.map(lambda row: row.asDict()).collect()
- In order to insert records with a varying number of columns, the data has to be sent as key-value pairs:
output_records = []
for record in records:
    values = record["Values"]
    # drop the empty strings produced by the leading backslash
    words = [value for value in values if len(value) > 0]
    for i, word in enumerate(words):
        column_name = f"col_{i+1}"
        if record.get(column_name, None) is None:
            record[column_name] = word
    del record["Values"]
    output_records.append(record)
This is what output_records looks like:
[
{'FieldA': 1, 'FieldB': 'a', 'FieldC': 'hello', 'col_1': 'abc', 'col_2': 'def', 'col_3': 'ghi', 'col_4': 'jk-l', 'col_5': 'mno'},
{'FieldA': 2, 'FieldB': 'b', 'FieldC': 'you', 'col_1': 'I', 'col_2': 'like', 'col_3': 'to', 'col_4': 'Code'},
{'FieldA': 3, 'FieldB': 'b', 'FieldC': 'there', 'col_1': 'Th-at', 'col_2': 'works'}
]
- Now create a Spark DataFrame using output_records:
spark.createDataFrame(output_records).show()
Output:
+------+------+------+-----+-----+-----+-----+-----+
|FieldA|FieldB|FieldC|col_1|col_2|col_3|col_4|col_5|
+------+------+------+-----+-----+-----+-----+-----+
| 1| a| hello| abc| def| ghi| jk-l| mno|
| 2| b| you| I| like| to| Code| null|
| 3| b| there|Th-at|works| null| null| null|
+------+------+------+-----+-----+-----+-----+-----+
Answer 2

Score: 0
Here are my 2 cents:
from pyspark.sql.functions import col, array_remove
import pyspark.sql.functions as F

data = [(1, 'a', 'hello', r'\abc\def\ghi\jk-l\mno'),
        (2, 'b', 'you', r'\I\like\to\Code'),
        (3, 'b', 'there', r'\Th-at\works')]
df = spark.createDataFrame(data, ['FieldA', 'FieldB', 'FieldC', 'Values'])

# Split the column, remove the blanks, create dynamic columns based on the array
df = df.withColumn('split_col', F.split(col('Values'), r'\\'))
df = df.withColumn("split_array", array_remove(df["split_col"], ""))
df = df.withColumn('cnt', F.size('split_array'))
max_cols = df.agg(F.max('cnt')).first()[0]
textcols = [F.col('split_array')[i].alias(f'col{i+1}') for i in range(0, max_cols)]
df.select([F.col('FieldA'), F.col('FieldB'), F.col('FieldC')] + textcols).show()
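This keeps the dynamic columns entirely inside the DataFrame API, so nothing is collected to the driver except the single maximum count. If the intermediate helper columns are not needed afterwards, they can be dropped (a small addition, not part of the original answer):

# optional cleanup of the helper columns created above
df = df.drop('split_col', 'split_array', 'cnt')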