
Unzipping Large Files Using Databricks PySpark

Question


I have a scenario where there are two blob containers belonging to two different Azure storage accounts. Both containers are mounted in a Databricks workspace. The first container cnt-input has a folder with a large number of zip files (20K per day), each approximately 5 GB in size. The compression ratio is 1/10, meaning that when decompressed, the result is multiple csv files with a total size of approximately 50 GB. Zip files are pushed to this container on a daily basis. I would like to create a PySpark notebook that:

1- Processes files incrementally; detects newly arrived files and skips the ones already processed.

2- Takes a large number of files as a batch (e.g. 20K), unzips them, and saves the decompressed csv files in the second blob container: cnt-output.

3- The process should be performant and optimal (the Databricks Runtime version is 12.1, and the cluster has 8 worker nodes of type Standard_DS3_v2).
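For reference, a mount setup like the one described above might look roughly like the sketch below (a minimal sketch only; the storage account names, secret scope, and key names are placeholders, not taken from the question):

# Hypothetical mount of the two containers from their respective storage accounts.
# dbutils is available in Databricks notebooks without an import; every name here is illustrative.
dbutils.fs.mount(
    source="wasbs://cnt-input@inputstorageacct.blob.core.windows.net",
    mount_point="/mnt/cnt-input",
    extra_configs={
        "fs.azure.account.key.inputstorageacct.blob.core.windows.net":
            dbutils.secrets.get(scope="my-scope", key="input-account-key")
    },
)
dbutils.fs.mount(
    source="wasbs://cnt-output@outputstorageacct.blob.core.windows.net",
    mount_point="/mnt/cnt-output",
    extra_configs={
        "fs.azure.account.key.outputstorageacct.blob.core.windows.net":
            dbutils.secrets.get(scope="my-scope", key="output-account-key")
    },
)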

Answer 1

Score: 2


You can follow the approach below.

Here, I load the zip files, extract them, and upload the extracted files to the blob container using the Auto Loader concept, which loads data incrementally.

import zipfile

# Mount points of the input and output containers
input_mount_path = "/mnt/jgsblob/cnt-input/"
output_mount_path = "/mnt/jgsblob2/cnt-ouput/"

def extract_zip_file(file_data):
    # Each row produced by the binaryFile stream is (path, modificationTime, length, content)
    file_path, modificationTime, length, content = file_data
    # Convert the dbfs: URI to its /dbfs FUSE path and extract into the output container
    with zipfile.ZipFile(file_path.replace("dbfs:", "/dbfs"), 'r') as zip_ref:
        zip_ref.extractall("/dbfs" + output_mount_path + "csv_data")

extract_zip_file is the function used to extract the files that are read by the Spark readStream, as shown below.

query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "*.zip")
    .load(input_mount_path + "Databricks")
    .writeStream
    .option("checkpointLocation", output_mount_path + "chk_point/")
    .foreachBatch(lambda batch_df, batch_id: batch_df.foreach(extract_zip_file))
    .start()
)


In this code, I used the cloudFiles format, which invokes Auto Loader, and a checkpointLocation where Auto Loader keeps track of the files that have already been processed.
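If the notebook is meant to run as a scheduled daily job rather than a continuously running stream, one possible variation (a sketch only, assuming Databricks Runtime 12.1 / Spark 3.3, where trigger(availableNow=True) is available, and reusing the paths and extract_zip_file defined above) is to trigger the same Auto Loader query as a one-shot batch that drains the backlog of new files and then stops:

# Sketch: same pipeline as above, but triggered so it processes only the files
# that arrived since the last checkpoint and then terminates.
query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "*.zip")
    .load(input_mount_path + "Databricks")
    .writeStream
    .option("checkpointLocation", output_mount_path + "chk_point/")
    .trigger(availableNow=True)
    .foreachBatch(lambda batch_df, batch_id: batch_df.foreach(extract_zip_file))
    .start()
)
query.awaitTermination()  # returns once the current backlog has been processed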

Below are the zip files in container one, i.e. the input mount path:
input_mount_path = "/mnt/jgsblob/cnt-input/"

[screenshot: zip files in the input container]

Output:
Below are the files extracted into container two, i.e. the output mount path:
output_mount_path = "/mnt/jgsblob2/cnt-ouput/"

[screenshot: extracted csv files in the output container]

Here, you can see that cosmetics.csv and voted-kaggle-dataset were initially created at 11:51; after I uploaded a new zip file, gold.csv was loaded at 11:55, even though the previous zip files were still present in container one.

Make sure to use the correct node type for your input data size, because I have only tested this with small files.
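One caveat worth adding (my own assumption, not something the answer tested): the binaryFile source carries each file's bytes in a content column, which can be heavy for archives of several GB. Below is a hedged sketch of a variation that passes only the file path to the workers and spreads the extraction across the cluster; extract_by_path and process_zip_batch are illustrative names, the partition count is arbitrary, and input_mount_path / output_mount_path are reused from above.

# Hypothetical variation for large archives: select only the path column so the
# binary content does not travel with the batch, and repartition so that the
# extraction work is distributed over all worker nodes.
def extract_by_path(row):
    import zipfile
    local_path = row.path.replace("dbfs:", "/dbfs")  # binaryFile rows expose a 'path' column
    with zipfile.ZipFile(local_path, "r") as zip_ref:
        zip_ref.extractall("/dbfs" + output_mount_path + "csv_data")

def process_zip_batch(batch_df, batch_id):
    batch_df.select("path").repartition(32).foreach(extract_by_path)

# Reuse the same writeStream as above, swapping in the new batch handler:
# .foreachBatch(process_zip_batch)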
