Unzipping Large Files Using Databricks PySpark
Question
I have a scenario where there are two blob containers belonging to two different Azure storage accounts. Both containers are mounted in a Databricks workspace. The first container, cnt-input, has a folder with a large number of zip files (about 20K per day), each approximately 5 GB in size. The compression ratio is 1/10, meaning that when decompressed, the result is multiple csv files with a total size of approximately 50 GB. Zip files are pushed to this container on a daily basis. I would like to create a PySpark notebook that:
1- Processes files incrementally: detects newly arrived files and skips the ones already processed.
2- Takes a large number of files as a batch (e.g. 20K), unzips them, and saves the decompressed csv files in the second blob container, cnt-output.
3- Is performant and optimal (the Databricks runtime version is 12.1, and the cluster has 8 worker nodes of type Standard_DS3_v2).
Answer 1
Score: 2
You can follow the approach below. Here, I load the zip files, extract them, and upload the extracted csv files to the blob container using the Auto Loader concept, which loads data incrementally.
import zipfile

input_mount_path = "/mnt/jgsblob/cnt-input/"
output_mount_path = "/mnt/jgsblob2/cnt-ouput/"

def extract_zip_file(file_data):
    # Each record from the binaryFile stream is a row of
    # (path, modificationTime, length, content).
    file_path, modificationTime, length, content = file_data
    # Convert the dbfs:/ URI into the /dbfs FUSE path so that zipfile can open it,
    # then extract everything into the csv_data folder of the output mount.
    with zipfile.ZipFile(file_path.replace("dbfs:", "/dbfs"), 'r') as zip_ref:
        zip_ref.extractall("/dbfs" + output_mount_path + "csv_data")
extract_zip_file is the function used to extract the files that are read by the Spark readStream below.
query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "*.zip")
    .load(input_mount_path + "Databricks")
    .writeStream
    .option("checkpointLocation", output_mount_path + "chk_point/")
    .foreachBatch(lambda batch_df, batch_id: batch_df.foreach(extract_zip_file))
    .start()
)
In this code, I used the cloudFiles format, which invokes Auto Loader, and a checkpointLocation, where Auto Loader stores the records of files that have already been processed, so they are not picked up again.
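As a minimal sketch building on the query above (not part of the original answer; the option names are standard Auto Loader / Structured Streaming settings and the batch-size value is only illustrative), you could cap how many newly discovered zip files each micro-batch handles and run the stream as a one-shot job that drains the current backlog and then stops:

query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "*.zip")
    # Limit how many new zip files each micro-batch picks up (illustrative value).
    .option("cloudFiles.maxFilesPerTrigger", 1000)
    .load(input_mount_path + "Databricks")
    .writeStream
    .option("checkpointLocation", output_mount_path + "chk_point/")
    # Process everything currently in the container, then stop (suits a daily run).
    .trigger(availableNow=True)
    .foreachBatch(lambda batch_df, batch_id: batch_df.foreach(extract_zip_file))
    .start()
)
query.awaitTermination()  # block the notebook until the backlog has been drained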
Below are the zip files in container one, that is, the input mount path:

input_mount_path = "/mnt/jgsblob/cnt-input/"
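As a quick verification step (a sketch, not part of the original answer), the zip files sitting on the input mount can be listed with dbutils.fs.ls:

# List the zip files in the input folder that the stream reads from.
for f in dbutils.fs.ls(input_mount_path + "Databricks"):
    if f.name.endswith(".zip"):
        print(f.name, f.size)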
Output:
Below are the files extracted into container two, that is, the output mount path:

output_mount_path = "/mnt/jgsblob2/cnt-ouput/"
Here, you can see that cosmetics.csv and voted-kaggle-dataset were initially created at 11:51; after I uploaded a new zip file, gold.csv was loaded at 11:55, even though the previous zip files were still present in container one.
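This incremental behaviour comes from the checkpoint that Auto Loader maintains. As a hedged sketch (the cloud_files_state table-valued function exists on recent Databricks runtimes; confirm it is available on yours), you can inspect which files have already been discovered:

# Query the Auto Loader checkpoint to see which files have already been ingested.
spark.sql(
    f"SELECT * FROM cloud_files_state('{output_mount_path}chk_point/')"
).show(truncate=False)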
Make sure to use the correct node type for your input data size, because I have tested this only with small files.