Spark complaining column not found even though the error message shows it's there

Question

I am trying to flatten a nested JSON into individual columns and apply a transformation. In the real scenario it's a nested JSON, so I can't predict ahead of time which columns are nested; I am therefore using a recursive loop to traverse the JSON and flatten it out. Below is a sample scenario:

Input JSON

{
	"order": {		
		"name": "John Doe",
		"address": {
			"state": "NY"
		},
		"orders": [
			{
				"order_id": 1001,
				"quarter": "2023-06-30"
			},
			{
				"order_id": 1002,
				"quarter": "2023-06-29"
			}
		]
	}
}

After flattening, it displays as below:

+-------------------+----------+---------------------+------------------------+
|order.address.state|order.name|order.orders.order_id|order.orders.quarter    |
+-------------------+----------+---------------------+------------------------+
|NY                 |John Doe  |[1001, 1002]         |[2023-06-30, 2023-06-29]|
|NY                 |John Doe  |[1001, 1002]         |[2023-06-30, 2023-06-29]|
+-------------------+----------+---------------------+------------------------+

But when I try to transform the column, it complains that order.address.state does not exist, even though the error message lists this specific column as one of the candidates!

> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Column 'order.address.state' does not exist. Did you mean one of the
> following? [order.address.state, order.orders.quarter, order.name,
> order.orders.order_id];

Code Snippet

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Column;

import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonFlattener {

    // UDF to transform values based on the config file
    private static UserDefinedFunction udfTransform = functions.udf(
        (UDF1<Object, Object>) value -> {
            // Your transformation logic based on the config file
            // Here, we're simply appending 'transformed_' prefix to the value
            return "transformed_" + value;
        }, DataTypes.StringType);

    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession.builder()
            .appName("JsonFlattener")
            .master("local")
            .getOrCreate();

        spark.udf().register("getQuarterOfYearUDF", getQuarterOfYearUDF, DataTypes.IntegerType);

        // Read the JSON file
        Dataset<Row> input = spark.read().option("multiLine","true").option("inferSchema","true").json("/home/akannan/INPUT/orders2.json");

        input.show(false);

        // Flatten the nested JSON recursively based on the level of nesting
        Dataset<Row> flattened = flattenNestedJson(input);
        flattened.printSchema();
        flattened.show(false);

        // Read the config file to determine which columns need transformation
        String configPath = "/home/akannan/INPUT/orders_modify.json"; // Specify the path to your config file
        String configFileContents = new String(Files.readAllBytes(Paths.get(configPath)));
        Gson gson = new Gson();
        Type listType = new TypeToken<List<UDFFullReplace.FieldTransformation>>() {
        }.getType();
        List<UDFFullReplace.FieldTransformation> transformations = gson.fromJson(configFileContents, listType);

        // Apply the UDF transformation to the specified columns
        Dataset<Row> transformed = applyTransformation(flattened, transformations);

        transformed.show(false);
        transformed.printSchema();
        // Unflatten the DataFrame back to the original JSON format
        Dataset<Row> unflattened = unflattenJson(transformed);

        // Show the result
        unflattened.show();
    }

    private static Dataset<Row> flattenNestedJson(Dataset<Row> input) {
        StructType schema = input.schema();
        return flattenNestedJson(input, "", schema);
    }

    private static Dataset<Row> flattenNestedJson(Dataset<Row> input, String prefix, DataType dataType) {
        if (dataType instanceof StructType) {
            StructType structType = (StructType) dataType;
            for (StructField field : structType.fields()) {
                String columnName = field.name();
                String fullColumnName = prefix.isEmpty() ? columnName : prefix + "." + columnName;
                DataType columnDataType = field.dataType();
                input = flattenColumn(input, fullColumnName, columnDataType);
            }
            input = input.drop(prefix);
        } else if (dataType instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) dataType;
            DataType elementType = arrayType.elementType();
//            input = input.withColumn(prefix, functions.explode(functions.col(prefix)))
//                .selectExpr("*", prefix + ".*")
//                .drop(prefix);
            input = input.withColumn(prefix, functions.explode_outer(functions.col(prefix)));

            input = flattenColumn(input, prefix, elementType);
        }
        return input;
    }

    private static Dataset<Row> flattenColumn(Dataset<Row> input, String columnName, DataType columnDataType) {
        if (columnDataType instanceof StructType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        } else if (columnDataType instanceof  ArrayType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        }
        else {
            input = input.withColumn(columnName, functions.col(columnName));
        }
        return input;
    }

    private static Dataset<Row> applyTransformation(Dataset<Row> input, List<UDFFullReplace.FieldTransformation> transformations) {
        input.cache();
        for (UDFFullReplace.FieldTransformation transformation : transformations) {
            String[] fieldNames = transformation.getName().split("\\.");
            String columnName = transformation.getName();
            String udf = transformation.getUdf();

//            input = input.withColumn(columnName, udfTransform.apply(functions.col(columnName)));
            input =  input.withColumn(columnName, functions.concat(functions.col(columnName), functions.lit("_suffix")));
        }
        return input;


    }

    private static Dataset<Row> unflattenJson(Dataset<Row> input) {
        // Group the columns based on the nested structure and create a struct
        StructType schema = input.schema();
        StructField[] fields = schema.fields();
        Column[] columns = new Column[fields.length];
        for (int i = 0; i < fields.length; i++) {
            columns[i] = functions.col(fields[i].name());
        }

        // Create a single column named 'value' with the struct
        //Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(columns));
        Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(
            input.col("order.address.state").alias("state"),
            input.col("order.name").alias("name"),
            input.col("order.orders.order_id").alias("order_id"),
            input.col("order.orders.quarter").alias("quarter")
        ));

        // Convert the DataFrame to a JSON string
        Dataset<Row> jsonString = withStructColumn.select(functions.to_json(functions.col("value")).alias("json"));

        // Parse the JSON string and extract the struct column
        return jsonString.select(functions.from_json(functions.col("json"), schema).alias("value")).select("value.*");
    }


    static UDF1<String, Integer> getQuarterOfYearUDF = (dateString) -> {
        String[] parts = dateString.split("-");
        int month = Integer.parseInt(parts[1]);
        int quarter = (month - 1) / 3 + 1;
        return quarter;
    };
}

Stacktrace

Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];
'Project [concat('order.address.state, _suffix) AS order.address.state#95, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
   +- Project [order#0, order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
      +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order.orders.order_id#27, order#0.orders.quarter AS order.orders.quarter#34]
         +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order#0.orders.order_id AS order.orders.order_id#27]
            +- Project [order#0, order.address.state#9, order.name#15, order.orders#22]
               +- Generate explode(order#0.orders), true, [order.orders#22]
                  +- Project [order#0, order.address.state#9, order#0.name AS order.name#15]
                     +- Project [order#0, order#0.address.state AS order.address.state#9]
                        +- Relation [order#0] json

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:199)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:192)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:192)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:192)
	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:192)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:101)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:101)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:96)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
	at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
	at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
	at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
	at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
	at org.nypd.nextgen.data.pipeline.JsonFlattener.applyTransformation(JsonFlattener.java:116)
	at org.nypd.nextgen.data.pipeline.JsonFlattener.main(JsonFlattener.java:57)

Answer 1

Score: 1

Your code is not complete, but what I foresee here is a problem with the dots inside the column names.

Try to avoid that, as dots are normally used to access nested fields. In general, don't use any special characters, only underscores.

If you really want to keep the dots, then use backticks (`) when accessing the column:

flattened.withColumn("test", functions.col("`order.address.state`"));
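
Applied to the applyTransformation loop in the question, a minimal sketch of that fix could look like this (assuming the same UDFFullReplace.FieldTransformation config objects as in the question; only the functions.col lookup changes, while withColumn still takes the raw flattened name):

private static Dataset<Row> applyTransformation(Dataset<Row> input, List<UDFFullReplace.FieldTransformation> transformations) {
    for (UDFFullReplace.FieldTransformation transformation : transformations) {
        String columnName = transformation.getName();   // e.g. "order.address.state"
        String escaped = "`" + columnName + "`";        // backticks make Spark treat the dotted name as one literal column name
        // functions.col(columnName) without backticks parses the dots as nested-field access,
        // which is why the analyzer reports the column as missing
        input = input.withColumn(columnName,
            functions.concat(functions.col(escaped), functions.lit("_suffix")));
    }
    return input;
}

Alternatively, replace the dots with underscores (or another separator) while flattening, which avoids the escaping entirely.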

Answer 2

Score: 1

I see you have written quite a lot of logic to flatten the array columns. You can use the inline function to flatten array columns in one line, as shown below.

Use . to access any column from a struct or array.

df.printSchema
root
 |-- order: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- orders: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- order_id: long (nullable = true)
 |    |    |    |-- quarter: string (nullable = true)

df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").printSchema
root
 |-- address: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- quarter: string (nullable = true)

df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").show(false)
+-------+--------+--------+----------+
|address|name    |order_id|quarter   |
+-------+--------+--------+----------+
|{NY}   |John Doe|1001    |2023-06-30|
|{NY}   |John Doe|1002    |2023-06-29|
+-------+--------+--------+----------+
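
Since the question uses the Java API, a rough equivalent sketch would be the following (it assumes input is the Dataset&lt;Row&gt; read from orders2.json in the question and uses org.apache.spark.sql.functions; the final select also pulls state out of the remaining address struct so that no column name contains a dot):

Dataset<Row> flat = input
    .select("order.*")                      // promote the struct's fields to top-level columns
    .selectExpr("*", "inline(orders)")      // expand the array of structs into extra rows and columns
    .drop("orders")                         // drop the original array column
    .select(functions.col("address.state").alias("state"),
            functions.col("name"),
            functions.col("order_id"),
            functions.col("quarter"));
flat.show(false);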
