英文:
Spark code takes too much time to run on cluster
问题
我已经编写了一个Spark应用程序。我的代码对较小规模的数据集工作得很好,但对于较大规模的数据集需要太长时间。
以下是类的定义。
case class myClass(d: Int, max: Double, min: Double, f: (List[Double], Double) => Double) extends Serializable {
val random = new Random()
var pos: Array[Double] = Array.fill(d)(random.nextDouble() * (max - min) + min)
var vel: Array[Double] = Array.fill(d)(math.random)
var PR: Double = 0.1 // Rate
var LR: Double = 0.95
var fitness: Double = f(this.pos.toList, 0)
val Band: Double = 0.001
// Functions definition
def move(l_Best: Array[Double], sumL: Double, size: Int, f: (List[Double], Double) => Double): (Array[Double], Double) = {
val f = math.random
val temp1 = ElementsWiseSubtract(pos, l_Best)
val temp2 = temp1.map(_ * freq)
val newVel = ElementsWiseSum(temp2, vel)
var newPos = ElementsWiseSum(pos, newVel)
if (math.random > this.PR) {
newPos = l_Best.map(_ * (Band * (sumL / size))).toArray
}
val nFit = f(newPos.toList, 0)
(newPos, nFit)
}
def ElementsWiseSum(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
var res: Array[Double] = Array()
if (arr.length == 1)
res = Array(arr.head + arr1.head)
else
res = ElementsWiseSum(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
ElementsWiseSum(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
res
}
def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
var res: Array[Double] = Array()
if (arr.length == 1)
res = Array(arr.head - arr1.head)
else
res = ElementsWiseSubtract(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
ElementsWiseSubtract(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
res
}
}
主函数定义和RDD的创建如下:
@tailrec
final def Sphere(Alleles: List[Double], accumulator: Double): Double = Alleles match {
case Nil => accumulator
case x :: xs => Sphere(xs, accumulator + Math.pow(x, 2))
}
val N = 10000 // population size
val d = 10000 // dimensions
val nP = 20 // partitions
val iterations = 100000 // total iterations. This could be less more
val RDD = sc.parallelize(0 until N, nP).mapPartitionsWithIndex { (index, iter) =>
val data = iter.map(i =>
new myClass(d, max, min, Sphere)
)
data
}.persist(StorageLevel.MEMORY_AND_DISK)
val itr = 1
val res = RDD.mapPartitionsWithIndex {
(index, Iterator) => {
var li = Iterator.toArray
li = li.sortWith(_.fitness < _.fitness)
val res = loop(li, iterations, itr)
val bests = res.sortWith(_.fitness < _.fitness).take(5).map(x => (x, index))
bests.toIterator
}
}
@tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
iteration match {
case 0 => arr
case _ => {
arr.map { j =>
val l_Best = arr.minBy(_.fitness).pos // sort based on fitness
val l_Sum: Double = arr.map(_.LR).reduce(_ + _) // Calculate sum of Rate
val res = j.move(l_Best, l_Sum, arr.size, Sphere)
if (math.random < j.LR && res._2 < j.fitness) {
j.pos = res._1
j.fitness = res._2
j.LR = j.LR * 0.95
j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
}
}
loop(arr, iteration - 1, itret + 1)
}
}
}
我在4节点集群上测试了这段代码,对于N = 100和d = 100,完成10,000次迭代不到一分钟,但对于N = 10,000和d = 10,000,仅完成500次迭代需要19小时。
根据我的观察,位于myClass类内部的元素级操作函数需要很长时间。您如何提高其速度?请提供一些建议。我希望使用以下配置执行它。
N = 10,000,d = 10,000和iterations = 1000000000(10亿)。
英文:
I have written a Spark application. My code works fine for smaller size population (dataset) but it takes too much time for the larger population (dataset).
Here is the definition of class.
case class myClass( d : Int , max : Double , min : Double , f: (List[Double],Double) => Double ) extends Serializable {
val random = new Random()
var pos : Array[Double] = Array.fill ( d ) ( random.nextDouble()*(max-min)+min )
var vel : Array[Double] = Array.fill ( d ) ( math.random)
var PR : Double = 0.1 //Rate
var LR : Double = 0.95
var fitness : Double = f (this.pos.toList ,0)
val Band : Double = 0.001
//Functions definition
def move ( l_Best : Array [ Double ] , sumL : Double , size : Int , f : ( List [ Double ] ,Double ) => Double ) : ( Array [ Double ] , Double ) = {
val f = math.random
val temp1 = ElementsWiseSubtract ( pos , l_Best )
val temp2 = temp1.map ( _ * freq )
val newVel = ElementsWiseSum ( temp2 , vel ) // vel == this.vel
var newPos = ElementsWiseSum ( pos , newVel ) // pos == this.pos
if ( math.random > this.PR ) {
newPos = l_Best.map ( _ * ( Band * ( sumL / size ) ) ).toArray
}
val nFit = f ( newPos.toList , 0 )
( newPos , nFit )
}
def ElementsWiseSum ( arr : Array[Double] , arr1 : Array[Double] ) : Array [Double] = {
var res : Array[ Double ] = Array()
if ( arr.length == 1 )
res = Array ( arr.head + arr1.head )
else
res = ElementsWiseSum ( arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) ) ++
ElementsWiseSum ( arr.slice(arr.length/2, arr.length ) , arr1.slice( arr1.length/2 , arr1.length) )
res
}
def ElementsWiseSubtract ( arr :Array[Double] , arr1 :Array[Double] ) : Array [Double] = {
var res : Array[ Double ] = Array()
if ( arr.length == 1 )
res = Array ( arr.head - arr1.head )
else
res = ElementsWiseSubtract ( arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) ) ++
ElementsWiseSubtract ( arr.slice ( arr.length / 2, arr.length ) , arr1.slice ( arr1.length / 2 , arr1.length ) )
res
}
}
Main function definition and creation of RDD is:
@tailrec
final def Sphere ( Alleles:List[Double] , accumulator:Double ) : Double = Alleles match {
case Nil => accumulator
case x :: xs => Sphere ( xs , accumulator + Math.pow ( x , 2 ) )
}
val N = 10000 // population size
val d = 10000 // dimensions
val nP = 20 // partitions
val iterations = 100000 // total iterations. This could be less more
val RDD = sc.parallelize(0 until N , nP).mapPartitionsWithIndex{ (index,iter) =>
val data = iter.map(i =>
new myClass( d, max,min , Sphere ) )
data
}.persist(StorageLevel.MEMORY_AND_DISK)
val itr = 1
val res = RDD.mapPartitionsWithIndex {
(index, Iterator) => {
var li = Iterator.toArray
li = li.sortWith(_.fitness < _.fitness)
val res = loop(li, iterations, itr)
val bests = res.sortWith(_.fitness < _.fitness).take(5).map(x => (x, index))
bests.toIterator
}
}
@tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
iteration match {
case 0 => arr
case _ => {
arr.map { j =>
val l_Best = arr.minBy(_.fitness).pos // sort based on fitness
val l_Sum: Double = arr.map(_.LR).reduce( _ + _ ) // Calculate sum of Rate
val res = j.move(l_Best, l_Sum, arr.size, Sphere)
if (math.random < j.LR && res._2 < j.fitness) {
j.pos = res._1
j.fitness = res._2
j.LR = j.LR * 0.95
j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
}
}
loop(arr, iteration - 1, itret + 1)
}
}
}
I tested this code on 4 node cluster For N = 100 and d = 100. It takes less than one minute to complete 10,000 iterations, but for N = 10,000 and d = 10,000, it took 19 hours to complete just 500 iterations.
According to my observations, functions that do element-wise operations inside myClass class taking a long time.
How can I increase its speed? Please give some suggestion. I want to execute it with the following configurations.
N = 10,000 ,d = 10,000 and iterations = 1000000000 (1 billion )
答案1
得分: 3
你可以大大简化你的逐元素操作,例如:
def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = arr.zip(arr1).map(x => x._1 - x._2)
英文:
You can greatly simplify your element-wise operations, e.g.:
def ElementsWiseSubtract ( arr :Array[Double] , arr1 :Array[Double] ) : Array [Double] = arr.zip(arr1).map(x => x._1 - x._2)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论