如何在Spark中从JSON输入文件创建DataFrame?

huangapple go评论58阅读模式
英文:

how to create dataframe from json input file in spark?

问题

I am creating dataframe from downloaded json file which results in error that some of data is corrupted.
我正在从下载的 JSON 文件中创建数据框架,但出现了一些数据损坏的错误。

I used spark.read.json("json file path") to create dataframe. Error is like:
我使用了 spark.read.json("JSON 文件路径") 来创建数据框架。错误信息如下:

pyspark.sql.utils.AnalysisException:
pyspark.sql.utils.AnalysisException:

Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example:
自 Spark 2.3 开始,当引用的列仅包括内部损坏记录列(默认命名为 _corrupt_record)时,不允许从原始 JSON/CSV 文件执行查询。例如:

spark.read.schema(schema).csv(file).filter("_corrupt_record".isNotNull).count()
spark.read.schema(schema).csv(file).filter("_corrupt_record".isNotNull).count()

and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
以及 spark.read.schema(schema).csv(file).select("_corrupt_record").show()。

Instead, you can cache or save the parsed results and then send the same query. For example, val df = spark.read.schema(schema).csv(file).cache() and then df.filter("_corrupt_record".isNotNull).count().
而是,您可以缓存或保存解析后的结果,然后发送相同的查询。例如,val df = spark.read.schema(schema).csv(file).cache(),然后 df.filter("_corrupt_record".isNotNull).count()。

英文:

I am creating dataframe from downloaded json file which results in error that some of data is corrupted.
i used spark.read.json("json file path") to create dataframe. Error is like:
pyspark.sql.utils.AnalysisException:
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

答案1

得分: 1

以下是翻译好的部分:

由于您没有指定错误,我假设您看到了类似于我看到的内容:

pyspark.errors.exceptions.captured.AnalysisException:从Spark 2.3开始,当引用的列仅包括内部损坏记录列(默认命名为 _corrupt_record)时,不允许从原始JSON/CSV文件查询。例如:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
和 spark.read.schema(schema).csv(file).select("_corrupt_record").show()。
相反,您可以缓存或保存解析后的结果,然后发送相同的查询。
例如,val df = spark.read.schema(schema).csv(file).cache() 然后 df.filter($"_corrupt_record".isNotNull).count()。

要修复它,您需要使用如下命令:

spark.read.option("multiline","true").json("json文件路径")

这里是一个示例:

cat /Users/yuri/test.json
{
"name": "Yuri",
"gender": "male"
}

这是pyspark代码:

df = spark.read.option("multiline","true").json("/Users/yuri/test.json")
df.printSchema()
df.show(truncate=False)

这是输出:

df.printSchema()
root
|-- gender: string (nullable = true)
|-- name: string (nullable = true)

df.show(truncate=False)
+------+----+
|gender|name|
+------+----+
|male |Yuri|
+------+----+

英文:

As you don't specify the error you have, I assume you see something similar to what i see:

pyspark.errors.exceptions.captured.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

To fix it you need to use command like:

spark.read.option("multiline","true").json("json file path")

Here is an example:

cat /Users/yuri/test.json 
{
	"name": "Yuri",
	"gender": "male"
}

Here is the pyspark code:

df = spark.read.option("multiline","true").json("/Users/yuri/test.json")
df.printSchema()
df.show(truncate=False)

Here is the output:

>>> df.printSchema()
root
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)

>>> df.show(truncate=False)
+------+----+
|gender|name|
+------+----+
|male  |Yuri|
+------+----+

huangapple
  • 本文由 发表于 2023年5月10日 22:22:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76219575.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定