Use PySpark to read data from an Azure file share

Question

Hey guys, does anyone have an idea how I can read QVD data from an Azure file share?

I want to read these QVD files, convert them to Parquet, and then load the data into the container using ADF.

But I am having trouble using Synapse to read data from the specified file share. This is how the path is defined:

base_path = f'abfss://fileshare@storagename.file.core.windows.net'
adf_path = f'{base_path}/WMOS/WMOS'

I get:

Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature

However, the same code works for a blob in the same container.

Answer 1

Score: 1

First, make sure your Python version is between 3.6 and 3.9.

When creating the Apache Spark pool, select a Spark version whose Python version falls within that range.
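
A quick way to confirm which Python version the pool actually runs is to check it in the first notebook cell. This is a minimal sketch: the 3.6 to 3.9 bounds are simply the range quoted above, and the helper name `python_version_supported` is made up for illustration.

```python
import sys

# Bounds taken from the range quoted in this answer (an assumption, not a verified requirement)
MIN_PYTHON = (3, 6)
MAX_PYTHON = (3, 9)

def python_version_supported(version_info=None):
    """Return True if the (major, minor) version lies within [3.6, 3.9]."""
    if version_info is None:
        version_info = sys.version_info
    major_minor = tuple(version_info[:2])
    return MIN_PYTHON <= major_minor <= MAX_PYTHON

# Show the interpreter version of the current session and whether it is in range
print(sys.version)
print("Within the quoted range:", python_version_supported())
```

If this prints False, picking a different Spark runtime for the pool is probably the simplest fix.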

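Next, create a notebook in your Synapse workspace and add the following code.
Spark cannot read the share through an abfss:// path, because that scheme addresses the ADLS Gen2 endpoint (dfs.core.windows.net), not the Azure Files endpoint (file.core.windows.net). To get data from an Azure file share, you need to download the file locally, read it into pandas, and then convert it to a Spark DataFrame.

Add the code blocks below to your notebook.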

pip install azure-storage-file-share==12.1.0 qvd

This installs the required packages.

from azure.storage.fileshare import ShareClient
from qvd import qvd_reader

localpath = "tmp.qvd"
connection_string = "Your_conn_string_to_storage_account"
share_name = "Your_file_share_name"
directory_name = "dir_name_in_fileshare"
file_name = "Airlines.qvd"

def download_from_file_storage():
    # Connect to the file share and get a client for the target file
    share_client = ShareClient.from_connection_string(connection_string, share_name)
    file_client = share_client.get_file_client(directory_name + '/' + file_name)
    # Write the downloaded bytes to a local file
    with open(localpath, "wb") as file:
        download_stream = file_client.download_file()
        file.write(download_stream.readall())

download_from_file_storage()

This function downloads the file to the local file system.
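
If the share directory holds several QVD files, the same idea can be extended to loop over the directory listing. The sketch below is not part of the original answer: `download_qvd_files` and the `qvd_files` folder are hypothetical names, and the function only assumes that the object passed in behaves like a `ShareDirectoryClient` (it calls `list_directories_and_files()`, `get_file_client()` and `download_file().readall()`).

```python
import os

def download_qvd_files(directory_client, local_dir):
    """Download every .qvd file directly under a share directory; return the local paths."""
    os.makedirs(local_dir, exist_ok=True)
    downloaded = []
    for item in directory_client.list_directories_and_files():
        name = item["name"]
        # Skip subdirectories and anything that is not a QVD file
        if item["is_directory"] or not name.lower().endswith(".qvd"):
            continue
        local_file = os.path.join(local_dir, name)
        file_client = directory_client.get_file_client(name)
        with open(local_file, "wb") as handle:
            handle.write(file_client.download_file().readall())
        downloaded.append(local_file)
    return downloaded

# Usage inside the notebook (requires azure-storage-file-share):
# from azure.storage.fileshare import ShareClient
# share_client = ShareClient.from_connection_string(connection_string, share_name)
# paths = download_qvd_files(share_client.get_directory_client(directory_name), "qvd_files")
```

Passing the client in as an argument keeps the function easy to try out with a stand-in object before pointing it at the real share.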

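from pyspark.sql.functions import col

# qvd_reader.read returns a pandas DataFrame
df = qvd_reader.read(localpath)
# Convert the pandas DataFrame to a Spark DataFrame
s_df = spark.createDataFrame(df)
# Copy "%Airline ID" into "AirlineId" and drop the original column
s_df = s_df.withColumn("AirlineId", col("%Airline ID")).drop(col("%Airline ID"))
display(s_df)

Here, the QVD file is read from the local file system into a pandas DataFrame and converted to a Spark DataFrame, and the "%Airline ID" column is renamed to "AirlineId".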
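
The rename is probably there because, depending on the Spark version, the Parquet writer rejects column names containing spaces and a few other characters. When a QVD file has many such columns, renaming them one by one gets tedious, so here is a small sketch that cleans all names at once. The helper names are hypothetical, and replacing every character other than letters, digits and underscores is a deliberately conservative choice, not a rule taken from the original answer.

```python
import re

def sanitize_column_name(name):
    """Replace every character other than letters, digits and underscores with '_'."""
    return re.sub(r"[^0-9A-Za-z_]", "_", name)

def sanitize_columns(columns):
    """Apply sanitize_column_name to each column name, keeping the order."""
    return [sanitize_column_name(c) for c in columns]

print(sanitize_columns(["%Airline ID", "Airline"]))

# Usage in the notebook, renaming all columns of the Spark DataFrame:
# s_df = s_df.toDF(*sanitize_columns(s_df.columns))
```

Note that two different source names can collapse to the same cleaned name, so it is worth checking the result for duplicates before writing.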

Next, write the data to ADLS Gen2 storage as Parquet using a linked service.

linkedServiceName_var = "adls2link"
# Authenticate with a SAS token obtained through the Synapse linked service
spark.conf.set("fs.azure.account.auth.type", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
spark.conf.set("spark.storage.synapse.linkedServiceName", linkedServiceName_var)

raw_container_name = "data"
raw_storageaccount_name = "jgsadls2"
relative_path = "qvd_parquet"
path = f"abfss://{raw_container_name}@{raw_storageaccount_name}.dfs.core.windows.net/{relative_path}"
# Write the Spark DataFrame as Parquet files under the target path
s_df.write.parquet(path)

Before executing this, you need to create a linked service to your ADLS storage.
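
Since the target path has to point at the dfs.core.windows.net endpoint, it can help to build it in one place. The helper below is only an illustrative sketch; `abfss_path` is a made-up name, and the container and account values are the sample ones used above.

```python
def abfss_path(container, storage_account, relative_path=""):
    """Build an abfss:// URI for an ADLS Gen2 container, with an optional relative path."""
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    # Drop leading and trailing slashes so the URI never contains '//' after the host
    relative_path = relative_path.strip("/")
    return f"{base}/{relative_path}" if relative_path else base

path = abfss_path("data", "jgsadls2", "qvd_parquet")
print(path)

# Usage in the notebook; mode("overwrite") makes reruns replace the previous output:
# s_df.write.mode("overwrite").parquet(path)
```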

If you want to use this in a pipeline, add this notebook to the pipeline with the path as its exit value and run it.
Then take the path from the pipeline output and use it in later activities.
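
Setting the exit value could look roughly like the sketch below. It assumes the notebook runs in Synapse, where `mssparkutils.notebook.exit` is available; the wrapper `exit_with_path` is a hypothetical helper that just returns the path when the code runs somewhere else.

```python
def exit_with_path(path):
    """Exit the Synapse notebook with `path` as its exit value.

    Outside Synapse, where notebookutils is missing, the path is returned instead.
    """
    try:
        from notebookutils import mssparkutils  # only importable inside Synapse
    except ImportError:
        print("notebookutils not available; returning the path instead of exiting")
        return path
    # Inside Synapse this stops the notebook run and hands the value to the pipeline
    mssparkutils.notebook.exit(path)

# Sample value; in the notebook above this would be the `path` variable
exit_with_path("abfss://data@jgsadls2.dfs.core.windows.net/qvd_parquet")
```

In the pipeline, a later activity should then be able to read the value with an expression along the lines of @activity('Notebook1').output.status.Output.result.exitValue, where 'Notebook1' stands in for whatever the notebook activity is called.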

  • Posted on July 18, 2023, 02:13:08
  • Please keep the link to this article when reposting: https://go.coder-hub.com/76707097.html