我能帮你你能解决吗

当前位置: >
spark的任务执行流程解析
来源:INFOCOOL & 发布时间:
& 作者:网友 &
浏览次数:
摘要: 一、从架构上看Spark的Job工作(Master\Worker) [睡着的水-hzjs-] 1.Master节点上是Master进程,主要是管理资源的,...
一、从架构上看Spark的Job工作(Master\Worker)
[睡着的水-hzjs-]
1.Master节点上是Master进程,主要是管理资源的,资源主要是内存和CPU。Master能够接收客户端发送的程序并且为程序进行注册。worker节点有worker进程,负责当前节点的内存和cpu的使用,spark是主从结构式架构。
2.Spark运行作业的方式有很多,最简单的是就是通过spark-shell ,每个程序的程序的ID是向master 注册的时候由master分配的。worker节点在执行程序工作的时候的core数量、内存的大小是在安装Spark集群的时候配置文件中我们自己配置的。当启动spark-shell 的时候并没有运行任何job计算任务,那Master有没有分配资源呢?是有的!
3.master分配资源的方式是粗粒度的。spark-shell 中默认的是没有任何的Job,有资源分配就是因为分配资源的方式粗粒度的,默认情况下,虽然没有job计算,只要提交了程序并注册了,就会分配资源。
4.分配资源默认的分配方式在每个workers上,为当前程序分配一个Executorbackend 进行,且默认情况下会最大化的使用cores和memory。
5.Executor一次性最多能运行多少个并发的task,取决于当前Executor 能够使用的work的core的数量
6.有多个Executor ,就能更多的获取内存跟core的数量,但是默认情况下,一个节点只有一个Executor.
7.出现OOM的解决方式之一就是加大分片的数量。
8.一个分片数据进哪一个Executor ,主要取决于数据的本地性。
9.Executor 的并行度是可以继承的,后面的任务并行数量会继承前面的task数量,如果不进行类似reduceByKey类似的操作。
10.reduceByKey 运行的时候会只有一个任务,并行分片唯一,一般是运行在数据量比较大的节点上,数据本地性线程并不关心运行 什么代码,线程是代表这资源,由于它不关心代码,所以task与Thread是解耦合的,所以Thread是可以复用的,
总结:当spark集群启动的时候,首先启动master进程,负责整个集群资源的管理和分配,并且接收作业的提交,为作业注册、分配资源及每个工作节点默认情况都会启动一个worker Process 来管理当前节点的cpu等计算资源(实际上是还是通过Master来管理的)。worker Process&它会向master汇报worker当前能够工作。当用户向Master提交作业的时候,master为会为程序分配ID,并分配资源,默认情况下会为当前的应用程序在每个worker Process 下面分配一个CoarseGrainedExecutorBackend进程,该进程默认情况下会最大化的使用当前节点上的内存和CPU。
流程图如下:
二、从依赖上看spark的job运行原理
两种依赖的示意图如下,窄依赖、宽依赖:
宽依赖与窄依赖的定义:
示例如下图:
如果是这样的复杂的依赖关系会产生很多问题,严重的影响程序的性能:
1、Task太大。遇到Shuffle级别的依赖关系(宽依赖)必须要计算依赖的RDD的所有的Partitions,并且是发生在一个Task中。
2、重复计算。如上图最终的结果有三个Partitions,所以因为Shuffle的缘由,计算了三次所有的依赖RDD.
3、假设考虑从后往前的依赖关系,我们设计算法的角度看那些多次用到就需要cache(),这会造成了存储的浪费。
-------虽然这个依赖有很多的内存浪费,计算重复等,,但是我们依然可以看出血统的影子pipeline(做函数展开),无法很好的实现是因为上述的假设的核心问题都是在遇到shuffle依赖的时候无法很好的进行pipeline.我们只能退而求其次,在遇到shuffle的时候我们就需要断开,这样一个个的Stage的划分就清楚了。窄依赖加入,宽依赖断开。
-------每个Stage里面的Task的数量是由最后一个Stage的Partition的数量决定的
-------再次思考pipeline,即使采用pipeline的方式,函数f对依赖的RDD中的数据集合操作也会有两种方式:
1,f(记录),f 作用于集合的每一条记录,每次只作用于一条数据(spark采用);
2,f(记录),f一次性作用于集合的全部数据;
-------Spark采用第一种方式的原因:、
1、无需等待,可以最大化的使用集群的计算资源。
2、减少OOM的发生
3、最大化的有利于并发
4、可以精确的控制每一个partiton本身极其内部的计算
5、基于lineage的算子流动式函数编程,节省了中间结果的产生,并且可以最快的恢复;
三、从物理执行的角度看Spark的job执行
------Spark Application 里面可以产生一个或者多个Job,例如spark-shell 默认启动的时候内部就没有Job,只是作为资源分配的程序,可以在spark-shell 里面写代码产生他若干个job,普通程序一般而言可以有不同的Action,每一个Action一般也会触发一个job
------Spark是MapReduce思想的一种更加精致和高效的实现,MapReduce有很多集体不同的实现,例如hadoop的MapReduce基本的计算流程如下:
1、首先是以JVM为对象的并发执行的Mappper,其中Map的执行会产生数据,输出数据经过Partitioner指定的规则,放到当地的文件系统中,然后经过Shuffle、sort、Affregate 变成reduce的输入,执行reduce产生最终的执行结果。但是在执行迭代的时候,由于每次都要将reduce的结果存入HDFS,下次计算还要取出,,,这样就造成了很多的局限性。
2、Spark 算法构造和物理执行时的核心思想之一就是:最大化pipeline(数据复用效果好)!基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的视觉来说,是数据流动到计算的位置!从逻辑的角度上来看是算子在数据上流带动。从算法构建的角度而言,是算子作用于数据,所以是算子在数据上流动。从物理执行的角度而言,是数据流动到计算的位置。
--------每个Stage中除了最后一个RDD算子是真实的以外,前面的算子都是假的。在中间它会进行算子合并。。。
--------由于计算的Lazy特性,导致计算从后往前回溯,形成Computing Chain ,导致的结果就是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partition(只是逻辑上思考的结果,在所有的发送给Executor之前,所有的算子已经合并成了一个,现实中就是一个函数).计算发生的位置在最后的RDD。
------从后往前回溯形成计算链条,实际执行肯定是从前往后执行的。
-----窄依赖的物理执行
一个Stage内部的RDD都是窄依赖,窄依赖计算本身是逻辑上看从Stage内部最左侧的RDD开始立即计算的,根据Computing Chain(从后往前回溯构建生成),数据(Record) 从一个计算步骤流动下一个结算步骤,以此类推,知道算到Stage内部最后一个RDD来产生计算结果。实际物理计算是让数据从前往后再算子上流动,知道不能流动时,就开始计算下一个Record。这就导致了一个比较好的方式:
后面的RDD对前面的RDD的依赖虽然是Partition级别的数据集合的依赖,但是并不需要父RDD把Partition中所有的Records计算完毕才整体往后流动数据进行计算,这就极大的提高了计算速率。
-----宽依赖的物理执行 必须等到依赖的父Stage中最后一个RDD全部数据彻底计算完毕,才能够经过shuffle来计算当前的Stage.
我来说两句
友情链接:ExecutorBackend的异常处理
ExecutorBackend的异常处理
CoarseGrainedExecutorBackend在运行中出现异常,将调用exitExecutor方法进行处理,处理以后退出。exitExecutor这个函数可以由其他子类重载来处理,executor执行的退出方式不同。例如:当executor挂掉了,后台程序可能不会让父进程也挂掉。如果需通知Driver,Driver将清理挂掉的executor的数据。
CoarseGrainedExecutorBackend的exitExecutor方法:
1.protected def exitExecutor(code: Int,
2.reason: String,
3.throwable:Throwable = null,
4.notifyDriver:Boolean = true) = {
5.val message = &Executorself-exiting due to : & + reason
6.if (throwable != null) {
7.logError(message, throwable)
8.} else {
9.logError(message)
12.if (notifyDriver &&driver.nonEmpty) {
13.driver.get.ask[Boolean](
14.RemoveExecutor(executorId,new ExecutorLossReason(reason))
15.).onFailure { case e =&
16.logWarning(s&Unableto notify the driver due to & + e.getMessage, e)
17.}(ThreadUtils.sameThread)
20.System.exit(code)
CoarseGrainedExecutorBackend在运行中可能出现的异常情况,将调用exitExecutor方法处理:
l Executor向Driver注册RegisterExecutor失败。
l Executor收到Driver的RegisteredExecutor注册成功消息以后,创建Executor实例失败。
l Driver返回Executor注册失败消息RegisterExecutorFailed。
l Executor收到Driver的LaunchTask启动任务消息,但是executor为null。
l Executor收到Driver的KillTask消息,但是executor为null。
l Executor和Driver失去连接。Cloudera CommunitySpark on Mesos: 粗粒度与细粒度实现分析 (markdown排版)
背景Mesos粗粒度Mesos细粒度背景顺着昨天spark standalone实现那篇文章继续扯淡,看看Mesos Scheduler的两种实现的异同。 对我来说,回过头再仔细看Spark在这一
顺着昨天spark standalone实现那篇文章继续扯淡,看看Mesos Scheduler的两种实现的异同。
对我来说,回过头再仔细看Spark在这一层的实现,思路又清晰了许多。
Mesos粗粒度
CoarseMesosSchedulerBackend,是mesos的粗粒度scheduler backend实现。
简单说一下mesos的Scheduler,提供的回调函数,及spark实现的逻辑:
Mesos Scheduler接口
spark实现逻辑
void registered(SchedulerDriver driver, FrameworkID frameworkId, MasterInfo masterInfo);
当Scheduler成向mesos master注册之后回调,会返回唯一的framework id
得到framework的id,作为appId
void reregistered(SchedulerDriver driver, MasterInfo masterInfo)
是mesos换了个新选举出来的master的时候触发,前提是该scheduler之前已经注册过了
没有实现。
void resourceOffers(SchedulerDriver driver, List[Offer] offers)
mesos提供了一批可用的资源,让scheduler可以使用或拒绝这些资源。每个Offer是以slave为单位的,即以slave为单位的资源列表。
得到mesos的Offers列表,,只要已经启动的executor还不足够,那么从资源列表里继续获取资源,启动CoarseGrainedExecutorBackend。
void offerRescinded(SchedulerDriver driver, OfferID offerId)
当请求的offer不可用时回调(可能是slave lost了之类的原因导致的),如果在这上面继续起task的话会报Task Lost的状态。
没有实现。spark在task scheduler层面对lost的task有自己的处理。
void statusUpdate(SchedulerDriver driver, TaskStatus status)
task状态更新回调,可能是finish了,可能是lost了,可能是fail了等等
得到finished、failed、lost等task状态,更新内存里维护的task状态,并触发新一轮的reviveOffers,即通过task scheduler继续把resource分配给需要的task并执行它们。
void frameworkMessage(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data)
用于接收executor主动发的消息
没有实现。mesos虽然在内部提供了这种msg接口,貌似不是很稳定。使用方可以使用自己的RPC来做executor与scheduler的通信,如果真的需要的话。
void disconnected(SchedulerDriver driver)
当scheduler与master断开的时候触发,原因可能是master挂了,或者master换了等等。
没有实现。spark前面就没有实现master新选举的接口。
void slaveLost(SchedulerDriver driver, SlaveID slaveId)
通知某个slave lost了,以便framework进行任务的rescheduler或其他逻辑
spark把slave lost和executor lost一起处理了。处理逻辑就是执行RemoveExecutor操作,最终调用TaskScheduler的executorLost方法,把executor的状态移除,并且会继续向上调用DAGScheduler的handleExecutorLost方法。因executor lost可能会影响到shuffle数据,这部分还需要BlockManager感知。
void executorLost(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, int status)
通知某个slave上的executor挂了
void error(SchedulerDriver driver, String message)
scheduler或scheduler driver发送错误触发
另一方面,要说明一下mesos的SchedulerDriver。
它相当于是Scheduler与mesos通信的一个连接人,一方面是管理Scheduler的生命周期,二方面是与mesos交互,让它启停task。
在初始化SchedulerDriver的时候,向里面传入spark自己实现的Scheduler就可以了,即CoarseMesosSchedulerBackend或MesosSchedulerBackend。在每个mesos的Scheduler接口的回调方法里,都会传回SchedulerDriver,以对应可以进行scheduler的启停和task的启停管理。
CoarseMesosSchedulerBackend内部主要维护的信息为:
Mesos细粒度
翻译下注释:
细粒度模式下,允许多个app共享nodes,包括,即不同app的tasks可以跑同几个core,和时间,即一个core可以切换ownership。
这块共享的控制,在mesos端,所以spark在实现的时候,其实差别和难度都不大。
MesosSchedulerBackend的实现
在resourceOffers逻辑里,获得mesos提供的slave资源后,直接在里面调用scheduler的resourceOffers,即TaskSchedulerImpl的分配task的逻辑。也就是说,会按优先级,从active
task sets(来自多个app)里选择并直接launch
task。而粗粒度里的做法,是先启动executorbackend,把资源准备好,进程先拉起,供app去launch task。
其他回调接口的逻辑是大同小异的。
还有一点不同之处,粗粒度模式下executor的实现使用的是与standalone模式相同的CoarseGrainedExecutorBackend。在细粒度模式下,executor的实现是MesosExecutorBackend,实现了spark的ExecutorBackend和mesos的MesosExecutor。实际上,在类里面,还是使用的spark的executor,在对应的回调里执行start/kill task这样的操作。另外,mesos的ExecutorDriver用于负责与mesos通信,比如传递一些状态更新的消息,或有特殊的msg要发送。executor这块的差别无关紧要。
在我看来,executor这块最终一定是落到了spark的executor上,里面有一个线程池,可以跑spark的ShuffleMapTask或ResultTask。而mesos粗、细粒度的Scheduler实现,最大区别在于resourceOffers的逻辑,是怎么处理分配的进程资源:粗粒度是预先拉起执行进程,而细粒度是直接通过TaskScheduler来摆放执行线程了。
粗细粒度分别适合跑什么样的任务,可以具体见
你最喜欢的spark内核揭秘-05-SparkContext核心源码解析初体验 - stark_summer - ITeye技术网站
博客分类:
SparkContext在获得了一系列的初始化信息后开始创建并启动TaskScheduler实例:
进入createTaskScheduler方法:
我们看一下其Standalone的方式:
在上述代码中首先实例化一个TaskSchedulerImpl:
然后构建出了masterUrls:
接着创建出关键的backend:
进入SparkDeploySchedulerBackend实现:
从以上截图可以看出来,SparkDeploySchedulerBackend核心是为了启动CoarseGrainedExecutorBackend
此处使用了Akka技术进行不同机器之间的通信,CoarseGrainedExecutorBackend是具体在Worker上执行具体的任务的进程的代表,所以我们的backend实例就是用来提交任务给Executor的:
其实CoarseGrainedExecutorBackend是Executor的代理人,能够完成很多任务,例如启动一个任务:LaunchTask
回到SparkContext的Standalone的方式的代码处:
接着代码是把backend传给了TaskSchedulerImpl的initialize方法中:
在上述代码中显示处理调度模式 例如FIFO和Fair的模式。
在代码块的最后返回实例化后的backend, scheduler:
PS:妈蛋的 公司破网,spark源码没下载成功,只能在github上面看了,蛋疼
Stark_Summer
浏览: 434865 次
来自: 大连
Tachyon 能在做数据分类吗?例如我有一坨hdfs文件,将 ...
求源码,大牛
9.9 送上,希望博客长久~。~

我要回帖

更多关于 能接受帮男闺蜜解决 的文章

 

随机推荐