Delta Lake on Spark: Failure to initialize configuration while reading a table from Azure storage


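Question

Context

I am trying to read a Delta table stored on Azure from a local Spark cluster. I access it through Azure Data Lake Storage Gen2 (abfss://), not the legacy Blob Storage.

Spark shell exploration

The final goal is a PySpark application but, to understand what is going on, I am trying to read the table from a Spark shell. Here is how I launch it: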

```bash
spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-azure:3.3.1 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "fs.azure.account.key.<storage_account>.dfs.core.windows.net=<storage_key>"
```

Here is how I try to read the table:

```scala
val dt = spark.read.format("delta").load(f"abfss://hub@<storage_account>.dfs.core.windows.net/fec/fec.delta")
```
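
For reference, the ABFS driver in hadoop-azure reads a shared key from a property whose name embeds the account host of the URI being accessed (documented as fs.azure.account.key.<account>.dfs.core.windows.net), so the host in the key name and the host in the load path have to match exactly. The short sketch below, assuming that naming rule from the hadoop-azure documentation, derives the property name from an abfss:// URI with java.net.URI. The object AbfsKeyName and the account name myaccount are hypothetical and only serve as an illustration.

```scala
import java.net.URI

// Hypothetical helper (not part of hadoop-azure): builds the shared-key property
// name documented as fs.azure.account.key.<account>.dfs.core.windows.net
// from the abfs:// or abfss:// URI that is being read.
object AbfsKeyName {
  def forUri(abfsUri: String): String = {
    val uri = new URI(abfsUri)
    require(uri.getScheme == "abfss" || uri.getScheme == "abfs", s"not an ABFS URI: $abfsUri")
    // abfss://<container>@<account>.dfs.core.windows.net/<path>:
    // java.net.URI exposes <container> as the user-info and the rest as the host.
    val container = uri.getUserInfo
    val accountHost = uri.getHost
    require(container != null && accountHost != null, s"expected <container>@<host> in $abfsUri")
    s"fs.azure.account.key.$accountHost"
  }
}

// "<storage_account>" is not a legal URI host, so a made-up account name is used here.
val keyName = AbfsKeyName.forUri("abfss://hub@myaccount.dfs.core.windows.net/fec/fec.delta")
println(keyName) // fs.azure.account.key.myaccount.dfs.core.windows.net
```

A key registered for one account host is not used for paths on another host, which is worth checking whenever the account name in the configuration and in the path are written differently.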

And here is the error I get:

```
org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException: Failure to initialize configuration
  at org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:51)
  at org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:548)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1449)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:215)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:128)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
  at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
  at org.apache.spark.sql.delta.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:184)
  at org.apache.spark.sql.delta.sources.DeltaDataSource$.parsePathIdentifier(DeltaDataSource.scala:314)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1$lzycompute(DeltaTableV2.scala:70)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.timeTravelByPath$lzycompute(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.timeTravelByPath(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$timeTravelSpec$1(DeltaTableV2.scala:99)
```

As far as I can tell I followed the documentation, and I was able to read the same Delta table from Python with delta-rs using exactly the same credentials, so I am confident the credentials are correct.

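I have probably forgotten to set something, but the more I read the docs, the more confused I get. I also tried OAuth2 authentication and ended up with the same exception. The more I think about it, the more I suspect that --conf "fs.azure.account.key.<storage_account>.dfs.core.windows.net=<storage_key>" is not being taken into account (but I have no idea why).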
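
That suspicion is consistent with how spark-submit (which spark-shell delegates to) handles --conf: keys that do not start with `spark.` are dropped with an "Ignoring non-Spark config property" warning, while keys of the form `spark.hadoop.X` are copied into the Hadoop `Configuration` under the name `X`. On the command line the key would therefore usually be passed as `--conf "spark.hadoop.fs.azure.account.key.<storage_account>.dfs.core.windows.net=<storage_key>"`. The pure-Scala sketch below only models that filtering rule to make it visible; it is a simplification, not Spark's actual code, and the names `ConfForwarding` and `myaccount` are hypothetical.

```scala
// Simplified model (not Spark's implementation) of how spark-submit / spark-shell
// treat --conf key=value pairs.
object ConfForwarding {
  final case class Result(
      sparkConf: Map[String, String],  // properties kept in SparkConf
      hadoopConf: Map[String, String], // properties copied into the Hadoop Configuration
      ignored: Seq[String]             // keys dropped with a warning
  )

  def forward(confs: Seq[(String, String)]): Result = {
    // Only keys starting with "spark." survive; everything else is ignored.
    val (kept, dropped) = confs.partition { case (k, _) => k.startsWith("spark.") }
    // "spark.hadoop.X" entries are additionally exposed to Hadoop as "X".
    val hadoopPrefix = "spark.hadoop."
    val hadoop = kept.collect {
      case (k, v) if k.startsWith(hadoopPrefix) => k.stripPrefix(hadoopPrefix) -> v
    }.toMap
    Result(kept.toMap, hadoop, dropped.map(_._1))
  }
}

val keyName = "fs.azure.account.key.myaccount.dfs.core.windows.net"

// As in the question: the bare Hadoop key never reaches the Hadoop Configuration.
val asked = ConfForwarding.forward(Seq(keyName -> "<storage_key>"))
println(asked.ignored.mkString(", ")) // fs.azure.account.key.myaccount.dfs.core.windows.net

// With the spark.hadoop. prefix the key is forwarded under its Hadoop name.
val prefixed = ConfForwarding.forward(Seq(s"spark.hadoop.$keyName" -> "<storage_key>"))
println(prefixed.hadoopConf.get(keyName)) // Some(<storage_key>)
```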

Environment

  • java 8.0.275.hs-adpt
  • scala 2.13.10
  • spark 3.3.2

Answer 1

Score: 0

It turns out that authentication has to be set from the Spark session. Launch the shell without the account key:

```bash
spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-azure:3.3.2 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```

and then set the key in the session before reading:

```scala
spark.conf.set("fs.azure.account.key.<storage_account>.dfs.core.windows.net", "<storage_key>")
val dt = spark.read.format("delta").load("abfss://hub@<storage_account>.dfs.core.windows.net/fec/fec.delta")
```

huangapple
  • This article was posted on 2023-05-11 20:20:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76227587.html