
《深入理解Spark:核心思想与源码分析》—— SparkContext Initialization (Part 3): Starting the TaskScheduler
3.8 Starting the TaskScheduler
  Section 3.7 described how the task scheduler, TaskScheduler, is created. For the TaskScheduler to do any work, it must be started:
taskScheduler.start()
When the TaskScheduler is started, it actually calls the start method of its backend:
override def start() {
  backend.start()
}
Taking LocalBackend as an example, starting LocalBackend registers a LocalActor with the actorSystem, as shown in Listing 3-30 (see the earlier post in this series, 《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(中)).
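For context, here is a minimal driver program that exercises exactly this code path in local mode. The object name and the RDD operation are illustrative only; what matters is that constructing the SparkContext is what creates the TaskSchedulerImpl/LocalBackend pair and calls taskScheduler.start().

import org.apache.spark.{SparkConf, SparkContext}

object TaskSchedulerStartDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("taskscheduler-start-demo").setMaster("local[2]")
    // Creating the SparkContext builds the TaskSchedulerImpl and the LocalBackend and starts them
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())   // runs tasks on the local Executor created above
    sc.stop()
  }
}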
3.8.1 Creating the LocalActor
  Creating the LocalActor mainly amounts to constructing the local Executor, as shown in Listing 3-36.
Listing 3-36    Implementation of LocalActor
private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,
    private val totalCores: Int) extends Actor with ActorLogReceive with Logging {

  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

  private var freeCores = totalCores

  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  private val localExecutorHostname = "localhost"

  val executor = new Executor(
    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }
}
The construction of the Executor, shown in Listing 3-37, mainly involves the following steps:
1) Create and register the ExecutorSource. What is the ExecutorSource for? It is described in detail in Section 3.10.2.
2) Obtain the SparkEnv. In non-local mode, when the CoarseGrainedExecutorBackend on a Worker registers its Executor with the Driver, a new SparkEnv has to be created. The property spark.executor.port (default 0, meaning a random port) can be set to configure the port of the Executor's ActorSystem.
3) Create and register the ExecutorActor, which is responsible for receiving messages sent to the Executor.
4) Create the urlClassLoader. Why is this ClassLoader needed? In non-local mode a Driver or a Worker can host several Executors, and each Executor sets up its own urlClassLoader to load classes from the jars uploaded with tasks, which effectively isolates the class-loading environment of each task.
5) Create the thread pool in which the Executor runs TaskRunner tasks (TaskRunner is introduced in Section 5.5). The pool is created by Utils.newDaemonCachedThreadPool; see Appendix A for its implementation, and the sketch after Listing 3-37 for the general idea.
6) Start the Executor's heartbeat thread, which is used to send heartbeats to the Driver.
In addition, the construction sets up the maximum frame size (in bytes) for Akka messages, the limit (in bytes) on the total size of results, the map of running tasks, and the serializer's default ClassLoader (the ClassLoader created above).
Listing 3-37    Construction of the Executor
  val executorSource = new ExecutorSource(this, executorId)

  private val env = {
    if (!isLocal) {
      val port = conf.getInt("spark.executor.port", 0)
      val _env = SparkEnv.createExecutorEnv(
        conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env.blockManager.initialize(conf.getAppId)
      _env
    } else {
      SparkEnv.get
    }
  }

  private val executorActor = env.actorSystem.actorOf(
    Props(new ExecutorActor(executorId)), "ExecutorActor")

  private val urlClassLoader = createClassLoader()
  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
  env.serializer.setDefaultClassLoader(urlClassLoader)

  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
  private val maxResultSize = Utils.getMaxResultSize(conf)

  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  startDriverHeartbeater()
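Step 5 above defers to Utils.newDaemonCachedThreadPool (Appendix A). Purely as a sketch of the idea, and not Spark's actual implementation, a daemon cached thread pool can be assembled from JDK primitives like this:

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

// Sketch only: a cached thread pool whose threads are daemons and carry a readable name,
// cast to ThreadPoolExecutor so callers (such as ExecutorSource) can read its statistics.
def newDaemonCachedThreadPoolSketch(prefix: String): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, prefix + "-" + counter.incrementAndGet())
      t.setDaemon(true)   // daemon threads do not prevent the JVM from exiting
      t
    }
  }
  Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
}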
3.8.2 Creating and Registering the ExecutorSource
  The ExecutorSource provides metrics about the Executor. Metrics are registered through the register method of metricRegistry; they include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops and so on. The implementation of ExecutorSource is shown in Listing 3-38; see Appendix D for the concrete implementations of the Metric interface.
Listing 3-38    Implementation of ExecutorSource
private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {
  private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  override val metricRegistry = new MetricRegistry()
  override val sourceName = "executor"

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getActiveCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getPoolSize()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

  // Gauge for file system stats of this executor
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}
After the ExecutorSource has been created, the MetricsSystem's registerSource method is called to register it with the MetricsSystem. registerSource in turn uses MetricRegistry's register method to register the Source with the MetricRegistry, as shown in Listing 3-39. See Appendix D for details on MetricRegistry.
Listing 3-39    Registering a Source with the MetricsSystem
  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
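The register/Gauge mechanism used by ExecutorSource comes from the Codahale (Dropwizard) Metrics library. A self-contained sketch, unrelated to Spark's own classes, of registering and reporting a Gauge could look like the following; the metric name and the reporting period are arbitrary choices for the example.

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, Gauge, MetricRegistry}

object GaugeDemo {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry()
    // A gauge is just a callback that is sampled whenever the registry is reported
    registry.register(MetricRegistry.name("jvm", "usedHeap_bytes"), new Gauge[Long] {
      override def getValue: Long = {
        val rt = Runtime.getRuntime
        rt.totalMemory() - rt.freeMemory()
      }
    })
    val reporter = ConsoleReporter.forRegistry(registry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()
    reporter.start(10, TimeUnit.SECONDS)   // dump all registered metrics every 10 seconds
    Thread.sleep(30000)                    // keep the demo JVM alive long enough to see output
  }
}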
3.8.3 Constructing and Registering the ExecutorActor
  The ExecutorActor is very simple: when it receives the message sent by the SparkUI, it replies with the stack traces of all threads. Its implementation is as follows.
  override def receiveWithLogging = {
    case TriggerThreadDump =>
      sender ! Utils.getThreadDump()
  }
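Utils.getThreadDump itself lives in Spark's util package. Purely as an illustration of how such a dump can be produced with the JDK's ThreadMXBean, a standalone sketch (not Spark's implementation) might look like this:

import java.lang.management.ManagementFactory

// Sketch only, not Spark's Utils.getThreadDump: render the stack of every live thread.
def threadDumpSketch(): String = {
  val bean = ManagementFactory.getThreadMXBean
  bean.dumpAllThreads(true, true).map { info =>
    val frames = info.getStackTrace.map(f => "    at " + f).mkString("\n")
    "\"" + info.getThreadName + "\" state=" + info.getThreadState + "\n" + frames
  }.mkString("\n\n")
}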
3.8.4 Creating Spark's Own ClassLoader
  createClassLoader first obtains currentLoader, the parent of the ClassLoader about to be created, then builds an array of URLs from currentJars. The property spark.files.userClassPathFirst controls whether classes are loaded from the user's classpath first. Finally an ExecutorURLClassLoader or a ChildExecutorURLClassLoader is created, as shown in Listing 3-40.
Listing 3-40    Creating Spark's Own ClassLoader
  private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader
    val urls = currentJars.keySet.map { uri =>
      new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
      case true => new ChildExecutorURLClassLoader(urls, currentLoader)
      case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
  }
The implementation of Utils.getContextOrSparkClassLoader is given in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader ultimately extend URLClassLoader, as shown in Listing 3-41.
Listing 3-41    Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  private object userClassLoader extends URLClassLoader(urls, null){
    override def addURL(url: URL) {
      super.addURL(url)
    }
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  private val parentClassLoader = new ParentClassLoader(parent)

  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }
  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }
  def getURLs() = {
    userClassLoader.getURLs()
  }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader {
  override def addURL(url: URL) {
    super.addURL(url)
  }
}
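Both loaders ultimately rely on plain URLClassLoader behaviour. The following standalone snippet shows the basic mechanism of loading a class from a user-supplied jar, which is what urlClassLoader adds on top of the parent loader; the jar path and class name are hypothetical, for illustration only.

import java.io.File
import java.net.URLClassLoader

object UserJarLoadingDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative only; the jar path and the class name below are hypothetical.
    val jarUrl = new File("/tmp/userApp.jar").toURI.toURL
    val loader = new URLClassLoader(Array(jarUrl), getClass.getClassLoader)
    val klass = loader.loadClass("com.example.MyTaskFunction")
    println(klass.newInstance())   // assumes a public no-argument constructor
  }
}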
If REPL interaction is required, addReplClassLoaderIfNeeded is also called to create replClassLoader, as shown in Listing 3-42.
Listing 3-42    Implementation of addReplClassLoaderIfNeeded
  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
      logInfo("Using REPL class URI: " + classUri)
      val userClassPathFirst: java.lang.Boolean =
        conf.getBoolean("spark.files.userClassPathFirst", false)
      try {
        val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
          .asInstanceOf[Class[_ <: ClassLoader]]
        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
          classOf[ClassLoader], classOf[Boolean])
        constructor.newInstance(conf, classUri, parent, userClassPathFirst)
      } catch {
        case _: ClassNotFoundException =>
          logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
          System.exit(1)
          null
      }
    } else {
      parent
    }
  }
3.8.5 Starting the Executor's Heartbeat Thread
  The Executor's heartbeats are started by startDriverHeartbeater, shown in Listing 3-43. The interval of the heartbeat thread is configured by the property spark.executor.heartbeatInterval, with a default of 10000 milliseconds. In addition, the lookup timeout is 30 seconds, the number of retries is 3, and the retry interval is 3000 milliseconds. The matching Actor reference is looked up with actorSystem.actorSelection(url), where url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally a thread is created that, on each pass of its loop, sleeps for interval milliseconds (10000 by default), reads the latest task metrics from runningTasks, wraps them together with executorId and blockManagerId into a Heartbeat message, and sends the Heartbeat to the HeartbeatReceiver.
Listing 3-43    Starting the Executor's Heartbeat Thread
  def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

    val t = new Thread() {
      override def run() {
        // Sleep a random interval so the heartbeats don't end up in sync
        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

        while (!isStopped) {
          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
          val curGCTime = gcTime

          for (taskRunner <- runningTasks.values()) {
            if (!taskRunner.attemptedTask.isEmpty) {
              Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                metrics.updateShuffleReadMetrics
                metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                if (isLocal) {
                  // deep-copy the metrics so later updates are not visible to the listener
                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                } else {
                  // It will be copied by serialization
                  tasksMetrics += ((taskRunner.taskId, metrics))
                }
              }
            }
          }

          val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
          try {
            val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
              retryAttempts, retryIntervalMs, timeout)
            if (response.reregisterBlockManager) {
              logWarning("Told to re-register on heartbeat")
              env.blockManager.reregister()
            }
          } catch {
            case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
          }

          Thread.sleep(interval)
        }
      }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
  }
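Listing 3-43 hand-rolls the loop with a daemon Thread and Thread.sleep. The same "send a heartbeat every interval milliseconds from a daemon thread" pattern can also be sketched with a ScheduledExecutorService; this is only an illustration of the pattern, not Spark's code.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object HeartbeatSketch {
  def main(args: Array[String]): Unit = {
    val interval = 10000L   // mirrors the spark.executor.heartbeatInterval default
    val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "driver-heartbeater-sketch")
        t.setDaemon(true)   // same daemon property as the thread in Listing 3-43
        t
      }
    })
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // collect the latest task metrics and send a Heartbeat message to the driver here
      }
    }, interval, interval, TimeUnit.MILLISECONDS)
    Thread.sleep(60000)   // keep the demo JVM alive; daemon threads alone would let it exit
  }
}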
What is this heartbeat thread for? It serves two purposes:
- updating the metrics of the tasks that are currently being processed;
- notifying the BlockManagerMaster that the BlockManager on this Executor is still alive.
The implementation of the heartbeat thread is analyzed in detail below; readers can decide for themselves whether to read it.
  After TaskSchedulerImpl has been initialized, the heartbeat receiver HeartbeatReceiver is created. The HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver application and hands the tasks, the task metrics and the heartbeat itself over to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created as follows.
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
After the HeartbeatReceiver receives a heartbeat message, it calls the TaskScheduler's executorHeartbeatReceived method:
  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
      sender ! response
  }
The implementation of executorHeartbeatReceived is as follows.
    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
      taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
          .flatMap(activeTaskSets.get)
          .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
This code iterates over taskMetrics and uses taskIdToTaskSetId and activeTaskSets to find the TaskSetManager of each task. It then packs taskId, TaskSetManager.stageId, TaskSetManager.taskSet.attempt and TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls dagScheduler's executorHeartbeatReceived method, whose implementation is shown below.
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
    implicit val timeout = Timeout(600 seconds)
    Await.result(
      blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
      timeout.duration).asInstanceOf[Boolean]
The dagScheduler wraps executorId and metricsWithStageIds into a SparkListenerExecutorMetricsUpdate event and posts it to the listenerBus; this event is used to update each Stage's metrics. It then sends a BlockManagerHeartbeat message to the BlockManagerMasterActor held by the BlockManagerMaster. On receiving the message, the BlockManagerMasterActor matches it and executes heartbeatReceived (introduced in Section 4.3.1). heartbeatReceived ultimately updates the time at which the BlockManagerMaster last saw this BlockManager (that is, the _lastSeenMs of the BlockManagerInfo corresponding to the BlockManagerId), as shown in Listing 3-44.
Listing 3-44    Heartbeat handling in BlockManagerMasterActor
  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
      blockManagerId.isDriver && !isLocal
    } else {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      true
    }
  }
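Both AkkaUtils.askWithReply in Listing 3-43 and the Await.result call above are built on Akka's ask pattern. A minimal, self-contained sketch of that pattern, written against the Akka version Spark 1.x ships with and using made-up actor and message names, looks like this:

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

case object Ping
case object Pong

class Ponger extends Actor {
  def receive = { case Ping => sender ! Pong }
}

object AskDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ask-demo")
    val ponger = system.actorOf(Props(new Ponger), "ponger")
    implicit val timeout = Timeout(10.seconds)
    val answer = Await.result(ponger ? Ping, timeout.duration)   // blocks until the reply or the timeout
    println(answer)   // Pong
    system.shutdown()
  }
}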
The heartbeat communication of an Executor in local mode is illustrated in Figure 3-3.
Figure 3-3    The Executor's heartbeat communication
Note: in non-local mode the Executor sends heartbeats in exactly the same way; the main difference is that the Executor runs in a different process from the Driver, and possibly on a different node.
Next, the block manager BlockManager is initialized:
env.blockManager.initialize(applicationId)
The details of this initialization are covered in Chapter 4.
To be continued.
Postscript: after giving up seven months of weekends and after-work hours to studying Spark's source code and design, my book 《深入理解Spark:核心思想与源码分析》 has now been officially published and is on sale at Amazon, JD, Dangdang, Tmall and other sites; interested readers are welcome to buy it. The Spark version I started from was 1.2.0; after more than seven months of research and nearly four months of the publisher's process, Spark itself has kept iterating and the latest release is already 1.6.0. The other two Spark source-code books currently on the market cover versions 0.9.0 and 1.2.0, so their authors evidently ran into the same problem: research and publishing both take time, so a book cannot keep pace with Spark's releases, and I ask for the readers' understanding. The core parts of Spark change relatively little, however, so if you are not too particular about the version, this book is still a reasonable choice.