Use PySpark to read data from an Azure file share
Question
Hey guys, does anyone know how I can read QVD data from an Azure file share?
I want to read these QVD files, convert them to Parquet, and then load the data into the container using ADF.
But I am having trouble using Synapse to read data from the specified file share. This is how the path is defined:
    base_path = 'abfss://fileshare@storagename.file.core.windows.net'
    adf_path = f'{base_path}/WMOS/WMOS'
I get:
    Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature
However, the same code works for a blob in the same container.
Answer 1
Score: 1
First, make sure your Python version is between 3.6 and 3.9.
When creating the Apache Spark pool, select a Spark version whose Python version falls within that range.
Next, create a notebook in your Synapse workspace and add the following code.
To get data from an Azure file share, you need to download it locally, read it into pandas, and then convert it to a Spark DataFrame.
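The authentication error in the question is consistent with how the `abfss://` scheme works: as far as I know, the ABFS driver talks to the Data Lake Storage Gen2 endpoint (`<account>.dfs.core.windows.net`), while a file share is served from `<account>.file.core.windows.net`, which is why the file has to be fetched through the File Share SDK instead. A minimal sketch of that distinction follows. The helper name `is_abfs_readable` is hypothetical, and the check assumes the public-cloud endpoint suffix.

```python
from urllib.parse import urlparse


def is_abfs_readable(uri: str) -> bool:
    """Return True if the URI uses abfs/abfss and points at a dfs endpoint."""
    parsed = urlparse(uri)
    host = (parsed.hostname or "").lower()
    # Assumption: public Azure cloud, where ADLS Gen2 hosts end in this suffix.
    return parsed.scheme in ("abfs", "abfss") and host.endswith(".dfs.core.windows.net")


# Path from the question: file share endpoint, so the check fails.
print(is_abfs_readable("abfss://fileshare@storagename.file.core.windows.net/WMOS/WMOS"))  # False
# Path from the write step: ADLS Gen2 endpoint, so the check passes.
print(is_abfs_readable("abfss://data@jgsadls2.dfs.core.windows.net/qvd_parquet"))  # True
```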
Add the code blocks below to your notebook.
    pip install azure-storage-file-share==12.1.0 qvd
This installs the required packages.
    from azure.storage.fileshare import ShareClient
    from qvd import qvd_reader

    localpath = "tmp.qvd"
    connection_string = "Your_conn_string_to_storage_account"
    share_name = "Your_file_share_name"
    directory_name = "dir_name_in_fileshare"
    file_name = "Airlines.qvd"

    def download_from_file_storage():
        # Connect to the file share and get a client for the target file
        share_client = ShareClient.from_connection_string(connection_string, share_name)
        file_client = share_client.get_file_client(directory_name + '/' + file_name)
        # Write the downloaded contents to a local file
        with open(localpath, "wb") as file:
            download_stream = file_client.download_file()
            file.write(download_stream.readall())

    download_from_file_storage()
This function downloads the file to the local file system.
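The download step can also be written so that it takes the file client as an argument and writes into the temporary directory, which avoids depending on the notebook's current working directory. This is a sketch, not the code from the answer: `download_to_temp` is a hypothetical helper, and it only assumes the client exposes `download_file()` returning an object with `readall()`, as the file client in the code above does.

```python
import os
import tempfile


def download_to_temp(file_client, file_name: str = "tmp.qvd") -> str:
    """Download a file via `file_client` and return the local path it was written to."""
    local_path = os.path.join(tempfile.gettempdir(), file_name)
    with open(local_path, "wb") as fh:
        # readall() loads the whole file into memory before writing it out.
        fh.write(file_client.download_file().readall())
    return local_path
```

With the variables from the answer, the call would look like `localpath = download_to_temp(share_client.get_file_client(directory_name + "/" + file_name))`.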
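    from pyspark.sql.functions import col

    # qvd_reader returns a pandas DataFrame
    df = qvd_reader.read(localpath)
    s_df = spark.createDataFrame(df)
    # Copy "%Airline ID" into a column with a plain name, then drop the original
    s_df = s_df.withColumn("AirlineId", col("%Airline ID")).drop(col("%Airline ID"))
    display(s_df)
Here, the QVD file is read from the local file system and converted to a Spark DataFrame.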
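The rename of `%Airline ID` is presumably there because Spark's Parquet writer has historically rejected column names containing spaces and a few other characters. If a QVD file has many such columns, renaming them one by one gets tedious, so here is a hedged sketch of a generic cleanup that could be applied to the pandas DataFrame before `spark.createDataFrame`. `sanitize_column_name` is a hypothetical helper, and the replacement rule is just one reasonable choice.

```python
import re


def sanitize_column_name(name: str) -> str:
    """Replace runs of characters other than letters, digits and '_' with '_'."""
    cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", name).strip("_")
    # Fall back to a placeholder if nothing usable is left.
    return cleaned or "col"


print(sanitize_column_name("%Airline ID"))  # Airline_ID
```

It could be applied as `df.columns = [sanitize_column_name(c) for c in df.columns]`. Note that this yields `Airline_ID` rather than the `AirlineId` used above, and that two different source names can collapse to the same cleaned name, so the result is worth checking for duplicates.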
Next, write the data to ADLS Gen2 storage as Parquet using a linked service.
    linkedServiceName_var = "adls2link"
    spark.conf.set("fs.azure.account.auth.type", "SAS")
    spark.conf.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
    spark.conf.set("spark.storage.synapse.linkedServiceName", linkedServiceName_var)

    raw_container_name = "data"
    raw_storageaccount_name = "jgsadls2"
    relative_path = "qvd_parquet"
    path = f"abfss://{raw_container_name}@{raw_storageaccount_name}.dfs.core.windows.net/{relative_path}"
    s_df.write.parquet(path)
Before executing this, you need to create a linked service to your ADLS storage.
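If the target path is built in more than one place, a small helper keeps the container, account, and relative path consistent. This is only an illustrative sketch: `build_abfss_path` is a hypothetical name, and it assumes the public-cloud `dfs.core.windows.net` endpoint used in the code above.

```python
def build_abfss_path(container: str, account: str, relative_path: str = "") -> str:
    """Build an abfss:// URI for an ADLS Gen2 container (public-cloud endpoint assumed)."""
    base = f"abfss://{container}@{account}.dfs.core.windows.net"
    # Strip stray slashes so the result never contains '//' after the host.
    relative = relative_path.strip("/")
    return f"{base}/{relative}" if relative else base


print(build_abfss_path("data", "jgsadls2", "/qvd_parquet/"))
# abfss://data@jgsadls2.dfs.core.windows.net/qvd_parquet
```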
If you want to use this in a pipeline, add this notebook to the pipeline with path as its exit value and run it.
Then take the path from the pipeline output and use it in later steps.
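One way to hand the path back to the pipeline is `mssparkutils.notebook.exit`, which Synapse notebooks provide for setting the exit value. The sketch below is an assumption about how the last cell could look. `finish_notebook` is a hypothetical wrapper, and the import fallback exists only so the snippet also runs outside Synapse.

```python
def finish_notebook(path: str) -> str:
    """Set `path` as the notebook exit value when running inside Synapse."""
    try:
        # Assumption: this module is only importable inside a Synapse notebook.
        from notebookutils import mssparkutils
    except ImportError:
        # Outside Synapse there is no exit value to set; just return the path.
        return path
    mssparkutils.notebook.exit(path)
    return path
```

In the pipeline, the value can then be read from the Notebook activity's output, typically with an expression along the lines of `@activity('Notebook1').output.status.Output.result.exitValue`, where `Notebook1` is a placeholder for the activity name. It is worth confirming the exact property path against the activity's output JSON.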