Spark代码在集群上运行时间太长

huangapple go评论78阅读模式
英文:

Spark code takes too much time to run on cluster

问题

我已经编写了一个Spark应用程序。我的代码对较小规模的数据集工作得很好,但对于较大规模的数据集需要太长时间。

以下是类的定义。

case class myClass(d: Int, max: Double, min: Double, f: (List[Double], Double) => Double) extends Serializable {
  val random = new Random()
  var pos: Array[Double] = Array.fill(d)(random.nextDouble() * (max - min) + min)
  var vel: Array[Double] = Array.fill(d)(math.random)
  var PR: Double = 0.1 // Rate
  var LR: Double = 0.95
  var fitness: Double = f(this.pos.toList, 0)
  val Band: Double = 0.001

  // Functions definition
  def move(l_Best: Array[Double], sumL: Double, size: Int, f: (List[Double], Double) => Double): (Array[Double], Double) = {
    val f = math.random
    val temp1 = ElementsWiseSubtract(pos, l_Best)
    val temp2 = temp1.map(_ * freq)
    val newVel = ElementsWiseSum(temp2, vel)
    var newPos = ElementsWiseSum(pos, newVel)
    if (math.random > this.PR) {
      newPos = l_Best.map(_ * (Band * (sumL / size))).toArray
    }
    val nFit = f(newPos.toList, 0)
    (newPos, nFit)
  }

  def ElementsWiseSum(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
    var res: Array[Double] = Array()
    if (arr.length == 1)
      res = Array(arr.head + arr1.head)
    else
      res = ElementsWiseSum(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
        ElementsWiseSum(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
    res
  }

  def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
    var res: Array[Double] = Array()
    if (arr.length == 1)
      res = Array(arr.head - arr1.head)
    else
      res = ElementsWiseSubtract(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
        ElementsWiseSubtract(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
    res
  }
}

主函数定义和RDD的创建如下:

@tailrec
final def Sphere(Alleles: List[Double], accumulator: Double): Double = Alleles match {
  case Nil => accumulator
  case x :: xs => Sphere(xs, accumulator + Math.pow(x, 2))
}

val N = 10000 // population size
val d = 10000 // dimensions
val nP = 20 // partitions
val iterations = 100000 // total iterations. This could be less more
val RDD = sc.parallelize(0 until N, nP).mapPartitionsWithIndex { (index, iter) =>
  val data = iter.map(i =>
    new myClass(d, max, min, Sphere)
  )
  data
}.persist(StorageLevel.MEMORY_AND_DISK)

val itr = 1
val res = RDD.mapPartitionsWithIndex {
  (index, Iterator) => {
    var li = Iterator.toArray
    li = li.sortWith(_.fitness < _.fitness)

    val res = loop(li, iterations, itr)
    val bests = res.sortWith(_.fitness < _.fitness).take(5).map(x => (x, index))
    bests.toIterator
  }
}
@tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
  iteration match {
    case 0 => arr
    case _ => {
      arr.map { j =>
        val l_Best = arr.minBy(_.fitness).pos // sort based on fitness
        val l_Sum: Double = arr.map(_.LR).reduce(_ + _) // Calculate sum of Rate
        val res = j.move(l_Best, l_Sum, arr.size, Sphere)

        if (math.random < j.LR && res._2 < j.fitness) {
          j.pos = res._1
          j.fitness = res._2
          j.LR = j.LR * 0.95
          j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
        }
      }
      loop(arr, iteration - 1, itret + 1)
    }
  }
}

我在4节点集群上测试了这段代码,对于N = 100d = 100,完成10,000次迭代不到一分钟,但对于N = 10,000d = 10,000,仅完成500次迭代需要19小时

根据我的观察,位于myClass类内部的元素级操作函数需要很长时间。您如何提高其速度?请提供一些建议。我希望使用以下配置执行它。

N = 10,000,d = 10,000和iterations = 1000000000(10亿)。

英文:

I have written a Spark application. My code works fine for smaller size population (dataset) but it takes too much time for the larger population (dataset).

Here is the definition of class.

case class myClass( d : Int  , max : Double , min : Double  , f:  (List[Double],Double)  =&gt;  Double )   extends Serializable  {
val random  = new Random()
var pos      : Array[Double]	       =   Array.fill ( d ) (  random.nextDouble()*(max-min)+min  )
var vel      : Array[Double]	       =   Array.fill ( d ) (  math.random)
var PR     : Double           	   	=   0.1			//Rate
var LR     : Double              		=   0.95
var fitness       : Double              =   f (this.pos.toList ,0)
val Band : Double = 0.001
//Functions definition
def move  (  l_Best : Array  [  Double  ]  ,  sumL : Double , size : Int  ,  f :  (  List  [  Double  ] ,Double  )  =&gt;  Double  )  : (  Array  [  Double  ]  ,  Double   )  = {
val f   =  math.random   
val temp1  =  ElementsWiseSubtract  ( pos  , l_Best )
val temp2  =    temp1.map  (  _  * freq  ) 
val newVel  =  ElementsWiseSum  ( temp2 , vel  )      // vel == this.vel
var newPos = ElementsWiseSum  (  pos  ,   newVel )  // pos == this.pos
if (  math.random   &gt;   this.PR  )  {
newPos  =  l_Best.map  (  _  *  (  Band *  (  sumL  /  size  )  ) ).toArray
}
val nFit = f  (  newPos.toList  , 0  )
( newPos   ,  nFit  )
} 
def ElementsWiseSum  (  arr  : Array[Double] , arr1  : Array[Double]   )  :  Array [Double] =  {
var res  :  Array[ Double ]  =  Array()
if  (  arr.length  ==  1  )
res  =  Array  (  arr.head  +  arr1.head  )
else
res  =   ElementsWiseSum  (   arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) )  ++
ElementsWiseSum  (   arr.slice(arr.length/2,  arr.length ) ,  arr1.slice( arr1.length/2 , arr1.length)  )
res
}
def ElementsWiseSubtract  (  arr  :Array[Double] , arr1  :Array[Double]   )  :  Array [Double] =  {
var res  :  Array[ Double ]  =  Array()
if  (  arr.length  ==  1  )
res  =  Array  (  arr.head  -  arr1.head  )
else
res  =   ElementsWiseSubtract  (   arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) )  ++
ElementsWiseSubtract  (   arr.slice  (  arr.length / 2,  arr.length ) ,  arr1.slice  ( arr1.length / 2 , arr1.length )  )
res
}
}

