如何使Spark在将DataFrame转换为具有较少字段的案例类的Dataset时引发异常?

huangapple go评论60阅读模式
英文:

How to get Spark to raise an exception when casting a DataFrame to a Dataset with a case class that has fewer fields?

问题

如果DataFrame df 的行类型是 (col1: Int, col2: String),而我的 case class 是 MyClass(col1: Int)(缺少 col2: String),那么 df.as[MyClass] 不会引发异常。

如果要让它引发异常以便知道我的 case class 缺少与 DataFrame 相比的某些字段,可以考虑使用强制类型转换来实现。

例如,你可以尝试以下方法:

df.select($"col1".as[Int], $"col2".as[String]).as[MyClass]

这将在字段不匹配时引发异常。希望这可以帮助你实现你的需求。

英文:

Suppose we have a DataFrame df with row type (col1: Int, col2: String). If I have a case class MyClass(col1: Int) (missing col2: String), then df.as[MyClass] will not raise an exception.

How could I get it to raise an exception so I know that my case class is missing some fields compared to the DataFrame?

Edit

Adding a bit more info. My main use case is to fail if upstream data sources change their schemas. It works fine if df's schema has fewer columns; MyClass2(col1: Int, col2: String, col3: Double => df.as[MyClass2] fails with the following exception.

org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `col3` cannot be resolved. Did you mean one of the following? [`col1`, `col2`].

答案1

得分: 3

The .as[] method有一些在源代码中的文档(我链接到的是v3.4.0,截止到此帖子发布日期最新的版本)。重要的部分如下:

> 如果数据集的模式不匹配所需的U类型,则可以使用selectaliasas来重新排列或重命名。
>
> 请注意,as[]只会更改传递给类型化操作(例如map())的数据的视图,不会急切地投影掉未在指定类中存在的任何列。

这意味着你所看到的行为是完全符合预期的。由于.as[]不会急切地投影掉不在你的MyClass类中存在的任何列,你必须自己使用.select来做这个工作。

所以回答你的问题:仅使用.as[]方法,你无法发生这些错误。

请注意,你仍然可以在你的模式中指定的那些列上使用类型化操作。考虑以下示例:

scala> val df = Seq((1, 2), (3, 4)).toDF("col1", "col2")
scala> case class MyClass(col1: Int)

scala> df.show
+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+

scala> df.as[MyClass].map(_.col1 + 1).show
+-----+
|value|
+-----+
|    2|
|    4|
+-----+

如果你真的想触发一个错误,你可以通过手动比较模式来实现。一个非常简化的示例:

import org.apache.spark.sql.Encoders

val classSchema = Encoders.product[MyClass].schema
val dfSchema = df.schema

if (classSchema != dfSchema)
  throw new RuntimeException("你的模式不一样")
英文:

The .as[] method has some documentation in the source code (I'm linking to v3.4.0, the latest as of the date of this post). The important bit is this:

> If the schema of the Dataset does not match the desired U type, you can use select along with alias or as to rearrange or rename as required.
>
> Note that as[] only changes the view of the data that is passed into typed operations, such as map(), and does not eagerly project away any columns that are not present in the specified class.

This means that the behaviour you are seeing is completely expected. Since .as[] does not eagerly project away any columns that are not present in your MyClass class, you'll have to do it yourself using .select.

So to answer your question: through only using the .as[] method, you can't get these errors to take place.

Please note that you're still able to use typed operations on those columns that you did specify in your schema. Consider the following example:

scala> val df = Seq((1, 2), (3, 4)).toDF("col1", "col2")
scala> case class MyClass(col1: Int)

scala> df.show
+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+


scala> df.as[MyClass].map(_.col1 + 1).show
+-----+
|value|
+-----+
|    2|
|    4|
+-----+

If you really want to trigger an error, you could do so by manually comparing the schemas. A very simplified example:

import org.apache.spark.sql.Encoders

val classSchema = Encoders.product[MyClass].schema
val dfSchema = df.schema

if (classSchema != dfSchema)
  throw new RuntimeException("Your schemas are not the same")

答案2

得分: 0

一种方法是使用SparkSessionExtensions.injectResolutionRule注册(注入)一个解析规则,以扩展(改进)分析器的工作方式。

据说在分析之后,查询将产生一些结果。您必须停止它,这就是解析规则发挥作用的地方。


再仔细考虑一下,这种基础设施并非真正特定于应用程序的功能不应该由用户代码(由开发人员编写)处理,而应该由Spark SQL的“平台”本身处理,该平台可以扩展并应该扩展。

英文:

One way would be to register (inject) a resolution rule using SparkSessionExtensions.injectResolutionRule that would extend (improve) how Analyzer works.

It's said that after analysis a query is going to produce some result. You have to stop it and that's where resolution rules come in.


Thinking about it a little more, such infrastructure not-really-application-specific features should not be handled by user code (written by a developer) but the Spark SQL "platform" itself that can and should be extended.

huangapple
  • 本文由 发表于 2023年5月14日 00:55:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/76243926.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定