How to get the number of executors and number of cores in Java Spark
Question
I am new to Spark and we are coding in Java now. The problem is that we are trying to figure out the number of executors and the number of cores. I googled and found some articles mentioning how to do this in Spark, as shown below, but I didn't see anything similar for Java (JavaSparkContext has neither getExecutorMemoryStatus nor getExecutorStorageStatus). Can anyone help, please?
// for executor numbers
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}

// for executor core numbers (the snippet found was PySpark; Scala equivalent)
val coresPerExecutor = sc.getConf.get("spark.executor.cores").toInt
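For what it's worth, the core-count half of the quoted snippet does translate to Java directly, since it only reads a configuration key. A minimal sketch, assuming spark.executor.cores is set explicitly on the cluster (the "1" fallback here is just for local runs):

import org.apache.spark.api.java.JavaSparkContext;

final class CoreConf {
    // Mirrors int(sc._conf.get('spark.executor.cores')) from the quoted snippet.
    // Assumes spark.executor.cores is set explicitly; "1" is a local-mode fallback.
    static int coresPerExecutor(JavaSparkContext jsc) {
        return Integer.parseInt(jsc.getConf().get("spark.executor.cores", "1"));
    }
}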
Answer 1
Score: 1
My answer is mostly based on this SO answer.

Recently, getExecutorStorageStatus was removed from SparkContext (in newer versions of Spark), so you can no longer call sc.getExecutorStorageStatus. Instead we can use SparkEnv's blockManager.master.getStorageStatus.length - 1 (the minus one accounts for the driver). The normal way to reach SparkEnv is via the env field of SparkContext, but that field is not accessible outside of the org.apache.spark package. Therefore, we use an "encapsulation violation pattern":
package org.apache.spark.util

import org.apache.spark.{SparkContext, SparkEnv}

/**
 * sc.env and ThreadUtils are private[spark], so this helper must live
 * inside the org.apache.spark namespace to re-export them -
 * the encapsulation violation pattern.
 */
object SparkInternalUtils {
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
  def getThreadUtils: ThreadUtils.type = ThreadUtils
}
Now we can obtain the SparkEnv instance with SparkInternalUtils.sparkEnv(sc).
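Since the original question is about Java: a Scala object compiles with static forwarder methods, so the helper above is callable from Java as if it were a static utility. A minimal sketch, assuming the Scala helper has been compiled into the same project:

import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.util.SparkInternalUtils;

final class SparkEnvAccess {
    // Scala object methods surface as static forwarders, so Java can call
    // SparkInternalUtils.sparkEnv(...) directly to reach the private env.
    static SparkEnv env(SparkContext sc) {
        return SparkInternalUtils.sparkEnv(sc);
    }
}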
Define RichSparkContext as below:
import org.apache.spark.SparkContext
import org.apache.spark.util.SparkInternalUtils

import scala.language.implicitConversions

class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    SparkInternalUtils.sparkEnv(sc).blockManager.master.getStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor
}

object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }
  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        // probe one executor for its core count and cache the result
        _coresPerExecutor = sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }
}
In Scala, get the number of executors and the core count:
val sc = ... // SparkContext instance
import RichSparkContext.implicits._
val executorCount = sc.executorCount
val coresPerExecutor = sc.coresPerExecutor
val totalCoreCount = sc.coreCount
In Java, get the number of executors and the core count:
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext()); // spark is an existing SparkSession
RichSparkContext richSparkContext = new RichSparkContext(javaSparkContext.sc());
System.out.println(richSparkContext.coresPerExecutor());
System.out.println(richSparkContext.coreCount());
System.out.println(richSparkContext.executorCount());
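As a side note, if depending on Spark's private internals is undesirable, the public status tracker exposes a similar count. A sketch assuming Spark 2.1+, where SparkStatusTracker.getExecutorInfos is available (the returned array includes the driver, hence the minus one):

import org.apache.spark.api.java.JavaSparkContext;

final class PublicApiExecutorCount {
    // getExecutorInfos() reports all known block managers, driver included,
    // so subtract one to count only the executors.
    static int executorCount(JavaSparkContext jsc) {
        return jsc.sc().statusTracker().getExecutorInfos().length - 1;
    }
}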
Answer 2
Score: 0
Tried the above answer and found that the executor count can be fetched this way:
import org.apache.spark.SparkEnv
val executorCount = SparkEnv.get.blockManager.master.getStorageStatus.length // note: this count includes the driver