How to pass complex Java Class Object as parameter to Scala UDF in Spark?

Question

I have a Java client class (used as a dependency jar with `spark-shell`) that responds to an API call - let's call the class `SomeAPIRequester`.

In plain Java, the following sample code returns the results I expect:

    SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build(); // build the class
    System.out.println(requester.getSomeItem("id123"));  // result: {"id123": "item123"}

I want to call this API in a distributed manner, through an RDD of IDs stored in a Spark dataframe (in Scala):

    val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...))  // sample RDD of IDs I want to call the API for

and I define my UDF as:

    val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
       requester.getSomeItem(id)
    })

and call this UDF like so:

    inputIdRdd.toDF("ids").withColumn("apiResult", test(col("ids"), requester))  // requester as built with SomeAPIRequester.builder()...

    // or directly on the RDD? as a udf, or a plain Scala function...
    inputIdRdd.foreach{ id => test(id, requester) }

When I run `.show()` or `.take()` on the result, I get a `NullPointerException` on the requester Java class.

I also tried sending in literals (`lit`), and I read about `typedLit` in Scala, but I could not convert the Java `Requester` class into any of the allowed `typedLit` types in Scala.

Is there a way to call this Java class object through a UDF and get the result from the API?

### Edit:

I also tried initializing the requester class inside the RDD's foreach block:

    inputIdRdd.foreach(x => {
      val apiRequester = SomeAPIRequester.builder()...(argPool).build()

      try {
        apiRequester.getSomeItem(x)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    })

But this returned no response - cannot initialize class, etc.

Thanks!

Answer 1

Score: 2

Working with custom classes in Spark requires some knowledge of how Spark works under the hood. Don't pass your instance as a parameter to the udf: udf parameters are extracted from the rows of the dataframe, so the null pointer exception is understandable in this case. You can try the following options:

  1. First, put the instance in the scope of the udf:

    val requester: SomeAPIRequester = ???
    
    val test: UserDefinedFunction = udf((id: String) => {
         requester.getSomeItem(id)
    })
    

At this point you will need to mark your class as `Serializable` if possible; otherwise you will get a `NotSerializableException`.
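
For completeness, a minimal sketch of option 1 at the call site (builder arguments are borrowed from the question's plain-Java example, and it assumes the requester is serializable). The udf now takes only the ID column, so no instance is ever passed as a column:

    import org.apache.spark.sql.functions.{col, udf}

    // Build the requester once on the driver; the udf's closure captures it.
    val requester = SomeAPIRequester.builder().name("abc").build()

    val test = udf((id: String) => requester.getSomeItem(id))

    // Only the ID column is passed to the udf: no requester argument.
    inputIdRdd.toDF("ids").withColumn("apiResult", test(col("ids"))).show()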

  2. If your class is not Serializable because it comes from a third party, you can mark your instance as a `@transient lazy val`, as described at https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d; a sketch follows.
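
A minimal sketch of that pattern (the `RequesterWrapper` class is hypothetical; builder arguments are again borrowed from the question). The wrapper itself is Serializable, but the `@transient lazy val` field is excluded from serialization, so each executor rebuilds the client on first use:

    // Serializable wrapper: the @transient lazy field is not shipped from
    // the driver; each executor builds its own client on first access.
    class RequesterWrapper extends Serializable {
      @transient lazy val requester: SomeAPIRequester =
        SomeAPIRequester.builder().name("abc").build()
    }

    val wrapper = new RequesterWrapper
    val testLazy = udf((id: String) => wrapper.requester.getSomeItem(id))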

  3. If you work in the RDD world, you can use `mapPartitions` to create just one instance per partition, as in the sketch below.
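
A minimal sketch of the `mapPartitions` approach (builder arguments again borrowed from the question; pairing each ID with its result is illustrative):

    // One requester per partition: it is constructed inside the partition
    // function on the executor, so nothing is serialized from the driver.
    val results = inputIdRdd.mapPartitions { ids =>
      val requester = SomeAPIRequester.builder().name("abc").build()
      ids.map(id => (id, requester.getSomeItem(id)))
    }
    results.take(3).foreach(println)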

