Spark "Run SQL on files directly" fails

Question

Spark documentation suggests I can run SQL directly on files in PySpark with syntax like this:

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

I have some Delta format data sitting in an AWS S3 bucket (well, currently a fake S3 bucket in LocalStack). I'm trying to query it using this approach. The bucket looks like this:

$ awslocal s3 ls s3://data-lake/data/
                       PRE _delta_log/
2023-06-07 16:55:33        903 part-00000-923a2eea-e2fe-468e-b2b9-a85206858ddb-c000.snappy.parquet
2023-06-07 16:55:33        914 part-00001-a6ceac4f-d7ea-44e9-bce9-d7fe0c103e35-c000.snappy.parquet

And while the following works just fine:

df = spark.read.format("delta").load("s3://data-lake/data")

Querying it with the equivalent SQL syntax fails:

df = spark.sql("select * from delta.`s3://data-lake/data`")

(I've also tried pointing directly at a file rather than the containing directory, with no difference.)

The data is written via the following simple Scala Spark code:

val ds = Seq(
  Foo("a", Bar(1, 2)),
  // etc
).toDS()

ds
  .write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save("s3://data-lake/data")

The error I get when I try to query this is as follows, edited a bit for brevity:

Py4JError                                 Traceback (most recent call last)
/tmp/ipykernel_178/100492152.py in <module>

---> 11 df = spark.sql("select * from delta.`s3://data-lake/data`")

<snip>

py4j.Py4JException: Method sql([class java.lang.String, class java.util.HashMap]) does not exist

This feels like a problem in the plumbing rather than a syntax error on my part (not least because the same approach works fine in Scala).

  • Python 3.7.16
  • PySpark 3.4.0
  • Running from a Jupyter notebook against a LocalStack container based on the emr-serverless/spark/emr-6.10.0:latest image.

Answer 1

Score: 0

It looks like you've misread the documentation.

It says you can read directly from a file.

> Delta Lake is an open source file protocol that stores data in Parquet files

I suggest trying:

df = spark.sql("select * from parquet.`s3://data-lake/data/part-00000-923a2eea-e2fe-468e-b2b9-a85206858ddb-c000.snappy.parquet`")

But since a Delta folder is not a file format, I do not see why select * from delta would ever work.

The reason df = spark.read.format("delta").load("s3://data-lake/data") works is that you can load from a folder.

Answer 2

Score: 0

To me, it looks like a mismatch between the versions of PySpark and the underlying Spark. The error is that py4j is not able to invoke the SparkSession::sql() method that takes both a parameterised query and a map of parameter names with their values:

py4j.Py4JException: Method sql([class java.lang.String, class java.util.HashMap]) does not exist
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

That particular overload was added in Spark 3.4.0, and PySpark 3.4.0 always makes use of it, whether or not the query is parameterised:

...
try:
    litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
...

Make sure that Spark's home directory is properly configured in the notebook environment and that PySpark is not using an older Spark version.


Indeed, checking the software versions included in emr-6.10.0 confirms the theory:

[Screenshot: the emr-6.10.0 application versions table, showing Spark 3.3.1]

Downgrade your PySpark to version 3.3.1.
