Coarsefind graineddExecutorBackend是什么进程,为什么kill不掉

&&国之画&&&&&&
版权所有 京ICP备号-2
迷上了代码!Spark资源调度流程浅析 - 为程序员服务
Spark资源调度流程浅析
spark的调度分为资源的调度和任务的调度。前者目前支持standalone、yarn、mesos等,后者对标hadoop的MapReduce。本文介绍资源调度的流程,能力所限仅cover到最简单的standalone,无法对比各种调度框架的优劣。
Spark中与资源调度相关的角色包括driver、master、worker和executor,其中executor更多是与任务调度有关。其交互关系如下图所示,主要交互流如下:
driver-&master:提交Application,并告知需要多少资源(cores)
master-&driver:告知Application提交成功(我们只考虑最理想的情况)
driver-&workers:发送appDescription,启动executor子进程
executor-&driver:注册executor
driver-&executor:提交Tasks,由后者执行任务
上图的虚线代表Actor交互,spark基于Actor模式的Akka包进行进程、线程、网络间的交互,以避免同步、死锁等问题,也使代码相对简单。可以注意到,worker主进程是不跟driver交互的,只有executor子进程才跟driver交互。
Driver & Master
在SparkContext初始化的时候,在SparkContext class的一个大try catch中,就会完成Application注册,在standalone mode下,主要做了以下事情:
启动心跳维持监听actor,因为executors需要同driver维持心跳
启动Job进度监听actor,在UI和LOG里需要能够看到进度
启动mapOutputTracker,因为作为reducer的executors会向driver询问map output的地址
初始化task pool,确定FIFO or FAIR的调度方式
根据masters uri的格式,确定schedulerBackend和taskScheduler的取值,其中schedulerBackend与executor端的executorBackend交互
由AppClient/ClientActor代理,同masters交互,确定可用master,由后者代理获取并启动workers上的可用executors资源
SparkContext.createTaskScheduler方法会确定backend和scheduler的取值,并调用_taskScheduler.initialize(),初始化pool并确定schedulableBuilder:FIFO、FAIR。
schedulerBackend
taskScheduler
LocalBackend
TaskSchedulerImpl
local\[([0-9]+|\*)\]
LocalBackend
TaskSchedulerImpl
local\[([0-9]+|\*)\s*,\s*([0-9]+)\]
LocalBackend
TaskSchedulerImpl
spark://(.*)
SparkDeploySchedulerBackend
TaskSchedulerImpl
local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]
SparkDeploySchedulerBackend
TaskSchedulerImpl
"yarn-standalone" | "yarn-cluster"
org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
org.apache.spark.scheduler.cluster.YarnClusterScheduler
"yarn-client"
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
org.apache.spark.scheduler.cluster.YarnScheduler
mesosUrl @ MESOS_REGEX(_)
CoarseMesosSchedulerBackend or MesosSchedulerBackend
TaskSchedulerImpl
simr://(.*)
SimrSchedulerBackend
TaskSchedulerImpl
确定两者取值之后,会立即触发与master的交互:
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
以standalone模式为例,间接调用 _schedulerBackend.start(),并阻塞等待初始化AppClient完成(即接收到master返回的RegisteredApplication消息)
AppClient就是一个wrapper,初始化了:
actor = actorSystem.actorOf(Props(new ClientActor))
ClientActor代理与master的交互,在preStart()里向每一个master uri发送RegisterApplication消息,并注册了接收处理master发送消息的多个处理方法。
Worker & Driver
deploy/worker/Worker.scala 是worker主进程,以actor方式接收master发送的消息,例如LaunchExecutor。在该消息处理方法中,通过一个子线程启动真正的executor子进程。
deploy/worker/ExecutorRunner.scala -& fetchAndRunExecutor()是启动子进程的地方。
executor/CoarseGrainedExecutorBackend.scala -& onStart() 里会向driver发送RegisterExecutor消息,由driver的schedulerBackend接收并处理。在该消息里提交了executor的相关信息,driver会将executor信息存储在executorDataMap对象里,并触发makeOffers方法,分配pending的tasks。
Driver端的消息处理方法如下:
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =&
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors & 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
listenerBus.post(
// 通过bus,向所有监听SparkListenerExecutorAdded的线程发送通知,这里好像没人关注。
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
// Make fake resource offers on all executors ,
其中makerOffers方法会分配tasks,是把资源调度和Task调度融合的地方,调用该方法的地方还有:
case StatusUpdate
case ReviveOffers
submitTasks以及任务重试等时机时,会调用
case RegisterExecutor
即一旦有executor注册,就看看有没有需要分配的任务
该方法调用scheduler.resourceOffers ,每次尽量调度更多的tasks,主要考虑的是:
本地化 recomputeLocality,PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
负载均衡 round robin
任务的重跑
任务 + executorId的黑名单
各个状态的任务列表
其中调用的scheduler.resourceOffers关键代码如下,针对每个taskSet,尽量多的调度task,在分配时,尽量分配locality的task:
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet &- sortedTaskS maxLocality &- taskSet.myLocalityLevels) {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
在scheduler.resourceOffers子方法中,也会告知driver,有task状态的改变,可用通过listenerBus告知监听者:
sched.dagScheduler.taskStarted(task, info)
eventProcessLoop.post(BeginEvent(task, taskInfo))
在makeOffers的最后,调用launchTasks,这时才是真正发起任务。workder端的CoarseGrainedExecutorBackend.scala接收到消息后,会继续执行。
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
Worker主进程启动executor子进程后(独立JVM),后者通过actor方式向driver注册自己
driver接收到RegisterExecutor消息,会检查有没有pending的tasks,并且task的localityLevel与executorId匹配
如果有合适的tasks(可能多个),则会再通过actor方式发送LaunchTask消息给executor,由后者真正执行任务
driver、master、worker里有哪些相关的子线程、子进程?
driver里至少有工作主进程、通过ClientActor与master通信的子线程们、通过SchedulerBackend与executors通信的子线程们、关注各种进度的listenerBus相关子线程、通过MapOutputTrackerMasterEndpoint管理mapOutput位置的子线程们、心跳维持的子线程,并且应该还有block、http等
standalone的master采用actor子线程维护可用workers列表,包括资源使用情况;并且接收driver发起的app注册请求等。如果是使用zk进行master选主,还得维持与zk之间的连接。另外,还有UI界面。
worker主进程需要维持与master之间的心跳,汇报资源使用情况。executors子进程需要同driver间维持心跳。主进程通过子线程启动executor子进程。另外还有block、shuffle output(不确定是否直接复用了block线程or进程)等。
模块、进程、线程间如何通信的?数据结构体如何?
scala程序,网络、进程、线程间主要通过Actor方式进行交互,资源调度主要走这条流
python通过py4j调用java代理的scala程序
shuffle output等通过netty交互数据
broadcast通过p2p交互数据
master怎么知道有哪些可用worker的?
worker启动时,向master发送RegisterWorker消息,由Master类通过Actor方式处理
任务调度的算法是什么?有没有优缺点?trait SchedulerBuilder , FairSchedulingAlgorithm/FIFOSchedulingAlgorithm
资源调度的依据,对可用executor 随机化,locality的原则,每个worker有cores限制,当前每个task只能占用1个core
jar、files、library是这时发送给executor的吗?driver还是master发送的呢?
driver提交给master的是appDescription,其中包含的command只是指定了executorBackend类,未涉及app具体的代码
driver在executor注册后,向其发送的launchTask消息才包含真正的task信息(files、jars名称),由TaskRunner中反序列化并拉取文件
standalone、yarn、mesos切换对driver、master、worker都有什么影响呢?使用到哪些类?
目前只能看出有不同的backend类,实现功能,没有比较优劣。
同一worker上的资源隔离做到哪一步?JVM可以控制memory,CPU、IO、DISK呢?
我们当前的配置,一台worker上只能有一个executor,后者是在driver-&master-&Worker时生成的
每个executor在调度时,可能会被分配多个tasks,只要availableCpus(i) &= CPUS_PER_TASK
executorBackend在接收到launchTask请求时,会生成TaskRunner对象,并由threadpool调度执行,即如果有多个tasks到达,就会多线程并发执行了
这些属于同一个executor的tasks共享一个JVM,所以共享executor.memory等限制
JVM、python进程复用 是如何做到的?
standalone cluster manager type时,如何与zk配合?
ZooKeeperPersistenceEngine和ZooKeeperLeaderElectionAgent 在Master类里被初始化。后者在new时直接start了,基于apache curator.apache.org实现,其isLeader、notLeader方法会被curator的方法调用。
最后,放一张简化后的时序图:
原文地址:, 感谢原作者分享。
您可能感兴趣的代码Spark问题笔记1
Spark问题笔记1
我们知道Spark总是以集群的方式运行的,Standalone的部署方式是集群方式中最为精简的一种(另外的是Mesos和Yarn)。Standalone模式中,资源调度是自己实现的,是MS架构的集群模式,故存在单点故障问题。
下面提出几个问题并解决:
1、Standalone部署方式下包含哪些节点?
& & 由不同级别的三个节点组成,分别是Master主控节点、Worker工作节点、客户端节点;
& & (1)其中Master主控节点,顾名思义,类似于领导者,在整个集群中,最多只有一个Master处于Active状态。在使用spark-shell等交互式运行或者使用官方提供的run-example实例时,Driver运行在Master节点中;若是使用spark-submit工具进行任务的提交或者IDEA等工具开发运行任务时,Driver是运行在本地客户端的。
& & Master一方面负责各种信息,比如Driver、Worker、Application的注册;另一方面还负责Executor的启动,Worker心跳等诸多信息的处理。
& & (2)Woker节点,类似于yarn中的NodeManager,在整个集群中,可以有多个Worker(&0)。负责当前WorkerNode上的资源汇报、监督当前节点运行的Executor。并通过心跳机制来保持和Master的存活性连接。Executor受到Worker掌控,一个Worker启动Executor的个数受限于 机器中CPU核数。每个Worker节点存在一个多个CoarseGrainedExecutorBackend进程,每个进程包含一个Executor对象,该对象持有一个线程池,每个线程执行一个Task。
2、基本的概念?
& &(1)Application:指的是用户编写的Spark应用程序,包含了含有一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
& &(2)Driver:运行Application的main()函数并创建SparkContext,SparkContext的目的是为了准备Spark应用程序的运行环境。SparkContext负责资源的申请、任务分配和监控等。当Executor运行结束后,Driver负责关闭SparkContext;
& &(3)Job:一个Application可以产生多个Job,其中Job由Spark Action触发产生。每个Job包含多个Task组成的并行计算。
& &(4)Stage:每个Job会拆分为多个Task,作为一个TaskSet,称为Stage;Stage的划分和调度是由DAGScheduler负责的。Stage分为Result Stage和Shuffle Map Stage;
& &(5)Task:Application的运行基本单位,Executor上的工作单元。其调度和 管理又TaskScheduler负责。
& &(6)RDD:Spark基本计算单元,是Spark最核心的东西。表示已被分区、被序列化、不可变的、有容错机制的、能被并行操作的数据集合。
& & (7) DAGScheduler:根据Job构建基于Stage的DAG,划分Stage依据是RDD之间的依赖关系。
& &(8)TaskScheduler:将TaskSet提交给Worker运行,每个Worker运行了什么Task于此处分配。同时还负责监控、汇报任务运行情况等。
3、Standalone启动过程是啥?
& &(1)首先,启动master,worker节点。
& & &worker启动后触发Master的RegisterWorker事件,进行注册。主要讲要注册的Worker信息封装成WorkerInfo对象,包括Worker节点的CPU、内存等基本信息。记录Worker的信息(IP、Address)到master缓存中(HashMap),若Worker节点的注册信息有效,持久化已注册的Worker信息。然后给个完成注册的反馈信号。
& &(2)提交Application
& & &运行spark-shell时,会由Driver端的DAGScheduler向Master发送RegisterApplication请求。根据此请求信息会创建ApplicationInfo对象,将Application加入到Master的缓存apps中,这个结构是HashSet。
& & &如果worker已经注册,发送lanchExecutor指令给相应的Worker。
& &(3)Work收到lanchExecutor后,会由ExecutorRunner启动Excutor进程,启动的Executor进程会根据启动时的入参,将自己注册到Drive中的ScheduleBackend。
& & (4)&ScheduleBackend收到Excutor的注册信息后,会将提交到的Spark Job分解为多个具体的Task,然后通过LaunchTask指令将这些Task分散到各个Executor上运行。
4、Standalone部署方式下某一节点出现问题时,系统如何处理?
& & &出现问题的节点可能发生的情况有三种:
& & (1)Master崩掉了:这个坏掉了,就真的没法完了。单点故障的问题。
& & & & &有两种解决办法:第一种基于文件系统的故障恢复,适合Master进程本身挂掉,那直接重启就Ok了。
& & & & & & & & & & & & &第二种是基于ZookerKeep的HA方式。此方式被许多的分布式框架使用。
& & (2)某一Worker崩掉了:
& & & & & 若是所有的Worker挂掉,则整个集群就不可用;
& & & & & Worker退出之前,会将管控的所有Executor进程kill;由于Worker挂掉,不能向master玩心跳了,根据超时处理会知道Worker挂了,然后Master将相应的情况汇报给Driver。Driver会根据master的信息和没有收到Executor的StatusUpdate确定这个Worker挂了,则Driver会将这个注册的Executor移除。
& & (3)某Worker的Excutor崩掉了:
& & & & &Excutor的作为一个独立的进程在运行,由ExcutorRunner线程启动,并收到ExcutorRunner的监控,当Excutor挂了,ExcutorRunner会注意到异常情况,将ExecutorStateChanged汇报给Master,master会再次发送lanchExecutor指令给相应的Worker启动相应的Excutor。
版权声明:本文为博主原创文章,未经博主允许不得转载。
现在的服务器有很多都弱爆了,有谁想要了解最强大的服务器就和我联系,也可以通过隐形云找我
本分类共有文章7篇,更多信息详见
& 2012 - 2016 &
&All Rights Reserved. &
/*爱悠闲图+*/
var cpro_id = "u1888441";
/*爱悠闲底部960*75*/
var cpro_id = "u1888128";ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
3 messages
Open this post in threaded view
Report Content as Inappropriate
ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
I have this problem with a job. A random executor gets this
ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
Almost always at the same point in the processing of the data. I am processing 1 mil files with sc.wholeText. At around the 600.000th file, a container receives this signal. On the driver i get:
15/07/03 14:20:11 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher03.stratified:44617
15/07/03 14:20:11 ERROR cluster.YarnClusterScheduler: Lost executor 3 on cruncher03.stratified: remote Rpc client disassociated
15/07/03 14:20:11 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@cruncher03.stratified:44617] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/07/03 14:20:11 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher03.stratified:44617
15/07/03 14:20:11 INFO scheduler.TaskSetManager: Re-queueing tasks for 3 from TaskSet 5.0
There is plenty of memory on the machine and container jvm, so I don't think it is an OOM (after all it would be a SIGKILL) or an OutOfMemory (there is no out of mem exception)
What can be causing this?
Open this post in threaded view
Report Content as Inappropriate
Re: ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
I'm running into the same issue. Are you running a custom spark build too?
Open this post in threaded view
Report Content as Inappropriate
Re: ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
No, just the build for hadoop 2.6
What some others pointed out is that this is caused by yarn killing
the executor. That was my case. Check/grep your yarn-nodemanager
logs for "is running beyond physical memory limits"
On 28/07/15 23:41, rohit.sureka [via
Apache Spark User List] wrote:
I'm running into the same issue. Are you running a
custom spark build too?
If you reply to this email, your
message will be added to the discussion below:
To unsubscribe from ERROR executor.CoarseGrainedExecutorBackend:
RECEIVED SIGNAL 15: SIGTERM, .
Loading...17406人阅读
spark(33)
7 内存溢出问题
& & 在Spark中使用hql方法执行hive语句时,由于其在查询过程中调用的是Hive的获取元数据信息、SQL解析,并且使用Cglib等进行序列化反序列化,中间可能产生较多的class文件,导致JVM中的持久代使用较多,如果配置不当,可能引起类似于如下的OOM问题:
Exception in thread &Thread-2& java.lang.OutOfMemoryError: PermGen space
原因是实际使用时,如果用的是JDK1.6版本,Server模式的持久代默认大小是64M,Client模式的持久代默认大小是32M,而Driver端进行SQL处理时,其持久代的使用可能会达到90M,导致OOM溢出,任务失败。
解决方法就是在Spark的conf目录中的spark-defaults.conf里,增加对Driver的JVM配置,因为Driver才负责SQL的解析和元数据获取。配置如下:
spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M& &
但是,上述情况是在yarn-cluster模式下出现,yarn-client模式运行时倒是正常的,原来在$SPARK_HOME/bin/spark-class文件中已经设置了持久代大小:
JAVA_OPTS=&-XX:MaxPermSize=256m $OUR_JAVA_OPTS&
当以yarn-client模式运行时,driver就运行在客户端的spark-submit进程中,其JVM参数是取的spark-class文件中的设置,所谓未出现持久代溢出现象。
& & 总结一下Spark中各个角色的JVM参数设置:& &
(1)Driver的JVM参数:
-Xmx,-Xms,如果是yarn-client模式,则默认读取-env文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一样大小;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值。
PermSize,如果是yarn-client模式,则是默认读取-class文件中的JAVA_OPTS=&-XX:MaxPermSize=256m
$OUR_JAVA_OPTS&值;如果是yarn-cluster模式,读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值。
GC方式,如果是yarn-client模式,默认读取的是spark-class文件中的JAVA_OPTS;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的参数值。
以上值最后均可被spark-submit工具中的--driver-java-options参数覆盖。
(2)Executor的JVM参数:
-Xmx,-Xms,如果是yarn-client模式,则默认读取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值一样大小;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。
PermSize,两种模式都是读取的是-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。
GC方式,两种模式都是读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。
(3)Executor数目及所占CPU个数
如果是yarn-client模式,Executor数目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每个实例的数目由SPARK_EXECUTOR_CORES指定;如果是yarn-cluster模式,Executor的数目由spark-submit工具的--num-executors参数指定,默认是2个实例,而每个Executor使用的CPU数目由--executor-cores指定,默认为1核。
每个Executor运行时的信息可以通过yarn logs命令查看到,类似于如下:
14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/_gc.log,
-Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1&, &LOG_DIR&/stdout, 2&, &LOG_DIR&/stderr)
&&其中,akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示当前的Executor进程所在节点,后面的1表示Executor编号,sparktest2表示ApplicationMaster的host,接着的3表示当前Executor所占用的CPU数目。
8 序列化异常
在Spark上执行hive语句的时候,出现类似于如下的异常:
org.apache.spark.SparkDriverExecutionException: Execution error
& & at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
& & at org.apache..scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
& & at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
& & at akka.actor.ActorCell.invoke(ActorCell.scala:456)
& & at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
& & at akka.dispatch.Mailbox.run(Mailbox.scala:219)
& & at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
& & at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
& & at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
& & at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
& & at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
& & at org.apache.spark..execution.BroadcastNestedLoopJoin$anonfun$7.apply(joins.scala:336)
& & at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:813)
& & at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:810)
& & at org.apache..scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
& & at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
排查其前后的日志,发现大都是序列化的东西:
14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)
而在-default.conf中,事先设置了序列化方式为Kryo:
.serializer
org.apache.spark.serializer.KryoSerializer
根据异常信息,可见是HashSet转为BitSet类型转换失败,Kryo把松散的HashSet转换为了紧凑的BitSet,把序列化方式注释掉之后,任务可以正常执行。难道Spark的Kryo序列化做的还不到位?此问题需要进一步跟踪。
9 Executor僵死问题
& & 运行一个Spark任务,发现其运行速度远远慢于执行同样SQL语句的Hive的执行,甚至出现了OOM的错误,最后卡住达几小时!并且Executor进程在疯狂GC。
& & 截取其一Task的OOM异常信息:
可以看到这是在序列化过程中发生的OOM。根据节点信息,找到对应的Executor进程,观察其Jstack信息:
Thread 36169: (state = BLOCKED)
- java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
- com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5,&=113
(Compiled frame)
- com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
- com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
- com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158,&=338
(Compiled frame)
- com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
- com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136,&=651
(Compiled frame)
- com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
- com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
- com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136,&=651
(Compiled frame)
- com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
- com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
- com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158,&=732
(Compiled frame)
- org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
- org.apache.spark.serializer.DeserializationStream$anon$1.getNext() @bci=10, line=125 (Compiled frame)
- org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
- org.apache..storage.BlockManager$LazyProxyIterator$1.hasNext()
@bci=4,&=1031
(Compiled frame)
- scala.collection.Iterator$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
- org.apache.pletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
- org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
- scala.collection.Iterator$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
- org.apache..sql.execution.HashJoin$anonfun$execute$1.apply(scala.collection.Iterator,
scala.collection.Iterator) @bci=14,&=77
(Compiled frame)
- org.apache.spark..execution.HashJoin$anonfun$execute$1.apply(java.lang.Object,
java.lang.Object) @bci=9, line=71 (Interpreted frame)
- org.apache.spark.pute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
- org.apache..puteOrReadCheckpoint(org.apache.spark.Partition,
org.apache.spark.TaskContext) @bci=26,&=262
(Interpreted frame)
有大量的BLOCKED线程,继续观察GC信息,发现大量的FULL GC。
& & 分析,在插入Hive表的时候,实际上需要写HDFS,在此过程的HashJoin时,伴随着大量的Shuffle写操作,JVM的新生代不断GC,Eden Space写满了就往Survivor Space写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了,就触发FULL GC,还是空间不够,那就OOM错误了,此时线程被Blocked,导致整个Executor处理数据的进程被卡住。
当处理大数据的时候,如果JVM配置不当就容易引起上述问题。解决的方法就是增大Executor的使用内存,合理配置新生代和老生代的大小,可以将老生代的空间适当的调大点
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:940871次
积分:11291
积分:11291
排名:第874名
原创:270篇
转载:115篇
评论:216条
(2)(4)(9)(8)(2)(17)(7)(8)(14)(2)(6)(1)(2)(6)(2)(5)(5)(12)(9)(6)(18)(10)(12)(3)(5)(10)(22)(14)(4)(11)(1)(6)(1)(3)(1)(4)(12)(1)(1)(4)(3)(22)(1)(10)(9)(3)(3)(4)(1)(1)(2)(1)(1)(1)(3)(2)(2)(1)(1)(3)(1)(3)(1)(1)(5)(2)(5)(8)(2)(9)(16)(2)(6)

我要回帖

更多关于 finergrained 的文章

 

随机推荐