vs如何编写单片机程序这个单片机串行通信程序?

 上传我的文档
 下载
 收藏
工程师,工作经验5年以上,很多方案类资料自己整理
 下载此文档
正在努力加载中...
PMC Job SOP
下载积分:3000
内容提示:PMC Job SOP
文档格式:PPT|
浏览次数:2|
上传日期: 01:05:49|
文档星级:
该用户还上传了这些文档
PMC Job SOP
官方公共微信spark 启动job的流程分析 - 推酷
spark 启动job的流程分析
编写一个例子程序
编写一个从
中读取并计算
的例子程序
org.apache.spark.examples
org.apache.spark.SparkContext
org.apache.spark.SparkContext._
main(args : Array[String]) {
SparkContext(args(
&wordcount by hdfs&
System.getenv(
&SPARK_HOME&
),SparkContext.jarOfClass(
.getClass()))
的根路径下得到一个文件
.textFile(
&/hadoop-test.txt&
.flatMap(line=& line.split(
.map(word =& (word,
)).reduceByKey(_+ _)
.saveAsTextFile(
&/newtest.txt&
SparkContext
在上面例子中,要执行
map/reduce
操作,首先需要一个
SparkContext
,因此看看
SparkContext
的实例生成
master: String,
appName: String,
sparkHome: String =
jars: Seq[String] = Nil,
environment: Map[String, String]= Map(),
preferredNodeLocationData:Map[String, Set[SplitInfo]] = Map()) =
(SparkContext.updatedConf(
SparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
例子时使用了上面列出的构造函数,后面两个
environment
preferredNodeLocationData
传入为默认值。
updatedConf
的单例函数,生成或更新当前的
SparkContext
的默认构造函数。
生成并启动监控的
Jettyui,SparkUI.
TaskScheduler
实例,并启动。
此函数会根据不同的
mastername
生成不同的
TaskScheduler
,yarn-cluster
YarnClusterScheduler
主要用来启动
的运行状态。
taskScheduler
= SparkContext.createTaskScheduler(
taskScheduler
DAGScheduler
实例,并启动。
dagScheduler
DAGScheduler(
taskScheduler
dagScheduler
操作后,通过调用
postStartHook
SparkContext
WorkerRunnable
线程,通过
CoarseGrainedExecutorBackend
此实例通过
实例来加载相关的
SparkContext.textFile
此方法用来生成
的实例,通常读取文本文件的方式通过
来进行,并其调用
hadoopFile
hadoopFile
HadoopRDD&K,V&
的实例后,通过
的值。并生成
textFile(path: String, minSplits: Int = defaultMinSplits):RDD[String] = {
hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
).map(pair=& pair.
.toString)
hadoopFile
函数生成一个
hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ &:InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
configuration can be about 10 KB, which is pretty big, so broadcastit.
confBroadcast
= broadcast(
SerializableWritable
hadoopConfiguration
setInputPathsFunc
= (jobConf: JobConf) =& FileInputFormat.setInputPaths(jobConf,path)
HadoopRDD(
confBroadcast
setInputPathsFunc
inputFormatClass,
valueClass,
minSplits)
函数的抽象执行
reduceByKey
,也就是需要多个
中的数据集合到相同的
中运行,生成相关的
.textFile(
&/hadoop-test.txt&
.flatMap(line=& line.split(
.map(word =& (word,
)).reduceByKey(_+ _)
.saveAsTextFile(
&/newtest.txt&
在以上代码中
,textFile,flatMap,map,reduceByKey
transformation,
saveAsTextFile
中进行执行操作的
http://my-oschina-net/hanzhankang/blog/200275
的相关说明:
具体可参见:
http://spark.apache.org/docs/0.9.0/scala-programming-guide.html
transformation
是得到一个新的
,方式很多,比如从数据源生成一个新的
生成一个新的
是得到一个值,或者一个结果(直接将
到内存中)
transformation
都是采用的懒策略,就是如果只是将
transformation
提交是不会执行计算的,计算只有在
被提交的时候才被触发。
transformation
map(func):
数据集中的每个
,然后返回一个新的
这个返回的数据集是分布式的数据集
filter(func):
数据集中的每个元素都使用
,然后返回一个包含使
的元素构成的
flatMap(func):
差不多,但是
生成的是多个结果
mapPartitions(func):
很像,但是
mapPartitions
mapPartitionsWithSplit(func):
mapPartitions
很像,但是
作用的是其中一个
sample(withReplacement,faction,seed):
union(otherDataset)
:返回一个新的
的元素的集合
distinct([numTasks]):
返回一个新的
含有的是源
groupByKey(numTasks):
(K,Seq[V])
函数接受的
key-valuelist
reduceByKey(func,[numTasks]):
就是用一个给定的
reducefunc
groupByKey
(K,Seq[V]),
比如求和,求平均数
sortByKey([ascending],[numTasks]):
来进行排序,是升序还是降序,
join(otherDataset,[numTasks]):
dataset(K,V)
,返回的是
dataset,numTasks
为并发的任务数
cogroup(otherDataset,[numTasks]):
dataset(K,V)
,返回的是
(K,Seq[V],Seq[W])
dataset,numTasks
为并发的任务数
cartesian(otherDataset)
:笛卡尔积就是
,大家懂的
reduce(func)
:说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的
或者足够小的结果的时候,再用
封装返回一个数组
中的第一个元素
driverprogram
takeSample(withReplacement
:抽样返回一个
个元素,随机种子
saveAsTextFile
支持的文件系统中,
把每条记录都转换为一行记录,然后写到
saveAsSequenceFile(path):
对上,然后生成
SequenceFile
写到本地或者
countByKey()
:返回的是
对应的个数的一个
,作用于一个
foreach(func):
中的每个元素都使用
saveAsTextFile
SparkContext.runJob
saveAsTextFile
--&saveAsHadoopFile,
SparkContext.runJob
saveAsTextFile(path: String) {
.map(x=& (NullWritable.get(),
Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)
以下一行代码就是在
saveASTextFile
函数嵌套调用中最终调用的函数,调用
SparkContext.runJob
self.context.runJob(self,writeToFile _)
SparkContext.runJob
runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T])=& U): Array[U] = {
runJob(rdd, func,
rdd.partitions
SparkContext
的最终执行
runJob[T, U: ClassTag](
rdd: RDD[T],//
此处是具体的
func: (TaskContext, Iterator[T])=& U,//
具体的执行的
reduceByKey
partitions:Seq[Int],//
一个数值从
partitions.size-1
allowLocal: Boolean,//
是否可以在本地执行
的处理逻辑
resultHandler: (Int, U)
=getCallSite
cleanedFunc
= clean(func)
&Startingjob: &
=System.nanoTime
DAGScheduler.runJob
的运行操作,请看下面的
DAGScheduler
dagScheduler
.runJob(rdd,
cleanedFunc
,partitions,
,allowLocal,
resultHandler,
localProperties
&Jobfinished: &
+ (System.nanoTime -
rdd.doCheckpoint()
DAGScheduler
上面的函数最终通过
DagScheduler.runJob
进行执行。
runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T])=& U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) =&Unit,
properties: Properties =
=submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler,properties)
运行完成。
.awaitResult()
JobSucceeded =& {}
JobFailed(
:Exception, _) =&
&Failedto run &
+ callSite)
DAGShceduler.submitJob
来提交任务。
submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T])=& U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) =&Unit,
properties: Properties =
):JobWaiter[U] =
//Check to make sure we are not launching a task on a partition thatdoes not exist.
maxPartitions
= rdd.partitions.length
partitions.find(p =& p &=
maxPartitions
).foreach{ p =&
IllegalArgumentException(
&Attemptingto access a non-existent partition: &
&Totalnumber of partitions: &
maxPartitions
.getAndIncrement()
(partitions.size ==
JobWaiter[U](
,resultHandler)
assert(partitions.size &
=func.asInstanceOf[(TaskContext, Iterator[_]) =& _]
JobWaiter(
,partitions.size, resultHandler)
JobSubmitted,!
表示发送消息
eventProcessActor
! JobSubmitted(
,partitions.toArray, allowLocal, callSite,
,properties)
DAGShceduler
方法时,会生成如下代码,此代码
eventProcessActor
发送的消息并进行处理
eventProcessActor
actorSystem
.actorOf(Props(
* A handle to the periodicaltask, used to cancel the task when the actor is stopped.
resubmissionTask
:Cancellable = _
preStart() {
context.dispatcher
* A message is sent to theactor itself periodically to remind the actor to resubmit failed
* stages. In this way, stage
resubmission
can be done within the same thread context of
* other event processing logicto avoid unnecessary synchronization overhead.
resubmissionTask
.system.scheduler.schedule(
RESUBMIT_TIMEOUT
RESUBMIT_TIMEOUT
,ResubmitFailedStages)
* The main event loop of the DAGscheduler.
接收发送的
事件,并通过
processEvent
进行处理。
receive = {
:DAGSchedulerEvent =&
&Gotevent of type &
.getClass.getName)
* All events are forwarded to`processEvent()`, so that the event processing logic can
* easily tested withoutstarting a dedicated actor. Please refer to `DAGSchedulerSuite`
* for details.
(!processEvent(
submitWaitingStages()
resubmissionTask
processEvent
JobSubmitted
的处理流程
以下代码中生成一个
finalStage,
finalStage,
划分出不同的
,并且提交
[scheduler]
processEvent(event: DAGSchedulerEvent): Boolean = {
JobSubmitted(
partitions
allowLocal
properties
finalStage
//New stage creation may throw an exception if, for example, jobs arerun on a HadoopRDD
//whose underlying HDFS files have been deleted.
finalStage
= newStage(
partitions
.size,None,
:Exception =&
logWarning(
&Creatingnew stage failed due to exception - job: &
.jobFailed(
ActiveJob(
finalStage
partitions
properties
clearCacheLocs()
partitions
&output partitions (allowLocal=&
allowLocal
&Finalstage: &
finalStage
finalStage
&Parentsof final stage: &
finalStage
&Missingparents: &
+getMissingParentStages(
finalStage
如果可以本地运行,同时此
finalStage
的依赖关系
partitions
只有一个。也就是只有一个处理的
那么这时直接通过
localThread
的方式来运行此
实例。不通过
TaskScheduler
进行处理。
allowLocal
finalStage
partitions
//Compute very short actions like first() or take() with no parentstages locally.
listenerBus
.post(SparkListenerJobStart(
properties
runLocally(
partitions
有多个,或者
本身的依赖关系,也就是像
这种场景。
stage(finalStage),
submitStage,
之间的依赖关系得出
,并以依赖关系进行处理:
idToActiveJob
activeJobs
resultStageToJob
finalStage
listenerBus
.post(SparkListenerJobStart(
jobIdToStageIds
).toArray,
properties
submitStage(
finalStage
submitStage
方法处理流程:
submitStage(stage: Stage) {
=activeJobForStage(stage)
.isDefined){
&submitStage(&
(stage)&& !
(stage)&& !
=getMissingParentStages(stage).sortBy(_.
&missing:&
&Submitting&
&), which has no missingparents&
submitMissingTasks(stage,
submitStage(
abortStage(stage,
&Noactive job for stage &
对于一个刚生成的
为刚生成,
submitStage
getMissingParentStages
的依赖关系
parentStage
dependencies
来生成相关的
的依赖关系,
如果依赖关系是
ShuffleDependency
,生成一个
finalStage
NarrowDependency
,不生成新的
没有相关的数据依赖
也就是说,对应需要执行
finalStage
只需要一个
finalStage
getMissingParentStages(stage:Stage): List[Stage] = {
HashSet[Stage]
HashSet[RDD[_]]
visit(rdd: RDD[_]) {
getCacheLocs
.contains(Nil)){
&-rdd.dependencies) {
:ShuffleDependency[_,_] =&
=getShuffleMapStage(
.isAvailable){
:NarrowDependency[_] =&
visit(stage.
接下来回到
submitStage
parentstage)
的提交操作。
&Submitting&
&), which has no missingparents&
submitMissingTasks(stage,
设置当前的
因为当前的
需要进行顺序执行。先执行
submitStage(
设置当前的
的执行完成。
submitMissingTasks
流程处理,把
TaskScheduler
submitMissingTasks(stage:Stage, jobId: Int) {
&submitMissingTasks(&
//Get our pending tasks and remember them in our pendingTasks entry
pendingTasks
.getOrElseUpdate(stage,
=ArrayBuffer[Task[_]]()
shuffleMapStage,
ShuffleMapTask,
表示还有其它
isShuffleMap
until stage.
numPartitions
outputLocs
(p)== Nil) {
中当前传入的
TaskLocation(
=getPreferredLocs(stage.
ShuffleMapTask(stage.
shuffleDep
否则表示是一个
finalStage,
直接输出结果,生成
ResultTask,
//T figure out its job's missing partitions
resultStageToJob
numPartitions
partitions
中当前传入的
TaskLocation(
=getPreferredLocs(stage.
ResultTask(stage.
properties
idToActiveJob
.contains(jobId)){
idToActiveJob
properties
//thisstage will be assigned to &default& pool
//must be run listener before possible NotSerializableException
//should be &StageSubmitted& first and then &JobEnded&
listenerBus
.post(SparkListenerStageSubmitted(
stageToInfos
properties
如果有生成的
中有需要执行的
//Preemptively serialize a task to make sure it can be serialized. Weare catching this
//exception here because it would be fairly hard to catch thenon-
serializable
//down the road, where we have several different implementations forlocal scheduler and
//cluster schedulers.
SparkEnv.get.
closureSerializer
.newInstance().serialize(
:NotSerializableException =&
abortStage(stage,
&Tasknot serializable: &
.toString)
&Submitting&
& missing tasks from &
&Newpending tasks: &
TaskScheduler.submitTasks
处理函数,
TaskScheduler
YarnClusterScheduler
请参见下面的
TaskScheduler
taskSched.submitTasks(
.toArray,stage.
,stage.newAttemptId(), stage.
properties
stageToInfos
submissionTime
= Some(System.currentTimeMillis())
& %b %d %d&
stage.isAvailable,stage.
numAvailableOutputs
numPartitions
到目前为此,
DAGScheduler
的处理流程完成。等待
TaskScheduler
处理完数据后,回调
DAGScheduler.
TaskScheduler
TaskScheduler
YarnClusterScheduler
时,通过调用
submitTasks
YarnClusterScheduler
TaskSchedulerImpl.
TaskSchedulerImpl.submitTasks
的提交进行处理。
submitTasks(taskSet: TaskSet){
&Addingtask set &
+ taskSet.
.synchronized{
TaskSetManager
实例,并把此实例设置到
activeTaskSets
的容器中。
在生成实例的过程中,会把
传入,并得到要执行的
生成副本执行次数的容器
copiesRunning
,列表的个数为
的个数,所有的列表值为
,表示没有副本执行
pendingTasksForExecutor(process_local)
此时没有值,
pendingTasksForHost(node_local),
此时此节点的
全在此里面
注册时已经存在
/pendingTasksForRack(rack)/
,通常情况不会有值
pendingTasksWithNoPrefs(
通常情况不会有值。
allPendingTasks(any)
都在最后一个中
TaskSetManager(
maxTaskFailures
activeTaskSets
TaskSetManager
schedulableBuilder
.addTaskSetManager(
properties
TaskSet(job)
生成一个跟踪每一个
taskSetTaskIds
HashSet[Long]()
是否被启动,如果没有被启动,提示无资源,如果被启动成功,关闭此检查线程。
(!isLocal && !
hasReceivedTask
starvationTimer
.scheduleAtFixedRate(
TimerTask() {
hasLaunchedTask
logWarning(
&Initialjob has not ac &
&checkyour cluster UI to ensure that workers are registered &
&andhave sufficient memory&
STARVATION_TIMEOUT
STARVATION_TIMEOUT
hasReceivedTask
发起执行消息
SchedulerBackend
的具体实现,
yarn-cluster
CoarseGrainedSchedulerBackend
.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers
driverActor
实例发起一个
ReviveOffers
的事件处理消息。
reviveOffers() {
driverActor
! ReviveOffers
driverActor
CoarseGrainedSchedulerBackend.DriverActor
DriverActor
其中,处理
ReviveOffers
部分定义如下:
ReviveOffers =&
makeOffers()
最终调用的
makeOffers
makeOffers() {
executorHost
的值由来请查看
启动时的补充
launchTasks(scheduler.resourceOffers(
executorHost
WorkerOffer(
CoarseGrainedSchedulerBackend
scheduler(TaskSchdulerImpl).resourceOffers
resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =synchronized {
SparkEnv.set(
//Mark each slave as alive and remember its
&-offers) {
executor(worker)
存储到对应的容器中
executorid
executorIdToHost
executorId
得到当前所有注册的
写入到对应的容器中
此容器表示
node_local
executorsByHost
.contains(
executorsByHost
HashSet[String]()
DAGScheduler.executorGained
executorId
请参见下面
DAGScheduler
ExecutorGained
executorGained(
executorId
根据所有的
根据每一个
[arraybuffer[]]
//Build a list of tasks to assign to each worker
=offers.map(o =&
ArrayBuffer[TaskDescription](o.
得到每一个
availableCpus
= offers.map(o =& o.
中排序后的队列中的所有的
TaskSetMansger
sortedTaskSets
.getSortedTaskSetQueue()
sortedTaskSets
&parentName:%s, name: %s, runningTasks: %s&
runningTasks
//Take each TaskSet in our scheduling order, and then offer it eachnode in increasing order
//of locality levels so that it gets a chance to launch local tasks onall of them.
launchedTask
迭代出每一个
TaskSetMansger,
同时根据每一个
TaskSetMansger,
迭代去按网络的优先级执行
PROCESS_LOCAL
NODE_LOCAL
RACK_LOCAL
如果包含多个执行器,也就是
的表达式,多个用
号分开,后面一个优先前面一个执行
也就是后一个执行完成后,相当于一个嵌套的
此处开始执行对
的执行节点选择,针对每一个
PROCESS_LOCAL
sortedTaskSets
maxLocality
&- TaskLocality.values) {
迭代所有的
并在每迭代出一个
时,在此机器上生成执行
中对应的相关
TaskSetmanager.resourceOffer
的处理流程,见后面的细节分析,现在不分析此实现。
launchedTask
until offers.size) {
executorId
执行的节点信息等,每次执行
resourceOffer
TaskDescription
executorid
添加到对应的
activeExecutorIds
executorsByHost
.resourceOffer(
availableCpus
maxLocality
taskIdToTaskSetId
taskSetTaskIds
taskIdToExecutorId
activeExecutorIds
的值添加一个正在执行的
这个值的作用是当有多个
的依赖时,
submitTasks
TaskSetManager
直接使用此
PROCESS_LOCAL.
activeExecutorIds
executorsByHost
availableCpus(i) -=
launchedTask
TaskLocality
,如果在一个较小的
时找到一个
中接着找,
否则跳出去从下一个
重新找,放大
的查找条件。
launchedTask
表示在传入的
级别上查找到
要执行对应的级别,
那么在当前级别下接着去找到下一个可执行的
launchedTask
launchedTask
当前迭代的
PROCESS_LOCAL,
那么把级别放大到
NODE_LOCAL
launchedTask
生成成功,设置
hasLaunchedTask
前面我们提到过的
submitTasks
中的检查线程开始结束。
hasLaunchedTask
返回生成成功的
列表。交给
CoarseGrainedSchedulerBackend.launchTasks
CoarseGrainedSchedulerBackend.launchTasks
处理流程:
CoarseGrainedExecutorBackend
发送消息,处理
LaunchTask
launchTasks(tasks: Seq[Seq[TaskDescription]]) {
&-tasks.flatten) {
freeCores(task.executorId) -=
executorActor
executorId
! LaunchTask(
CoarseGrainedExecutorBackend
LaunchTask
事件事件。
receive = {
LaunchTask(
&Gotassigned task &
&ReceivedLaunchTask command but executor was null&
System.exit(
见后面的分析。
.launchTask(
.serializedTask)
TaskSetManager.resourceOffer
函数,每次执行得到一个
的执行节点。
resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality:TaskLocality.TaskLocality)
:Option[TaskDescription] =
如果成功的
个数小于当前的
资源需要大于或等于
spark.task.cpus
配置的值,默认需要大于或等于
tasksSuccessful
&& availableCpus &=
CPUS_PER_TASK
=clock.getTime()
得到一个默认的
的值,默认情况下最有可能是
NODE_LOCAL.
此处根据上一次查找可执行节点的时间,得到一个合适此执行时间的一个
spark.locality.wait
配置全局的等待时间。默认为
PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL
spark.locality.wait.process
PROCESS_LOCAL
的等待时间。
spark.locality.wait.node
NODE_LOCAL
的等待时间。
spark.locality.wait.rack
RACK_LOCAL
的等待时间。
这里的查找方式是通过当前的
currentLocalityIndex
的值,默认从
开始,找到对应可执行的级别,
检查当前时间减去上次的查找级别的执行时间是否大于上面配置的在此级别的执行时间,
如果大于配置的时间,把
currentLocalityIndex
重新检查,返回一个合适的
如果执行查找的时间超过了以上配置的几个
的级别的查找时间,此时返回的值为
allowedLocality
= getAllowedLocalityLevel(
首先把当前可执行的
PROCESS_LOCAL
.maxLocality
是最大的级别,
得到的可执行级别不能超过此级别
PROCESS_LOCAL
开始一级一级向上加大。
maxLocality
PROCESS_LOCAL
一级一级向上加,
getAllowedLocalityLevel
查找到的级别大于现在传入的级别。把级别设置为传入的级别。
maxLocality
PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY
进行传入。
allowedLocality
& maxLocality) {
allowedLocality
= maxLocality
// We're not allowed tosearch for farther-away tasks
对应的执行网络选择。
TaskSetManager.findTask
的执行节点的流程部分
findTask(execId, host,
allowedLocality
taskLocality
//F do some bookkeeping and return a task description
=sched.newTaskId()
//Figure out whether this should count as a preferred launch
&Startingtask %s:%d as TID %s on executor %s: %s (%s)&
,execId, host,
taskLocality
的执行副本加一,
//Do various bookkeeping
copiesRunning(index) +=
,execId, host,
taskLocality
taskAttempts
taskAttempts
得到当前加载的节点执行级别的
并更新当前查找此执行节点的查找时间为当前时间。
//Update our locality level for delay scheduling
currentLocalityIndex
= getLocalityIndex(
taskLocality
lastLaunchTime
//Serialize and return the task
=clock.getTime()
//We rely on the DAGScheduler to catch non-
serializable
closures and RDDs, so in here
//we assume the task can be serialized without exceptions.
serializedTask
= Task.serializeWithDependencies(
addedFiles
=clock.getTime() -
runningTasksSet
的容器中。
addRunningTask(
&Serializedtask %s:%d as %d bytes in %d ms&
serializedTask
&task %s:%d&
的执行尝试的值为
,表示是第一次尝试执行,通过
DAGScheduler
BeginEvent
taskAttempts
taskStarted(
TaskDescription(
serializedTask
TaskSetManager.findTask
的执行节点的流程部分:
级别中取出需要执行的
findTask(execId: String,host: String, locality: TaskLocality.Value)
:Option[(Int, TaskLocality.Value)] =
此处优先找
PROCESS_LOCAL
的值,但是我现在还没有搞明白这个
pendingTasksForExecutor
的值从何来。
TaskSetManager
生成时可以看出
pendingTasksForExecutor
的值在实例生成时
TaskSchedulerImpl.
activeExecutorIds
中检查并生成。但实例生成此,此容器还没有值。这点还没搞明白。
新的批注:
的依赖关系时,第一个
执行完成后,
activeExecutorIds
的容器会有执行过的
执行完成后,新的一个
开始执行,
TaskSetManager
pendingTasksForExecutor
中包含可以直接使用上一个
因此,如果有
的依赖关系时,下一个
在此时如果
executorid
相同,直接使用
PROCESS_LOCAL
PROCESS_LOCAL
不会被选择,正常情况
的选择会放大的
NODE_LOCAL
&-findTaskFromList(getPendingTasksForExecutor(execId))) {
,TaskLocality.
PROCESS_LOCAL
(TaskLocality.isAllowed(locality, TaskLocality.
NODE_LOCAL
&-findTaskFromList(getPendingTasksForHost(host))) {
,TaskLocality.
NODE_LOCAL
(TaskLocality.isAllowed(locality, TaskLocality.
RACK_LOCAL
&- sched.getRackForHost(host)
&- findTaskFromList(getPendingTasksForRack(
,TaskLocality.
RACK_LOCAL
//Look for no-
tasks after rack-local tasks since they can run anywhere.
&-findTaskFromList(
pendingTasksWithNoPrefs
,TaskLocality.
PROCESS_LOCAL
(TaskLocality.isAllowed(locality, TaskLocality.
&-findTaskFromList(
allPendingTasks
,TaskLocality.
//Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host,locality)
启动时的补充
一些需要的说明:在
makeOffers
TaskScheduler.resourceOffers
此函数中传入的
executorHost,freeCores
的值什么时候得到呢:
我们知道在
启动的时候。会根据设置的
num-worker
运行的资源,
WorkerRunnable
CoarseGrainedExecutorBackend
在实例中连接
sparkContext
CoarseGrainedSchedulerBackend.DriverActor
DriverActor
CoarseGrainedSchedulerBackend.
ACTOR_NAME
CoarseGrainedExecutorBackend
生成的一些代码片段:
YarnAllocationHandler.allocateResources
workerIdCounter
.incrementAndGet().toString
&akka.tcp://spark@%s:%s/user/%s&
&spark.driver.host&
&spark.driver.port&
CoarseGrainedSchedulerBackend.
ACTOR_NAME
YarnAllocationHandler.allocateResources
WorkerRunnable
的线程启动
workerRunnable
WorkerRunnable(
workerHostname
workerMemory
workerCores
workerRunnable
CoarseGrainedExecutorBackend
实例启动时,向
preStart() {
&Connectingto driver: &
+ driverUrl)
.actorSelection(driverUrl)
启动时注册
RegisterExecutor
(executorId,hostPort, cores)
.system.eventStream.subscribe(
,classOf[RemotingLifecycleEvent])
CoarseGrainedSchedulerBackend
RegisterExecutor
的事件处理。
receive = {
RegisterExecutor(
executorId
Utils.checkHostPort(
&Host port expected &
executorActor
中已经包含有发送此消息过来的
已经注册,
通过发送消息过来的
actor(sender
表示发送此消息的
RegisterExecutorFailed
executorActor
.contains(
executorId
! RegisterExecutorFailed(
&Duplicateexecutor ID: &
executorId
actor(worker)
还没有被注册,把
executorActor
同时向发送消息过来的
actor(sender
表示发送此消息的
RegisteredExecutor
&Registeredexecutor: &
+ sender +
&with ID &
executorId
!RegisteredExecutor(sparkProperties)
executorActor
executorId
TaskScheduler
executorHost,
executorHost
executorId
)= Utils.parseHostPort(
executorId
executorAddress
executorId
)= sender.path.address
addressToExecutorId
(sender.path.address)=
executorId
totalCoreCount
.addAndGet(
把现在注册的所有的节点添加到
TaskScheduler.
executorsByHost
中。在生成
TaskSetManager
makeOffers()
CoarseGrainedExecutorBackend
RegisteredExecutor
RegisterExecutorFailed
的处理流程:
函数中处理
RegisterExecutorFailed:
如果已经存在,直接
RegisterExecutorFailed(
&Slaveregistration failed: &
System.exit(
函数中处理
RegisteredExecutor:
如果不存在,生成
实例。此时
启动完成,
注册成功。
RegisteredExecutor(
sparkProperties
&Successfullyregistered with driver&
//Make this host instead of hostPort ?
Executor(executorId, Utils.parseHostPort(hostPort).
sparkProperties
的依赖关系
..........
的执行过程
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致

我要回帖

更多关于 双单片机串行通信 的文章

 

随机推荐