一个异常发生在 Spark 将 JSON 字符串转换为 HashMap 时。

huangapple go评论66阅读模式
英文:

An exception occurs when spark converts a json string to a HashMap in spark

问题

本地环境没有问题,但在执行 Spark 提交时出现异常。

大致代码如下

class Test extends Serializable {
def action() = {
val sc = SparkContext.getOrCreate(sparkConf)
val rdd1 = sc.textFile(.. )
val rdd2 = rdd1.map ( logline => {
//gson
val jsonObject = jsonParser.parse(logLine).getAsJsonObject
//jackson
val jsonObject = objectMapper.readValue(logLine,classOf[HashMap[String,String]])
MyDataSet ( parsedJson.get("field1"), parsedJson.get("field2"),...)
}
}
}

异常

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
ensureSerializable(ClosureCleanser.scala:444)..
........
........
caused by: java.io.NotSerializableException : com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier


我已经使用了 gson 和 jackson 库。

仅通过继承 Serializable 就能解决这个问题吗?
英文:

there is no problem in the local environment, but a exception occur when performing spark submit.

The approximate code is as follows

class Test extends Serializable {
     def action() = {
         val sc = SparkContext.getOrCreate(sparkConf)
         val rdd1 = sc.textFile(.. )
         val rdd2 = rdd1.map ( logline => {
             //gson
             val jsonObject  jsonParser.parse(logLine).getAsJsonObject
             //jackson
             val jsonObject = objectMapper.readValue(logLine,classOf[HashMap[String,String]])
             MyDataSet ( parsedJson.get("field1"), parsedJson.get("field2"),...)              
         }                                  
     }
}

Exception

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
               ensureSerializable(ClosureCleanser. scala:444)..
               ........
               ........
               caused by: java.io.NotSerializableException : com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier

I have used both gson and jackson libraries.

Isn't this a problem that can be solved just by inheriting from serializable ?

答案1

得分: 1

NotSerializableException异常很好理解。您的任务不可序列化。Spark是一个并行计算引擎。驱动程序(执行主程序的地方)将您想要对RDD进行的转换(在map函数中编写的代码)发送到执行器,在那里执行它们。因此,这些转换需要是可序列化的。在您的情况下,jsonParserobjectMapper是在驱动程序上创建的。为了在转换中使用它们,Spark尝试对它们进行序列化,但失败了,因为它们不可序列化。这就是您的错误。我不知道哪个不可序列化,也许两者都是。

让我们举个例子,看看我们如何解决这个问题。

// 让我们创建一个不可序列化的类
class Stuff(val i: Int) {
    def get() = i
}
// 我们在驱动程序中实例化它
val stuff = new Stuff(4)

// 这会失败 "Caused by: java.io.NotSerializableException: Stuff"
val result = sc.parallelize(Seq(1, 2, 3)).map(x => (x, stuff.get)).collect

为了修复它,让我们在转换中创建对象

val result = sc.parallelize(Seq(1, 2, 3))
    .map(x => {
        val new_stuff = new Stuff(4)
        (x, new_stuff.get)
    }).collect

它能工作,但显然,为每个记录创建对象可能相当昂贵。我们可以通过mapPartition更好地处理,并且只在每个分区中创建一次对象:

val result = sc.parallelize(Seq(1, 2, 3))
    .mapPartitions(part => {
         val new_stuff = new Stuff(4)
         part.map(x => (x, new_stuff.get))
    }).collect
英文:

The exception NotSerializableException is pretty self explanatory. Your task is not serializable. Spark is a parallel computing engine. The driver (where your main program is executed) ships the transformations you want to make on the RDD (the code written inside map functions) to executors where they are executed. Therefore those transformations need to be serializable. In your case, jsonParser and objectMapper are created on the driver. To use them inside a transformation, spark tries to serialize them and fails because they are not serializable. That's your error. I don't know which one is not serializable, maybe both.

Let's take an example and see how we can fix it.

// let's create a non serializable class
class Stuff(val i : Int) {
    def get() = i
}
// we instantiate it in the driver
val stuff = new Stuff(4)

//this fails "Caused by: java.io.NotSerializableException: Stuff"
val result = sc.parallelize(Seq(1, 2,3)).map( x => (x, stuff.get)).collect

To fix it let's create the object inside the transformation

val result = sc.parallelize(Seq(1, 2,3))
    .map( x => {
        val new_stuff = new Stuff(4)
        (x, new_stuff.get)
    }).collect

It works, but obviously, creating the object for every record can be quite expensive. We can do better with mapPartition and create the object only once per partition:

val result = sc.parallelize(Seq(1, 2,3))
    .mapPartitions(part => {
         val new_stuff = new Stuff(4)
         part.map( x => (x, new_stuff.get))
    }).collect

huangapple
  • 本文由 发表于 2023年2月14日 22:02:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/75448944.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定