How to write a PySpark script to convert email content into a long string for a CSV file?
Description:
I have a dataset that contains a column 'Notes' with email content. In PySpark I tried using the regexp_replace() function to replace special characters in the email string so that each email becomes a single row, avoiding CSV delimiters, newlines, etc.
# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','/[^0-9]+,/"|\n|\t|\r',''))
t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[,|"\n\r\t^>_]+','-'))
# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[^A-Z0-9_]+',''))
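Spark's regexp_replace() uses Java regular expressions, but the character class in the middle attempt behaves the same way in Python's re module, so it can be sanity-checked locally without a cluster (the sample string below is made up):

```python
import re

# Same character class as the regexp_replace() pattern above: commas,
# pipes, double quotes, newlines, carriage returns, tabs, '^', '>' and
# '_' are collapsed into a single '-'. Inside a character class, '|'
# and a non-leading '^' are literal characters, not operators.
pattern = r'[,|"\n\r\t^>_]+'

sample = 'PER CUSTOMER,\nNEEDS DETAILED CLEANING\r\n"From: XXXXXX"'
cleaned = re.sub(pattern, '-', sample)
print(cleaned)  # -> PER CUSTOMER-NEEDS DETAILED CLEANING-From: XXXXXX-
```

If the replacement itself works but the saved file still breaks, the problem is in the CSV writing step rather than the regex.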
After the regexp_replace(), the email content in the dataset is one long string in the data frame, separated by "-" (I chose the "-" character):
> Notes='-PER CUSTOMER -NEEDS DETAILED CLEANING-From: XXXXXX
> <XXXXXXXX@commonspirit.org- -Sent: Thursday- May 18- 2023 4:29 PM-To:
> XXXXXXSupport <XXXXXXXXX.com-Subject: -Here are the answers to your
> questions:-1. kalkalkakalkaalkla. Additional notes from the operator
> state -Not processed correctly. akakakakakak.-2. Procedure:-i.
> Unknown-ii. Unknown-3. Patient/User Involvement-i. No-4. Caller
> Contact Information: 1. xxxxxx- 2. Endoscopy Coordinator- 3.
> 11111111111 4. XXXXXXXX.org-')
I have tried the above code, but it is not working. When the saved CSV file is opened, the email content is still split across multiple rows:
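The splitting usually happens because a note still contains literal newlines, or because the file is viewed line by line instead of being parsed as CSV. A quoted CSV field may legally contain a newline; this stdlib-only sketch (no Spark involved) shows both sides of that:

```python
import csv
import io

# A note containing a literal newline.
row = ["REQ-1", "Line one\nLine two"]

buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(row)
data = buf.getvalue()

# The field is quoted, so a CSV-aware reader recovers a single row...
parsed = next(csv.reader(io.StringIO(data)))
print(parsed)            # ['REQ-1', 'Line one\nLine two']

# ...but the raw file still contains two physical lines, which is why
# a naive line-based viewer shows the note split across rows.
print(data.count("\n"))  # 2 (one embedded, one row terminator)
```

So either the newlines must be stripped before writing (the regexp_replace approach), or the reader must parse quoted multiline fields.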
Using a custom function to save into CSV format
def write_csv_with_specific_file_name(sc, df, path, filename):
    # Write the data frame as a single CSV part file, then rename the
    # part file to the requested filename.
    file_format = 'csv'
    df.coalesce(1).write.option("header", "true").mode('overwrite').format(file_format).save(path)
    try:
        sc_uri = sc._gateway.jvm.java.net.URI
        sc_path = sc._gateway.jvm.org.apache.hadoop.fs.Path
        file_system = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
        configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
        # fs = file_system.get(sc_uri("abfss://<container-name>@<account-name>.dfs.core.windows.net/<file-path>/"), configuration())
        fs = sc_path(path).getFileSystem(sc._jsc.hadoopConfiguration())
        src_path = None
        status = fs.listStatus(sc_path(path))
        for fileStatus in status:
            temp = fileStatus.getPath().toString()
            if "part" in temp:
                src_path = sc_path(temp)
        dest_path = sc_path(path + filename)  # assumes `path` ends with '/'
        if fs.exists(src_path) and fs.isFile(src_path):
            fs.rename(src_path, dest_path)
            fs.delete(src_path, True)
    except Exception as e:
        raise Exception("Error renaming the part file to {}: {}".format(filename, e))
I am not sure what I am doing wrong. Can anyone help me?
Thanks!
Answer 1
Score: 0
I have used the below regex in the regexp_replace() function to get the desired output:
t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes', r'[,\|"\n\r\t^>_-]+', '-'))
With this, the output is in a single row.
You can write the data frame into a CSV file at the desired path using the code below:
email_df.coalesce(1).write.csv("<path>/<filename>.csv", header=True, mode="overwrite")
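Note that df.write.csv(path) treats the path as an output directory and places part files inside it, which is why the question's helper renames the part file afterwards. That rename step can be sketched locally with the standard library (the directory and filenames here are made up):

```python
import glob
import os
import tempfile

# Stand-in for the Spark output directory containing one part file.
out_dir = tempfile.mkdtemp()
with open(os.path.join(out_dir, "part-00000-abc.csv"), "w") as f:
    f.write("header\nvalue\n")

# Mirror of the listStatus/rename logic in the question's helper:
# find the part file and rename it to a predictable name.
src = next(p for p in glob.glob(os.path.join(out_dir, "*"))
           if "part" in os.path.basename(p))
dest = os.path.join(out_dir, "maintenance_requests.csv")
os.rename(src, dest)

print(sorted(os.listdir(out_dir)))  # ['maintenance_requests.csv']
```

On a real cluster the same find-and-rename has to go through the Hadoop FileSystem API, as in the question, because the output directory may not be a local path.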