Split .csv file column in 2 in Azure Synapse Analytics using PySpark

Question

I have a .csv file (in Azure Data Lake Storage), which looks approximately like this ->
[screenshot of the input .csv file]

I want to create a notebook (PySpark (Python)) that could be run from one of the pipelines in Synapse Analytics (Integrate -> Pipeline).

The code in the notebook should split the 2nd column in two and convert all the rows to GB, so that it looks like this:
[screenshot of the desired output]

Could you please help with the PySpark code? I am a beginner in Azure Synapse Analytics and not sure how to do it.

!! IMPORTANT: The problem I have is that it all should be done in the same file (no new files should be created).

Thanks in advance

Answer 1

Score: 1

One way to do this is to use the split function:

Input:

+----+--------+-----+
|  ID|Consumed|Total|
+----+--------+-----+
|1234|80.14 GB|    x|
|2345|  670 KB|    y|
|3456| 9.38 GB|    Z|
+----+--------+-----+

Code:

from pyspark.sql.functions import split, col, lit
from pyspark.sql.types import DoubleType

# Sample input data
df = spark.createDataFrame([(1234, '80.14 GB', 'x',),
                            (2345, '670 KB', 'y',),
                            (3456, '9.38 GB', 'Z',)]
                            , ['ID', 'Consumed', 'Total']
                          )
# Lookup table: scale factor of each unit relative to GB (decimal units)
dfUnit = spark.createDataFrame([  (1.0, 'GB',),
                                  (0.001, 'MB',),
                                  (0.000001, 'KB',),]
                                , ['Scale', 'Unit']
                              )

df = (df
      .withColumn("ConsumedSplit",split(col("Consumed"), " "))               # e.g. ["80.14", "GB"]
      .withColumn("Unit",col("ConsumedSplit")[1])                            # unit part
      .withColumn("Consumed",(col("ConsumedSplit")[0]).cast(DoubleType()))   # numeric part
      .join(dfUnit, on="Unit")                                               # attach the scale factor
      .withColumn("Consumed",col("Consumed")*col("Scale"))                   # convert to GB
      .withColumn("Unit",lit("GB"))
      .select("ID", "Consumed", "Unit", "Total")
     )
df.show()

Result:

+----+--------+----+-----+
|  ID|Consumed|Unit|Total|
+----+--------+----+-----+
|1234|   80.14|  GB|    x|
|3456|    9.38|  GB|    Z|
|2345|  6.7E-4|  GB|    y|
+----+--------+----+-----+
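
One detail worth noting (an observation, not part of the answer above): the join on Unit is an inner join, so a row whose unit is missing from dfUnit (say TB) would be dropped silently. A minimal sketch of extending the lookup table to cover more units:

# Hypothetical extended lookup table; Scale is the factor relative to GB (decimal units).
dfUnit = spark.createDataFrame([(1000.0,   'TB'),
                                (1.0,      'GB'),
                                (0.001,    'MB'),
                                (0.000001, 'KB')]
                               , ['Scale', 'Unit']
                              )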

I would not recommend overwriting the same file. It's always good practice to separate your stages. You could treat the files you are reading as raw files, e.g. saved in a .../Raw/ folder, and then write the newly generated files somewhere like a .../Preprocessed/ folder.

It might be a good idea to save the new file in a binary format like Parquet, both for compression/file size and because the data type of each column is stored in the file itself.
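
If that separation is acceptable, a minimal sketch of the read-raw / write-Parquet pattern could look like this (the folder layout and placeholder paths are assumptions, not taken from the question):

# Assumption: raw CSV lives under Raw/ and the cleaned data goes to Preprocessed/ as Parquet.
raw_path = "abfss://<container>@<storageacc>.dfs.core.windows.net/Raw/<filename>.csv"
out_path = "abfss://<container>@<storageacc>.dfs.core.windows.net/Preprocessed/<filename>.parquet"

df = spark.read.option("header", True).csv(raw_path)
# ... apply the transformation shown above ...
df.write.mode("overwrite").parquet(out_path)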

Answer 2

Score: 1

I read the csv file from my storage into a dataframe.
Here is my file:

[screenshot of the input file]
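
(The read step is only described, not shown; a minimal sketch, assuming a headered CSV at a placeholder ADLS path:)

# Assumption: the file is read straight from ADLS Gen2; the path is a placeholder.
df = (spark.read
      .option("header", True)
      .csv("abfss://<container>@<storageacc>.dfs.core.windows.net/<filename>.csv"))
df.show()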

I created a new column called 'Unit' by splitting the Consumed column:

import pyspark.sql.functions
from pyspark.sql.functions import when, col, round, regexp_extract, format_number

split_cols = pyspark.sql.functions.split(df['Consumed'], ' ')

df = df.withColumn('Unit', split_cols.getItem(1))
df = df.withColumn('Unit', when(df['Unit'] == 'KB', 'GB').otherwise(df['Unit']))

I converted the KB values of Consumed into GB using the code below:

df = df.withColumn("Consumed",
                   when(df["Consumed"].contains("GB"),
                        round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double"), 2)
                        )
                   .when(df["Consumed"].contains("KB"),
                         round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double")/1000000, 5)
                        )
                   .otherwise(0)
                  )

When I try the above, the 670 KB value comes out as 6.7E-4, i.e. it is converted to scientific notation.

[screenshot of the dataframe showing 6.7E-4]

So, I formatted it by using the command below:

df = df.withColumn("Consumed", when(col("Consumed") == 6.7E-4, format_number(col("Consumed"), 5)).otherwise(col("Consumed")))
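
(A more general alternative, shown only as a sketch: apply format_number to every value instead of matching 6.7E-4 exactly. Note that this turns Consumed into a string column.)

# Sketch: format all Consumed values to 5 decimal places, which avoids scientific
# notation for any small value, at the cost of making the column a string.
df = df.withColumn("Consumed", format_number(col("Consumed"), 5))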

And selected the columns in the following order:

df = df.select('ID', 'Consumed', 'Unit', 'Total')

Output:

[screenshot of the output dataframe]

> !! IMPORTANT: The problem I have is that it all should be done in the same file (no new files should be created).

Mount the path using the procedure below:

Create a linked service for the path and create a mount using the code below:

mssparkutils.fs.mount(
    "abfss://<container>@<storageacc>.dfs.core.windows.net",
    "<mount container>",
    {"linkedService": "AzureDataLakeStorage1"}
)

[screenshot]

jobid = mssparkutils.env.getJobId()
path = '/synfs/' + jobid + '/<mount container>/<filename>.csv'

[screenshot]
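
(Optional sanity check, not part of the original steps: list the mounted container to confirm the mount and the file are visible. The /synfs/<jobid>/... form above is the local file-system path used by pandas below; Spark and mssparkutils APIs address the mount via the synfs: scheme.)

# Assumption: quick check that the mount works; <mount container> is the same placeholder as above.
mssparkutils.fs.ls('synfs:/' + jobid + '/<mount container>')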

I overwrite the updated dataframe to the same file using the code below:

df.toPandas().to_csv(path, index=False)

[screenshot]

Updated file:

[screenshot of the updated file]
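
(For comparison, a sketch of a pure-Spark write. Spark writes a folder of part files rather than a single .csv, which is why the pandas call above is used to keep one file in place; toPandas() collects the whole dataframe to the driver, which is fine for small files like this one.)

# Sketch: Spark-native overwrite; this produces a directory of part files, not one .csv file.
(df.coalesce(1)
   .write
   .mode("overwrite")
   .option("header", True)
   .csv("abfss://<container>@<storageacc>.dfs.core.windows.net/<output folder>"))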
