Spark complains that a column is not found even though the error message shows it is there
Question
I am trying to flatten a nested JSON into individual columns and apply a transformation. In the real scenario it is a nested JSON, so I can't predict ahead of time which columns are nested; therefore I use a recursive loop to traverse the JSON and flatten it out. Below is the sample scenario:
Input JSON
{
"order": {
"name": "John Doe",
"address": {
"state": "NY"
},
"orders": [
{
"order_id": 1001,
"quarter": "2023-06-30"
},
{
"order_id": 1002,
"quarter": "2023-06-29"
}
]
}
}
After flattening, it displays as below:
+-------------------+----------+---------------------+------------------------+
|order.address.state|order.name|order.orders.order_id|order.orders.quarter |
+-------------------+----------+---------------------+------------------------+
|NY |John Doe |[1001, 1002] |[2023-06-30, 2023-06-29]|
|NY |John Doe |[1001, 1002] |[2023-06-30, 2023-06-29]|
+-------------------+----------+---------------------+------------------------+
But when I try to transform the column, it complains that order.address.state does not exist, even though the error message lists this exact column as one of the suggestions!
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Column 'order.address.state' does not exist. Did you mean one of the
> following? [order.address.state, order.orders.quarter, order.name,
> order.orders.order_id];
Code Snippet
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Column;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JsonFlattener {
// UDF to transform values based on the config file
private static UserDefinedFunction udfTransform = functions.udf(
(UDF1<Object, Object>) value -> {
// Your transformation logic based on the config file
// Here, we're simply appending 'transformed_' prefix to the value
return "transformed_" + value;
}, DataTypes.StringType);
public static void main(String[] args) throws IOException {
SparkSession spark = SparkSession.builder()
.appName("JsonFlattener")
.master("local")
.getOrCreate();
spark.udf().register("getQuarterOfYearUDF", getQuarterOfYearUDF, DataTypes.IntegerType);
// Read the JSON file
Dataset<Row> input = spark.read().option("multiLine","true").option("inferSchema","true").json("/home/akannan/INPUT/orders2.json");
input.show(false);
// Flatten the nested JSON recursively based on the level of nesting
Dataset<Row> flattened = flattenNestedJson(input);
flattened.printSchema();
flattened.show(false);
// Read the config file to determine which columns need transformation
String configPath = "/home/akannan/INPUT/orders_modify.json"; // Specify the path to your config file
String configFileContents = new String(Files.readAllBytes(Paths.get(configPath)));
Gson gson = new Gson();
Type listType = new TypeToken<List<UDFFullReplace.FieldTransformation>>() {
}.getType();
List<UDFFullReplace.FieldTransformation> transformations = gson.fromJson(configFileContents, listType);
// Apply the UDF transformation to the specified columns
Dataset<Row> transformed = applyTransformation(flattened, transformations);
transformed.show(false);
transformed.printSchema();
// Unflatten the DataFrame back to the original JSON format
Dataset<Row> unflattened = unflattenJson(transformed);
// Show the result
unflattened.show();
}
private static Dataset<Row> flattenNestedJson(Dataset<Row> input) {
StructType schema = input.schema();
return flattenNestedJson(input, "", schema);
}
private static Dataset<Row> flattenNestedJson(Dataset<Row> input, String prefix, DataType dataType) {
if (dataType instanceof StructType) {
StructType structType = (StructType) dataType;
for (StructField field : structType.fields()) {
String columnName = field.name();
String fullColumnName = prefix.isEmpty() ? columnName : prefix + "." + columnName;
DataType columnDataType = field.dataType();
input = flattenColumn(input, fullColumnName, columnDataType);
}
input = input.drop(prefix);
} else if (dataType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) dataType;
DataType elementType = arrayType.elementType();
// input = input.withColumn(prefix, functions.explode(functions.col(prefix)))
// .selectExpr("*", prefix + ".*")
// .drop(prefix);
input = input.withColumn(prefix, functions.explode_outer(functions.col(prefix)));
input = flattenColumn(input, prefix, elementType);
}
return input;
}
private static Dataset<Row> flattenColumn(Dataset<Row> input, String columnName, DataType columnDataType) {
if (columnDataType instanceof StructType) {
input = flattenNestedJson(input, columnName, columnDataType);
} else if (columnDataType instanceof ArrayType) {
input = flattenNestedJson(input, columnName, columnDataType);
}
else {
input = input.withColumn(columnName, functions.col(columnName));
}
return input;
}
private static Dataset<Row> applyTransformation(Dataset<Row> input, List<UDFFullReplace.FieldTransformation> transformations) {
input.cache();
for (UDFFullReplace.FieldTransformation transformation : transformations) {
String[] fieldNames = transformation.getName().split("\\.");
String columnName = transformation.getName();
String udf = transformation.getUdf();
// input = input.withColumn(columnName, udfTransform.apply(functions.col(columnName)));
input = input.withColumn(columnName, functions.concat(functions.col(columnName), functions.lit("_suffix")));
}
return input;
}
private static Dataset<Row> unflattenJson(Dataset<Row> input) {
// Group the columns based on the nested structure and create a struct
StructType schema = input.schema();
StructField[] fields = schema.fields();
Column[] columns = new Column[fields.length];
for (int i = 0; i < fields.length; i++) {
columns[i] = functions.col(fields[i].name());
}
// Create a single column named 'value' with the struct
//Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(columns));
Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(
input.col("order.address.state").alias("state"),
input.col("order.name").alias("name"),
input.col("order.orders.order_id").alias("order_id"),
input.col("order.orders.quarter").alias("quarter")
));
// Convert the DataFrame to a JSON string
Dataset<Row> jsonString = withStructColumn.select(functions.to_json(functions.col("value")).alias("json"));
// Parse the JSON string and extract the struct column
return jsonString.select(functions.from_json(functions.col("json"), schema).alias("value")).select("value.*");
}
static UDF1<String, Integer> getQuarterOfYearUDF = (dateString) -> {
String[] parts = dateString.split("-");
int month = Integer.parseInt(parts[1]);
int quarter = (month - 1) / 3 + 1;
return quarter;
};
}
Stacktrace
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];
'Project [concat('order.address.state, _suffix) AS order.address.state#95, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order#0, order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order.orders.order_id#27, order#0.orders.quarter AS order.orders.quarter#34]
+- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order#0.orders.order_id AS order.orders.order_id#27]
+- Project [order#0, order.address.state#9, order.name#15, order.orders#22]
+- Generate explode(order#0.orders), true, [order.orders#22]
+- Project [order#0, order.address.state#9, order#0.name AS order.name#15]
+- Project [order#0, order#0.address.state AS order.address.state#9]
+- Relation [order#0] json
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:199)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:192)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:96)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at org.nypd.nextgen.data.pipeline.JsonFlattener.applyTransformation(JsonFlattener.java:116)
at org.nypd.nextgen.data.pipeline.JsonFlattener.main(JsonFlattener.java:57)
Answer 1
Score: 1
Your code is not complete, but what I foresee here is a problem with the dots inside the column names.
Try to avoid that, as dots are normally used to access nested fields. In general, don't use any special characters, only underscores.
If you really want to keep the dots, then use backticks (`) when accessing the column:
flattened.withColumn("test", functions.col("`order.address.state`"));
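Applied to the applyTransformation loop from the question, that would look roughly like this (a sketch only; columnName is the dotted name read from the config file, as in the original code):
// Sketch: wrap the flattened name in backticks when reading the column, so the
// analyzer treats "order.address.state" as a single top-level column rather than
// a nested field path. The target name passed to withColumn is taken literally.
String quoted = "`" + columnName + "`";
input = input.withColumn(columnName,
        functions.concat(functions.col(quoted), functions.lit("_suffix")));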
Answer 2
Score: 1
I see you have written a lot of logic to flatten the array columns. You can use the `inline` function to flatten array columns in one step, as shown below. Use `.` to access any column inside a struct or array.
scala> df.printSchema
root
|-- order: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
| |-- orders: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- order_id: long (nullable = true)
| | | |-- quarter: string (nullable = true)
scala> df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").printSchema
root
|-- address: struct (nullable = true)
| |-- state: string (nullable = true)
|-- name: string (nullable = true)
|-- order_id: long (nullable = true)
|-- quarter: string (nullable = true)
scala> df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").show(false)
+-------+--------+--------+----------+
|address|name |order_id|quarter |
+-------+--------+--------+----------+
|{NY} |John Doe|1001 |2023-06-30|
|{NY} |John Doe|1002 |2023-06-29|
+-------+--------+--------+----------+
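For the Java API used in the question, roughly the same flattening could be written as below (a sketch; it assumes the DataFrame read from the sample JSON is named input, as in the question's code):
// Rough Java equivalent of the spark-shell session above (sketch only).
// select("order.*") promotes the struct's fields to top-level columns, and the
// SQL generator inline(orders) expands the array of structs into one row per element.
Dataset<Row> flat = input.select("order.*")
        .selectExpr("*", "inline(orders)")
        .drop("orders");
flat.show(false);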