Connecting and Authenticating to Delta Lake on Azure Data Lake Storage Gen 2 using delta-rs Python API


Question


I am trying to connect and authenticate to an existing Delta table in Azure Data Lake Storage Gen 2 using the delta-rs Python API. I found the delta-rs library through this Stack Overflow question: https://stackoverflow.com/questions/67181870/delta-lake-independent-of-apache-spark

However, the delta-rs documentation (https://delta-io.github.io/delta-rs/python/usage.html and https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variant.SasKey) is quite vague about how to connect and authenticate to Azure Data Lake Storage Gen 2. I have not been able to find a clear example that demonstrates the required steps.

Can someone provide a step-by-step guide or example showing how to connect and authenticate to a Delta table on Azure Data Lake Storage Gen 2 using the delta-rs Python API?

Answer 1

Score: 1


You can use the following Python code to work with a Delta table on Azure Data Lake Storage (ADLS) Gen 2, authenticating with a SAS token. The script reads a CSV file from an ADLS container, appends its contents to the Delta table, and prints some metadata.

First, make sure you have the required libraries installed:

pip install deltalake pandas numpy

Then, use this Python script:

import deltalake as dl
from deltalake import write_deltalake
import pandas as pd
import numpy as np

# Define your SAS token, storage account name, container name, and file paths
sas_token = "<please_generate_sas_token_using_a_stored_access_policy>"
storage_account_name = "mystorage"
container_name = "test-container"
csv_file = "test_delta/test_csv_data/products1.csv"
delta_path = "test_delta/light_delta_lake"

# HTTPS URL of the CSV file, with the SAS token appended as the query string
csv_url = f"https://{storage_account_name}.dfs.core.windows.net/{container_name}/{csv_file}?{sas_token}"

# Choose the protocol (abfs or abfss)
# https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri
protocol = "abfss"  # Use "abfs" for non-secure connections

# Construct the URL for the folder that holds the Delta table
delta_url = f"{protocol}://{container_name}@{storage_account_name}.dfs.core.windows.net/{delta_path}"

# Pass the SAS token as a storage option (it can be set via an environment variable as well)
storage_options = {"SAS_TOKEN": sas_token}

# Print the settings with the secret masked
print(csv_url.replace(sas_token, "<SECRET>"))
print()
print(str(storage_options).replace(sas_token, "<SECRET>"))
print(delta_url)

# Open the existing Delta table in the storage account
dt = dl.DeltaTable(delta_url, storage_options=storage_options)

# Print the schema and file URIs of the Delta table
print(dt.schema())
print(dt.file_uris())

# Print the history of the Delta table as a DataFrame
print(pd.DataFrame(dt.history()))

# Read the CSV file into a DataFrame and adjust its columns
data = (
    pd.read_csv(csv_url)
    .assign(stars=lambda df: df["rating"].astype(np.int32))
    .drop(["description", "ingredients"], axis=1)
    .astype({"rating_count": np.int32})
)
print(data.head())

# Append the DataFrame to the Delta table
write_deltalake(table_or_uri=dt, data=data, mode="append")

# Print the updated file URIs and history of the Delta table
print(dt.file_uris())
print(pd.DataFrame(dt.history()))
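
The script mentions that the SAS token can also come from an environment variable. A minimal sketch of that idea follows, using only the standard library. The helper names (build_delta_url, storage_options_from_env, redact) and the variable name MY_ADLS_SAS_TOKEN are hypothetical and chosen for illustration; they are not part of delta-rs. Stripping a leading "?" from the token is a precaution I am assuming is useful when the token is pasted from the Azure portal.

```python
import os


def build_delta_url(account: str, container: str, path: str, secure: bool = True) -> str:
    """Build a URI of the form scheme://container@account.dfs.core.windows.net/path."""
    scheme = "abfss" if secure else "abfs"
    return f"{scheme}://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def storage_options_from_env(var_name: str = "MY_ADLS_SAS_TOKEN") -> dict:
    """Read the SAS token from an environment variable (hypothetical name)."""
    token = os.environ.get(var_name)
    if not token:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    # Drop a leading "?" so the token matches the form used in the script above
    return {"SAS_TOKEN": token.lstrip("?")}


def redact(text: str, secret: str, mask: str = "<SECRET>") -> str:
    """Mask the secret before printing or logging."""
    return text.replace(secret, mask) if secret else text
```

The two results could then be passed to the same call the script uses, for example dl.DeltaTable(build_delta_url("mystorage", "test-container", "test_delta/light_delta_lake"), storage_options=storage_options_from_env()), which keeps the token out of the source file.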

Posted by huangapple on March 31, 2023, 03:54:36
When reposting, please keep the link to this article: https://go.coder-hub.com/75892442.html