Split .csv file column in 2 in Azure Synapse Analytics using PySpark
Question
I have a .csv file (in Azure Data Lake Storage) that looks approximately like this ->
I want to create a notebook (PySpark (Python)) that can be run in Synapse Analytics (Integrate -> Pipeline) as part of one of the pipelines.
The code in the notebook should split the second column in two and convert all rows to GB units, so that the data looks like this:
Could you please help with the PySpark code? I am a beginner in Azure Synapse Analytics and not sure how to do this.
!! IMPORTANT: The problem I have is that it all has to be done in the same file (no new files may be created).
Thanks in advance.
Answer 1
Score: 1
One way to do this is to use the split function:
Input:
+----+--------+-----+
| ID|Consumed|Total|
+----+--------+-----+
|1234|80.14 GB| x|
|2345| 670 KB| y|
|3456| 9.38 GB| Z|
+----+--------+-----+
Code:
from pyspark.sql.functions import split, col, lit
from pyspark.sql.types import DoubleType
df = spark.createDataFrame([(1234, '80.14 GB', 'x',),
(2345, '670 KB', 'y',),
(3456, '9.38 GB', 'Z',)]
, ['ID', 'Consumed', 'Total']
)
dfUnit = spark.createDataFrame([ (1.0, 'GB',),
(0.001, 'MB',),
(0.000001, 'KB',),]
, ['Scale', 'Unit']
)
df = (df
.withColumn("ConsumedSplit",split(col("Consumed"), " "))
.withColumn("Unit",col("ConsumedSplit")[1])
.withColumn("Consumed",(col("ConsumedSplit")[0]).cast(DoubleType()))
.join(dfUnit, on="Unit")
.withColumn("Consumed",col("Consumed")*col("Scale"))
.withColumn("Unit",lit("GB"))
.select("ID", "Consumed", "Unit", "Total")
)
df.show()
Result:
+----+--------+----+-----+
| ID|Consumed|Unit|Total|
+----+--------+----+-----+
|1234| 80.14| GB| x|
|3456| 9.38| GB| Z|
|2345| 6.7E-4| GB| y|
+----+--------+----+-----+
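In the Synapse notebook, df would come from the actual .csv in the data lake rather than from createDataFrame. A minimal sketch of the read, assuming the file has a header row (the abfss path is only a placeholder):
# Placeholder path; replace container, storage account, folder and file name with your own.
df = spark.read.csv(
    "abfss://<container>@<storageacc>.dfs.core.windows.net/<folder>/<filename>.csv",
    header=True,
    inferSchema=True,
)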
I would not recommend overwriting the same file. It is always good practice to separate your stages: treat the files you are reading as raw files, e.g. saved in a .../Raw/ folder, and write the newly generated files somewhere like a .../Preprocessed/ folder.
It might also be a good idea to save the new file in a binary format such as Parquet, both for compression/file size and because the data type of each column is stored in the file itself.
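A minimal sketch of the Parquet write under that layout (the path is a placeholder, for illustration only):
# Illustrative path; writes the transformed dataframe as Parquet to a separate Preprocessed folder.
df.write.mode("overwrite").parquet(
    "abfss://<container>@<storageacc>.dfs.core.windows.net/Preprocessed/<filename>"
)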
Answer 2
Score: 1
I read the CSV file from my storage into a dataframe. Here is my file:
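(The read step itself is not shown; a minimal sketch, assuming the file has a header row and using a placeholder abfss path:)
# Placeholder path; replace container, storage account and file name with your own.
df = spark.read.csv(
    "abfss://<container>@<storageacc>.dfs.core.windows.net/<filename>.csv",
    header=True,
    inferSchema=True,
)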
I created a new column called 'Unit' by splitting the Consumed column:
import pyspark.sql.functions
from pyspark.sql.functions import when, col, round, regexp_extract, format_number
split_cols = pyspark.sql.functions.split(df['Consumed'], ' ')  # split value and unit
df = df.withColumn('Unit', split_cols.getItem(1))
df = df.withColumn('Unit', when(df['Unit'] == 'KB', 'GB').otherwise(df['Unit']))  # relabel KB as GB
I converted the KB values in the Consumed column to GB using the code below:
df = df.withColumn("Consumed",
when(df["Consumed"].contains("GB"),
round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double"), 2)
)
.when(df["Consumed"].contains("KB"),
round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double")/1000000, 5)
)
.otherwise(0)
)
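Only GB and KB appear in the sample data. If MB values can also occur (an assumption, not shown in the question), the chain needs one more branch, along these lines:
# Hypothetical variant of the chain above that also handles MB (1 MB = 0.001 GB, decimal units).
df = df.withColumn("Consumed",
    when(df["Consumed"].contains("GB"),
         round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double"), 2))
    .when(df["Consumed"].contains("MB"),
          round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double") / 1000, 5))
    .when(df["Consumed"].contains("KB"),
          round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double") / 1000000, 5))
    .otherwise(0))
The Unit relabelling above would then also need to map 'MB' to 'GB'.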
With this conversion, the 670 KB value comes out in scientific notation, i.e. 6.7E-4.
So I formatted it with the following command:
df = df.withColumn("Consumed", when(col("Consumed") == 6.7E-4, format_number(col("Consumed"),5)).otherwise(col("Consumed")))
And selected the columns in the following order:
df = df.select('ID', 'Consumed', 'Unit', 'Total')
Output:
> !! IMPORTANT: The problem I have is that it all has to be done in the same file (no new files may be created)
Mount the path using the following procedure:
Create a linked service for the path and create the mount using the code below:
mssparkutils.fs.mount(
    "abfss://<container>@<storageacc>.dfs.core.windows.net",
    "<mount container>",
    {"linkedService": "AzureDataLakeStorage1"}
)
jobid = mssparkutils.env.getJobId()
path = '/synfs/' + jobid + '/<mount container>/<filename>.csv'
I overwrote the file with the updated dataframe using the code below:
df.toPandas().to_csv(path, index=False)
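toPandas() collects the whole dataframe onto the driver, which is what makes a single-file, in-place overwrite of the .csv possible; that is fine for a small file like this one. For comparison, a Spark-native write (sketched below with a placeholder path) would produce a folder of part files rather than one .csv, so it would not satisfy the "same file" requirement:
# For comparison only: Spark's writer creates a directory of part-*.csv files
# (plus _SUCCESS), not a single overwritten .csv file.
(df.write
   .mode("overwrite")
   .option("header", "true")
   .csv("abfss://<container>@<storageacc>.dfs.core.windows.net/<output folder>"))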
Updated file: