from_json输出在Spark Dataframe模式中定义为Int时保存为null

huangapple go评论75阅读模式
英文:

from_json output saved as null when defined in schema as Int for Spark Dataframe

问题

在使用from_json与使用Encoders创建的schema一起使用时,是从case class中提取的,但只使用DF而不是DS,如下所示:

case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String]) 
val ProductsSchema = Encoders.product[MyProducts].schema

val df_products_output_final = df_products_output.withColumn("parsedProducts", from_json(col("afterImage"), ProductsSchema)) 

> 1. 将PRICE定义为Int时,该字段的值为null。
> 2. 将PRICE定义为String时,该字段的值为String。
> 3. 在DF模式中,将Int定义为正确的DF定义。

问题出在哪里?

import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values, regexp_replace, coalesce}
import org.apache.spark.sql.types.{MapType, StringType}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{MapType, StringType, StructType, IntegerType}

case class MyMeta(op: String, table: String)
val metaSchema = Encoders.product[MyMeta].schema
case class MySales(NUM: Option[Integer], PRODUCT_ID: Option[String], DESCRIPTION: Option[String], OLD_FIELD_1: Option[String]) 
val salesSchema = Encoders.product[MySales].schema
case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String]) 
val ProductsSchema = Encoders.product[MyProducts].schema

def getAfterImage (op: String, data: String, key: String, jsonOLD_TABLE_FIELDS: String) : String = {   
  val jsonOLD_FIELDS = parse(jsonOLD_TABLE_FIELDS)   
  val jsonData = parse(data)                         
  val jsonKey = parse(key)                           
   
  op match {
  case "ins" =>
               return(compact(render(jsonData merge jsonOLD_FIELDS)))
  case _ => 
               val Diff(changed, added, deleted) = jsonKey diff jsonData
               return(compact(render(changed merge deleted merge jsonOLD_FIELDS)))
  }
}
val afterImage = spark.udf.register("callUDFAI", getAfterImage _)

val path = "/FileStore/tables/json_0006_file.txt"  
val df = spark.read.text(path)  // String.
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))    
val df3 = df2.select(map_values(col("value")))  
val df4 = df3.select($"map_values(value)"(0).as("meta"), $"map_values(value)"(1).as("data"), $"map_values(value)"(2).as("key")).withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").select(col("parsedMeta.*"), col("data"), col("key")).withColumn("key2", coalesce(col("key"), lit(""" { "DUMMY_FIELD_XXX": ""} """) )).toDF().cache()
// 此阶段的DF,不是DF。

val df_sales    = df4.filter('table === "BILL.SALES") 
val df_products = df4.filter('table === "BILL.PRODUCTS")
val df_sales_output = df_sales.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_1": ""} """)))
                              .select("afterImage") 
val df_products_output = df_products.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_A":"","OLD_FIELD_B":""} """)))
                                    .select("afterImage")                          
val df_sales_output_final = df_sales_output.withColumn("parsedSales", from_json(col("afterImage"), salesSchema)) 
df_products_output_final.show(false)
df_products_output_final.printSchema()
英文:

When using from_json with schema created with Encoders, from a case class but only using DF, not DS, as per below:

case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String]) 
val ProductsSchema = Encoders.product[MyProducts].schema

val df_products_output_final = df_products_output.withColumn("parsedProducts", from_json(col("afterImage"), ProductsSchema)) 

> 1. When defining PRICE as Int, I get a null value in the field.
> 2. When defining PRICE as String, I get a String value in the field.
> 3. The DF definition for Int is correct in DF schema.

What is the issue here?

Code:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values, regexp_replace, coalesce}
import org.apache.spark.sql.types.{MapType, StringType}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{MapType, StringType, StructType, IntegerType}

case class MyMeta(op: String, table: String)
val metaSchema = Encoders.product[MyMeta].schema
case class MySales(NUM: Option[Integer], PRODUCT_ID: Option[String], DESCRIPTION: Option[String], OLD_FIELD_1: Option[String]) 
val salesSchema = Encoders.product[MySales].schema
case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String]) 
val ProductsSchema = Encoders.product[MyProducts].schema

