Getting error "java.lang.String is not a valid external type for schema of double" in the code below


Question

My code looks like this:

// Imports the snippet needs. Logging is assumed to be in scope already
// (the post does not show where it comes from).
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

import scala.util.control.Breaks.breakable

object DataTypeValidation extends Logging {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkProjectforDataTypeValidation")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    try {
      breakable {
        val format = new SimpleDateFormat("d-M-y hh:mm:ss.SSSSS")
        println("*********Data Type Validation Started*************** " + format.format(Calendar.getInstance().getTime()))

        // The first row carries a String ("ABC22") in the column declared as DoubleType below.
        val data = Seq(Row(873131558, "ABC22"), Row(29000000, 99.00), Row(27000000, 2.34))
        val schema = StructType(Array(
          StructField("oldcl", IntegerType, nullable = true),
          StructField("newcl", DoubleType, nullable = true)
        ))

        val ONE = 1
        var erroredRecordRow = new scala.collection.mutable.ListBuffer[Row]()
        // Copy of the schema with numeric columns turned into StringType (last field dropped).
        val newSchema = schema.fields.map({
          case StructField(name, _: IntegerType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
          case StructField(name, _: DoubleType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
          case fields => fields
        }).dropRight(ONE)
        val newStructType = StructType { newSchema }

        // The DataFrame is built with the original schema, not newStructType.
        val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
        df.show()
        print(df.schema)
      }
    } catch {
      case exception: Exception =>
        println("exception caught in Data Type Mismatch In Schema Validation: " + exception.toString())
        exception.printStackTrace()
    }
    spark.stop()
  }
}
exception caught in Data Type Mismatch In Schema Validation: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, oldcl), IntegerType) AS oldcl#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, newcl), DoubleType) AS newcl#1
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)

Answer 1

Score: 0

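@AnkitTomar,
this error occurs because the string value "ABC22" is placed in a column declared as DoubleType. Spark checks each Row value against the schema when it encodes the row, and a java.lang.String is not an accepted value for a double column.

Please replace the following lines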

val data = Seq(Row(873131558, "ABC22"), Row(29000000, 99.00), Row(27000000, 2.34))
val schema = StructType(Array(
  StructField("oldcl", IntegerType, nullable = true),
  StructField("newcl", DoubleType, nullable = true)
))

with

val data = Seq(Row(873131558, "ABC22"), Row(29000000, "99.00"), Row(27000000, "2.34"))
val schema = StructType(Array(
  StructField("oldcl", IntegerType, nullable = true),
  StructField("newcl", StringType, nullable = true)
))

so that you get the expected result:

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show()
/*
+---------+-----+
|    oldcl|newcl|
+---------+-----+
|873131558|ABC22|
| 29000000|99.00|
| 27000000| 2.34|
+---------+-----+
*/
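
Once newcl is loaded as a string, the numeric values can still be recovered afterwards. The following is only a sketch of one possible way to do that, not something from the original post: it casts the string column to double and treats rows where the cast yields null as errored records. The object name CastValidationSketch and the helper splitByDoubleCast are made up for illustration, and the sketch assumes ANSI mode is off (spark.sql.ansi.enabled=false), because with ANSI mode on an invalid cast throws instead of returning null.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object CastValidationSketch {

  // Hypothetical helper: splits df into (valid, errored) depending on whether
  // the string column `name` can be cast to double.
  def splitByDoubleCast(df: DataFrame, name: String): (DataFrame, DataFrame) = {
    val casted = col(name).cast(DoubleType)
    // With ANSI mode off, an unparseable string casts to null instead of throwing.
    val errored = df.filter(col(name).isNotNull && casted.isNull)
    // Keep genuine nulls and parseable values, and replace the column with its double form.
    val valid = df.filter(col(name).isNull || casted.isNotNull).withColumn(name, casted)
    (valid, errored)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CastValidationSketch")
      .master("local[*]")
      .config("spark.sql.ansi.enabled", "false") // assumption: null-on-failure cast semantics
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Same data as in the answer above: every newcl value is a string.
    val data = Seq(Row(873131558, "ABC22"), Row(29000000, "99.00"), Row(27000000, "2.34"))
    val schema = StructType(Array(
      StructField("oldcl", IntegerType, nullable = true),
      StructField("newcl", StringType, nullable = true)
    ))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    val (valid, errored) = splitByDoubleCast(df, "newcl")
    valid.show()   // rows whose newcl parsed as a double
    errored.show() // rows whose newcl could not be parsed

    spark.stop()
  }
}
```

With the sample data, the "ABC22" row should end up in errored and the other two rows in valid, where newcl now has type double.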

Note: I could not find any usage of newSchema in your code. If you are following a different approach, please comment.

val ONE = 1
var erroredRecordRow = new scala.collection.mutable.ListBuffer[Row]()
val newSchema = schema.fields.map({
  case StructField(name, _: IntegerType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
  case StructField(name, _: DoubleType, nullorNotnull, _) => StructField(name, StringType, nullorNotnull)
  case fields => fields
}).dropRight(ONE)
val newStructType = StructType { newSchema }
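
The erroredRecordRow buffer in the snippet above suggests the intent may have been to collect the rows that do not match the schema. If that is the case, one possible approach (a guess at the intent, not something confirmed in the question) is to check each Row against the schema before calling createDataFrame. The sketch below assumes only the three column types used in the post; RowPrecheckSketch, conforms, and partitionRows are hypothetical names.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object RowPrecheckSketch {

  // True when `value` has the runtime type this sketch expects for `dataType`.
  // Only the types that appear in the post are covered; anything else is rejected.
  def conforms(value: Any, dataType: DataType): Boolean = (value, dataType) match {
    case (null, _)               => true // nullability is not checked in this sketch
    case (_: Int, IntegerType)   => true
    case (_: Double, DoubleType) => true
    case (_: String, StringType) => true
    case _                       => false
  }

  // Splits rows into (valid, errored) by comparing every field with the schema.
  def partitionRows(rows: Seq[Row], schema: StructType): (Seq[Row], Seq[Row]) =
    rows.partition { row =>
      row.length == schema.length &&
        row.toSeq.zip(schema.fields).forall { case (v, f) => conforms(v, f.dataType) }
    }

  def main(args: Array[String]): Unit = {
    // Data and schema as in the question, with newcl still declared as DoubleType.
    val data = Seq(Row(873131558, "ABC22"), Row(29000000, 99.00), Row(27000000, 2.34))
    val schema = StructType(Array(
      StructField("oldcl", IntegerType, nullable = true),
      StructField("newcl", DoubleType, nullable = true)
    ))

    val (validRows, erroredRows) = partitionRows(data, schema)
    println(s"valid:   $validRows")
    println(s"errored: $erroredRows")
  }
}
```

The validRows sequence can then be passed to spark.createDataFrame(spark.sparkContext.parallelize(validRows), schema) with the original DoubleType schema, while erroredRows can be logged or stored separately.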

huangapple
  • Posted on August 12, 2020, 19:57:51
  • Source link: https://go.coder-hub.com/63376049.html