Main function definition and creation of RDD is:

@tailrec 
final def Sphere  (  Alleles:List[Double]  ,  accumulator:Double  )  :  Double =  Alleles match  {
case  Nil  =&gt;  accumulator
case  x  ::  xs  =&gt;    Sphere (  xs  ,  accumulator  +  Math.pow  (  x , 2  )  )
}
val N = 10000               // population size
val d =  10000             //  dimensions
val nP = 20               //   partitions
val iterations  = 100000  //   total iterations. This could be less more 
val RDD = sc.parallelize(0 until  N , nP).mapPartitionsWithIndex{ (index,iter)  =&gt;
val data  =  iter.map(i =&gt;
new myClass(  d, max,min   ,  Sphere   )   )
data
}.persist(StorageLevel.MEMORY_AND_DISK)  
val itr = 1 
val res = RDD.mapPartitionsWithIndex {
(index, Iterator) =&gt; {
var li = Iterator.toArray
li = li.sortWith(_.fitness &lt; _.fitness)
val res = loop(li, iterations, itr)
val bests = res.sortWith(_.fitness &lt; _.fitness).take(5).map(x =&gt; (x, index))
bests.toIterator
}
}
@tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
iteration match {
case 0 =&gt; arr
case _ =&gt; {
arr.map { j =&gt;
val l_Best = arr.minBy(_.fitness).pos // sort based on fitness
val l_Sum: Double = arr.map(_.LR).reduce( _ + _ )   // Calculate sum of Rate
val res = j.move(l_Best, l_Sum, arr.size, Sphere)
if (math.random &lt; j.LR &amp;&amp; res._2 &lt; j.fitness) {
j.pos = res._1
j.fitness = res._2
j.LR = j.LR * 0.95
j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
}
}
loop(arr, iteration - 1, itret + 1)
}
}
}

I tested this code on 4 node cluster For N = 100 and d = 100. It takes less than one minute to complete 10,000 iterations, but for N = 10,000 and d = 10,000, it took 19 hours to complete just 500 iterations.

According to my observations, functions that do element-wise operations inside myClass class taking a long time.
How can I increase its speed? Please give some suggestion. I want to execute it with the following configurations.

N = 10,000 ,d = 10,000 and iterations = 1000000000 (1 billion )

答案1

得分: 3

你可以大大简化你的逐元素操作,例如:

def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = arr.zip(arr1).map(x => x._1 - x._2)
英文:

You can greatly simplify your element-wise operations, e.g.:

def ElementsWiseSubtract ( arr :Array[Double] , arr1 :Array[Double] ) : Array [Double] = arr.zip(arr1).map(x =&gt; x._1 - x._2)

huangapple
  • 本文由 发表于 2020年1月4日 00:11:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/59581765.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定