How to get number of executors and number of cores in Java Spark

Question

I am new to Spark and we are coding in Java now. The problem is that we are trying to figure out the number of executors and the number of cores. I googled and saw some articles mentioning the way to do that in Spark, as below, but I didn't see anything similar in Java (JavaSparkContext has neither getExecutorMemoryStatus nor getExecutorStorageStatus). Can anyone help, please?

// for executor numbers (Scala)
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}

// for executor core numbers (PySpark)
int(sc._conf.get('spark.executor.cores'))
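
In Java, the second lookup is just a plain config read. A minimal sketch (not from the question itself), assuming javaSparkContext is an existing JavaSparkContext and that spark.executor.cores was set explicitly on submit; SparkConf.getInt falls back to the supplied default otherwise:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// javaSparkContext is assumed to be an existing JavaSparkContext instance
SparkConf conf = javaSparkContext.getConf();
// Returns the configured value, or 1 if spark.executor.cores was never set
int coresPerExecutor = conf.getInt("spark.executor.cores", 1);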

Answer 1

Score: 1

My answer is mostly based on this SO answer. Recently, getExecutorStorageStatus was removed from SparkContext (in newer versions of Spark), so you can't use sc.getExecutorStorageStatus. Instead we can use SparkEnv's blockManager.master.getStorageStatus.length - 1 (the minus one excludes the driver). The normal way to get to SparkEnv is via the env field of SparkContext, but env is not accessible outside of the org.apache.spark package. Therefore, we use an encapsulation violation pattern:

package org.apache.spark.util

import org.apache.spark.{SparkContext, SparkEnv}

/**
  * SparkContext.env and ThreadUtils are not accessible outside of the
  * org.apache.spark package. Placing this object inside org.apache.spark.util
  * is the encapsulation violation pattern described above.
  */
object SparkInternalUtils {

  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
  def getThreadUtils: ThreadUtils.type = ThreadUtils

}

Now, we can get the instance of SparkEnv using SparkInternalUtils.sparkEnv(sc).
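
Because a Scala object compiles with static forwarder methods, this helper can also be called from Java, which is what the question asks about. A minimal interop sketch, assuming the object above is compiled onto the classpath and javaSparkContext is an existing JavaSparkContext:

import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.SparkInternalUtils;

// sc() exposes the underlying Scala SparkContext
SparkContext sc = javaSparkContext.sc();
SparkEnv env = SparkInternalUtils.sparkEnv(sc);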

Define RichSparkContext as below:


import org.apache.spark.SparkContext
import org.apache.spark.util.SparkInternalUtils

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    SparkInternalUtils.sparkEnv(sc).blockManager.master.getStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        // Run a one-element job on an executor and cache the result.
        // Note: availableProcessors reports the cores visible to the executor
        // JVM, which is not necessarily the same as spark.executor.cores.
        _coresPerExecutor = sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }

}

In Scala, get the number of executors and core count:

val sc = ... // SparkContext instance
import RichSparkContext.implicits._
val executorCount = sc.executorCount
val coresPerExecutor = sc.coresPerExecutor
val totalCoreCount = sc.coreCount

In Java, get the number of executors and core count:

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
RichSparkContext richSparkContext = new RichSparkContext(javaSparkContext.sc());
System.out.println(richSparkContext.coresPerExecutor());
System.out.println(richSparkContext.coreCount());
System.out.println(richSparkContext.executorCount());
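
If you would rather stay on public API and avoid the package trick, recent Spark versions expose executor information through the status tracker. A sketch (not part of the original answer), assuming a Spark version where SparkStatusTracker.getExecutorInfos is available; the returned array includes the driver, hence the minus one:

import org.apache.spark.SparkExecutorInfo;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ExecutorCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("executor-count").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // getExecutorInfos also lists the driver, so subtract one
        SparkExecutorInfo[] infos = jsc.sc().statusTracker().getExecutorInfos();
        System.out.println("executors: " + (infos.length - 1));

        spark.stop();
    }
}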

Answer 2

Score: 0

Tried the above answer and found the executor count can be fetched this way (note that this count still includes the driver):

import org.apache.spark.SparkEnv
val executorCount = SparkEnv.get.blockManager.master.getStorageStatus.length
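
The same call chain can be transliterated to Java, because Scala's private[spark] members compile to public bytecode. This is still internal API, so treat the sketch below as an assumption that may break between Spark versions:

import org.apache.spark.SparkEnv;

// Must run on the driver after the SparkContext is initialized;
// the count includes the driver's BlockManager, as noted above.
int executorCount = SparkEnv.get().blockManager().master().getStorageStatus().length;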
