Fitting LogisticRegression within a User Defined Function (UDF)

Question

I've implemented the following code in Spark Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.classification._

// Assumes a Databricks notebook, where `spark`, `display` and
// `spark.implicits._` (needed for `toDF` and `$`) are already in scope.
object Hello {
    def main(args: Array[String]) = {

        val getLabel1Probability = udf((param1: Double, labeledEntries: Seq[Array[Double]]) => {

            // Build a per-row training DataFrame inside the UDF and fit a model on it.
            val trainingData = labeledEntries.map(entry => (org.apache.spark.ml.linalg.Vectors.dense(entry(0)), entry(1))).toList.toDF("features", "label")
            val regression = new LogisticRegression()
            val fittingModel = regression.fit(trainingData)

            // Probability of label 1 for param1 under the fitted model.
            val prediction = fittingModel.predictProbability(org.apache.spark.ml.linalg.Vectors.dense(param1))
            val probability = prediction.toArray(1)

            probability
        })

        val df = Seq((1.0, Seq(Array(1.0, 0), Array(2.0, 1))), (3.0, Seq(Array(1.0, 0), Array(2.0, 1)))).toDF("Param1", "LabeledEntries")

        val dfWithLabel1Probability = df.withColumn(
            "Label1Probability", getLabel1Probability(
                $"Param1",
                $"LabeledEntries"
            )
        )
        display(dfWithLabel1Probability)
    }
}

Hello.main(Array())

When running it on a Databricks notebook multi-node cluster (DBR 13.2, Spark 3.4.0 and Scala 2.12), the display of dfWithLabel1Probability shows up without any error.

I have the following questions:

  • My understanding is that I should get a NullPointerException when creating the trainingData DataFrame, because _sqlContext is null within the UDF. If so, why am I not getting it? Is it related to running from a Databricks notebook? Is the behaviour non-deterministic?
  • If creating a DataFrame is not allowed within a UDF, how can I fit a LogisticRegression with the data from a given DataFrame column? In the real example I'm dealing with a DataFrame of millions of rows, so I would prefer to avoid using Dataset's collect() to bring all those rows into the driver's memory. Is there any alternative?

Thanks.

Answer 1

Score: 1

For the first question, if, instead, you run:

val largedf = spark.range(100000).selectExpr("cast(id as double) Param1", "array(array(1.0, 0), array(2.0, 1)) LabeledEntries")

val largedfWithLabel1Probability = largedf.withColumn(
    "Label1Probability", getLabel1Probability(
      $"Param1",
      $"LabeledEntries"
    )
)

display(largedfWithLabel1Probability)

it will throw a NullPointerException, as will a range of 1, but using:

(1 until 1000).map(a => (a.toDouble, Seq.. )).toDF..

it will at least start processing. This is because toDF builds the data as a LocalRelation, which is evaluated on the driver and not sent to the executors, whereas range produces a leaf node that runs on the executors, where the UDF has no SparkSession with which to create a DataFrame, hence the exception.
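You can see which kind of plan you get by inspecting the query execution directly. A quick sketch (localDf and distributedDf are just illustrative names):

// The Seq-based DataFrame is backed by a LocalRelation, while spark.range
// produces a Range leaf node that is executed on the executors.
val localDf = Seq((1.0, Seq(Array(1.0, 0.0), Array(2.0, 1.0)))).toDF("Param1", "LabeledEntries")
println(localDf.queryExecution.optimizedPlan)       // LocalRelation [Param1#.., LabeledEntries#..]

val distributedDf = spark.range(1).selectExpr("cast(id as double) Param1")
println(distributedDf.queryExecution.optimizedPlan) // Range (0, 1, step=1, ...)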

Re the second question: that could be worth posting as a separate top-level question.
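That said, one possible direction (my own sketch, not part of the original answer): since each row's training set is tiny, you could fit the per-row model with plain Scala inside the UDF instead of Spark ML, so no DataFrame (and no SparkSession) is needed on the executors. The name getLabel1ProbabilityLocal, the learning rate, and the iteration count below are all illustrative assumptions:

// Hypothetical alternative: single-feature logistic regression fitted with
// plain gradient descent, so the UDF never touches the SparkSession.
val getLabel1ProbabilityLocal = udf((param1: Double, labeledEntries: Seq[Array[Double]]) => {
  def sigmoid(z: Double) = 1.0 / (1.0 + math.exp(-z))
  var w = 0.0
  var b = 0.0
  val lr = 0.1 // learning rate (assumed)
  for (_ <- 1 to 1000; entry <- labeledEntries) { // iteration count (assumed)
    val (x, y) = (entry(0), entry(1))
    val err = sigmoid(w * x + b) - y // gradient of the log-loss w.r.t. the logit
    w -= lr * err * x
    b -= lr * err
  }
  sigmoid(w * param1 + b) // probability of label 1 for param1
})

This trades Spark ML's optimizer for a hand-rolled one, but it keeps the work distributed: each executor fits its rows' models locally, with no collect() on the driver.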