def getAfterImage (op: String, data: String, key: String, jsonOLD_TABLE_FIELDS: String) : String = {   
  val jsonOLD_FIELDS = parse(jsonOLD_TABLE_FIELDS)   
  val jsonData = parse(data)                         
  val jsonKey = parse(key)                           
   
  op match {
  case "ins" =>
               return(compact(render(jsonData merge jsonOLD_FIELDS)))
  case _ => 
               val Diff(changed, added, deleted) = jsonKey diff jsonData
               return(compact(render(changed merge deleted merge jsonOLD_FIELDS)))
  }
}
val afterImage = spark.udf.register("callUDFAI", getAfterImage _)

val path = "/FileStore/tables/json_0006_file.txt"  
val df = spark.read.text(path)  // String.
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))    
val df3 = df2.select(map_values(col("value")))  
val df4 = df3.select($"map_values(value)"(0).as("meta"), $"map_values(value)"(1).as("data"), $"map_values(value)"(2).as("key")).withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").select(col("parsedMeta.*"), col("data"), col("key")).withColumn("key2", coalesce(col("key"), lit(""" { "DUMMY_FIELD_XXX": ""} """) )).toDF().cache()
// DF at this stage, not a DF.

val df_sales    = df4.filter('table === "BILL.SALES") 
val df_products = df4.filter('table === "BILL.PRODUCTS")
val df_sales_output = df_sales.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_1": ""} """)))
                              .select("afterImage") 
val df_products_output = df_products.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_A":"", "OLD_FIELD_B":""} """)))
                                    .select("afterImage")                          
val df_sales_output_final = df_sales_output.withColumn("parsedSales", from_json(col("afterImage"), salesSchema)) 
df_products_output_final.show(false)
df_products_output_final.printSchema()

答案1

得分: 1

你的 PRICE 字段值周围的引号弄乱了这个问题。

如果你将输入数据从:

{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"4099" }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":"4000" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"3599" }} 

变成

{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":4099 }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":4000 }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":3599 }} 

(区别只是PRICE值周围的引号)。那么你的脚本会得到以下输出:

+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|afterImage                                                                                                          |parsedProducts                                     |
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|{"DESCRIPTION":"XXX","PRODUCT_ID":"230117","PRICE":4099,"OLD_FIELD_A":"","OLD_FIELD_B":""}                          |{230117, XXX, 4099, null}                          |
|{"PRICE":4000,"PRODUCT_ID":"230117","DESCRIPTION":"Hamsberry vintage tee, cherry","OLD_FIELD_A":"","OLD_FIELD_B":""}|{230117, Hamsberry vintage tee, cherry, 4000, null}|
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+

root
 |-- afterImage: string (nullable = true)
 |-- parsedProducts: struct (nullable = true)
 |    |-- PRODUCT_ID: string (nullable = true)
 |    |-- DESCRIPTION: string (nullable = true)
 |    |-- PRICE: integer (nullable = true)
 |    |-- OLD_FIELD_1: string (nullable = true)
英文:

The quotes around the values of your PRICE field are messing this up.

If you change your input data from:

{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"4099" }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":"4000" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"3599" }} 

to

{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":4099 }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":4000 }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":3599 }} 

(the difference is just the quotes around the PRICE values).

Then you get this output from your script:

+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|afterImage                                                                                                          |parsedProducts                                     |
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|{"DESCRIPTION":"XXX","PRODUCT_ID":"230117","PRICE":4099,"OLD_FIELD_A":"","OLD_FIELD_B":""}                          |{230117, XXX, 4099, null}                          |
|{"PRICE":4000,"PRODUCT_ID":"230117","DESCRIPTION":"Hamsberry vintage tee, cherry","OLD_FIELD_A":"","OLD_FIELD_B":""}|{230117, Hamsberry vintage tee, cherry, 4000, null}|
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
root
|-- afterImage: string (nullable = true)
|-- parsedProducts: struct (nullable = true)
|    |-- PRODUCT_ID: string (nullable = true)
|    |-- DESCRIPTION: string (nullable = true)
|    |-- PRICE: integer (nullable = true)
|    |-- OLD_FIELD_1: string (nullable = true)

No null values for PRICE anymore!!

huangapple
  • 本文由 发表于 2023年4月7日 04:49:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/75953660.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定