有关球探网即时比分分网站怎么判断?

Spark-1.6以后RPC默认使用Netty替代Akka,在Netty上加了一层封装,为实现对Spark的定制开发,必须对RPC实现方式有比较清晰的了解,本文解读Spark RPC实现。 本文代码全部出自于Spark-2.0.0 。
Akka是一个异步的消息框架,所谓的异步,简言之就是消息发送方发送出消息,不用阻塞等待结果,接收方处理完返回结果即可。Akka支持百万级的消息传递,特别适合复杂的大规模分布式系统。Akka基于Actor模型,提供用于创建可扩展,弹性,快速响应的应用程序的平台。Actor封装了状态和行为的对象,不同的Actor之间可通过消息交换实现通信,每个Actor都有自己的消息收件箱。Akka可以简化并发场景下的开发,其异步,高性能的事件驱动模型,轻量级的事件处理可大大方便用于开发复杂的分布式系统。早期Spark大量采用Akka作为RPC。Netty也是一个知名的高性能,异步消息框架,Spark早期便使用它解决大文件传输问题,用来克服Akka的短板。根据
,因为很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka。因为Akka和Netty基础知识不是本文的重点,相关知识可自行查阅。
Spark-1.6+中的RPC
Spark-1.6以前的版本中RPC是采用Akka实现,我们以master和worker之间的通信为例解释Akka RPC的工作模式。Spark-1.6以后,RPC默认采用Netty实现。RpcEndpoint注册到RpcEnv上然后才能接收消息,RpcEnv处理从RpcEndpointRef或者其他远程节点的消息,然后回发给相应的RpcEndpoint。Rpc组件之间的关系见下图:
master实现
master的定义
我们看master类的定义
private[deploy] classMaster(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
可以看到master类继承自ThreadSafeRpcEndpoint,进一步定位可以发现ThreadSafeRpcEndpoint是继承自RpcEndpoint特质。
master启动
再看master的启动方法:
defstartRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
RpcEndpoint特质
master的启动会创建一个RpcEnv并将自己注册到其中。继续看RpcEndpoint特质的定义:
private[spark] traitRpcEndpoint{
//当前RpcEndpoint注册到的RpcEnv主子,可以类比为Akka中的actorSystem
val rpcEnv: RpcEnv
//直接用来发送消息的RpcEndpointRef,可以类比为Akka中的actorRef
final defself: RpcEndpointRef = {
require(rpcEnv != null, &rpcEnv has not been initialized&)
rpcEnv.endpointRef(this)
//处理来自RpcEndpointRef.send或者RpcCallContext.reply的消息
defreceive: PartialFunction[Any, Unit] = {
case _ =& throw new SparkException(self + & does not implement 'receive'&)
//处理来自RpcEndpointRef.ask的消息,会有相应的回复
defreceiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ =& context.sendFailure(new SparkException(self + & won't reply anything&))
//篇幅限制,其余onError,onConnected,onDisconnected,onNetworkError,
//onStart,onStop,stop方法此处省略
RpcEnv抽象类
我们来看看RpcEnv的具体内容:
private[spark] abstract classRpcEnv(conf:SparkConf){
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
//返回endpointRef
private[rpc] defendpointRef(endpoint: RpcEndpoint): RpcEndpointRef
//返回RpcEnv监听的地址
defaddress: RpcAddress
//注册一个RpcEndpoint到RpcEnv并返回RpcEndpointRef
defsetupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
//通过uri异步地查询RpcEndpointRef
defasyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
//通过uri查询RpcEndpointRef,这种方式会产生阻塞
defsetupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
//通过address和endpointName查询RpcEndpointRef,这种方式会产生阻塞
defsetupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
//关掉endpoint
defstop(endpoint: RpcEndpointRef): Unit
//关掉RpcEnv
defshutdown(): Unit
//等待结束
defawaitTermination(): Unit
//没有RpcEnv的话RpcEndpointRef是无法被反序列化的,这里是反序列化逻辑
defdeserialize[T](deserializationAction: () =& T): T
//返回文件server实例
deffileServer: RpcEnvFileServer
//开一个针对给定URI的channel用来下载文件
defopenChannel(uri: String): ReadableByteChannel
另外RpcEnv有一个伴生对象,实现了create方法:
private[spark] objectRpcEnv{
defcreate(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
new NettyRpcEnvFactory().create(config)
这就是在master启动方法中的create具体实现,可以看到调用了Netty工厂方法NettyRpcEnvFactory,该方法是对Netty的具体封装。
master中消息处理
上文可以看到,在RpcEndpoint中最核心的便是receive和receiveAndReply方法,定义了消息处理的核心逻辑,master中也有相应的实现:
override defreceive: PartialFunction[Any, Unit] = {
case ElectedLeader =&
case CompleteRecovery =&
case RevokedLeadership =&
case RegisterApplication(description, driver) =&
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =&
case DriverStateChanged(driverId, state, exception) =&
case Heartbeat(workerId, worker) =&
case MasterChangeAcknowledged(appId) =&
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =&
case WorkerLatestState(workerId, executors, driverIds) =&
case UnregisterApplication(applicationId) =&
case CheckForWorkerTimeOut =&
这里定义了master一系列的消息处理逻辑,而receiveAndReply中,
override defreceiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =&
case RequestSubmitDriver(description) =&
case RequestKillDriver(driverId) =&
case RequestDriverStatus(driverId) =&
case RequestMasterState =&
case BoundPortsRequest =&
case RequestExecutors(appId, requestedTotal) =&
case KillExecutors(appId, executorIds) =&
定义了对需要回复的消息组的处理逻辑。
worker实现
worker的定义
我们看看worker类的定义:
private[deploy] classWorker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
和master同样继承自ThreadSafeRpcEndpoint特质,也是一个Endpoint类型。
worker启动方法
有了前面master的分析,按照相同的逻辑,我们直接看worker的启动:
defstartRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse(&&)
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
自然而然的,和master启动方法基本如出一辙,但是问题出在,worker是如何和master建立联系的?其实细心一点你就可以发现,在master消息处理中,receiveAndReply方法第一个处理的消息便是RegisterWorker,也就是处理来自worker的注册消息,那么事情就变得简单了,worker是通过将自己注册到master的RpcEnv中,然后实现通信的。
worker注册到master RpcEnv
定位到worker中发送RegisterWorker消息到master Endpoint的方法:
private defregisterWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use &ThreadUtils.sameThread&
case Success(msg) =&
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
case Failure(e) =&
logError(s&Cannot register with master:${masterEndpoint.address}&, e)
System.exit(1)
}(ThreadUtils.sameThread)
其中,masterEndpoint.ask是核心,发送了一个RegisterWorker消息到masterEndpoint并期待对方的RegisterWorkerResponse,对response做出相应的处理。这样worker就成功和master建立了连接,它们之间可以互相发送消息进行通信。ps:其实worker注册到master的步骤有一点复杂,涉及到容错等问题,具体的实现这里暂时不做讨论。
worker到master的通信
worker和master之间是一个主从关系,worker注册到master之后,master就可以通过消息传递实现对worker的管理,在worker中有一个方法:
private defsendToMaster(message: Any): Unit = {
master match {
case Some(masterRef) =& masterRef.send(message)
case None =&
logWarning(
s&Dropping$messagebecause the connection to master has not yet been established&)
一目了然,就是干的发送消息到master的活儿,在worker中很多地方都用到这个方法,比如handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged)方法中,sendToMaster(executorStateChanged)就向masterRef发送了executorStateChanged消息,前文中master中的recevie方法中,就有一个对ExecutorStateChanged消息的处理逻辑。
master到worker的通信
同样的,master要对worker实现管理也是通过发送消息实现的,比如launchExecutor(worker: WorkerInfo, exec: ExecutorDesc)方法中:
private deflaunchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo(&Launching executor & + exec.fullId + & on worker & + worker.id)
worker.addExecutor(exec)
//向worker发送LaunchExecutor消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
master向worker发送了LaunchExecutor消息告诉worker应该启动executor了,而worker中的receive方法中对LaunchExecutor消息进行处理并完成master交代给自己的任务。
复杂的分布式系统中RPC是最重要的一个模块,得益于前人的工作,Akka亦或Netty都为我们提供了完备的RPC实现框架。通过这些框架,我们可以不必去关心RPC实现方式,设计出复杂的分布式系统。本文只是本人在Spark学习过程中的一个记录,分布式系统远远不止这么简单,学习之路依旧漫长。21026人阅读
Spark(60)
1. Master挂掉,standby重启也失效
Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,原因是master会读取每个task的event log日志去生成spark ui,内存不足自然会OOM,可以在master的运行日志中看到,通过HA启动的master自然也会因为这个原因失败。
增加Master的内存占用,在Master节点spark-env.sh 中设置:
export SPARK_DAEMON_MEMORY 10g # 根据你的实际情况
减少保存在Master内存中的作业信息
spark.ui.retainedJobs 500
# 默认都是1000
spark.ui.retainedStages 500
2. worker挂掉或假死
有时候我们还会在web ui中看到worker节点消失或处于dead状态,在该节点运行的任务则会报各种 lost worker 的错误,引发原因和上述大体相同,worker内存中保存了大量的ui信息导致gc时失去和master之间的心跳。
增加Master的内存占用,在Worker节点spark-env.sh 中设置:
export SPARK_DAEMON_MEMORY 2g # 根据你的实际情况
减少保存在Worker内存中的Driver,Executor信息
spark.worker.ui.retainedExecutors 200
# 默认都是1000
spark.worker.ui.retainedDrivers 200
二. 运行错误
1.shuffle FetchFailedException
missing output location
org.apache.spark.shuffle.MetadataFetchFailedException:
Missing an output location for shuffle 0
shuffle fetch faild
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to spark.168.47.215:50268
当前的配置为每个executor使用1core,5GRAM,启动了20个executor
这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,直到application失败。
一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。
spark.executor.memory 15G
spark.executor.cores 3
spark.cores.max 21
启动的execuote数量为:7个
execuoterNum = spark.cores.max/spark.executor.cores
每个executor的配置:
3core,15G RAM
消耗的内存资源为:105G RAM
15G*7=105G
可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就能完成。
2.Executor&Task Lost
executor lost
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
ExecutorLostFailure (executor lost)
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 8.47.217):
java.io.IOException: Connection from /192.168.47.217:55483 closed
各种timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second]
ERROR TransportChannelHandler: Connection to /192.168.47.212:35409
has been quiet for 120000 ms while there are outstanding requests.
Assumin please adjust spark.network.
timeout if this is wrong
由网络或者gc引起,worker或executor没有接收到executor或task的心跳反馈。
提高 spark.network.timeout 的值,根据情况改成300(5min)或更高。
默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性
spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout or spark.rpc.lookupTimeout
差距不大的几个task,有的运行速度特别慢。
大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢,分为数据倾斜和task倾斜两种。
数据倾斜大多数情况是由于大量的无效数据引起,比如null或者”“,也有可能是一些异常数据,比如统计用户登录情况时,出现某用户登录过千万次的情况,无效数据在计算前需要过滤掉。
数据处理有一个原则,多使用filter,这样你真正需要分析的数据量就越少,处理速度就越快。
sqlContext.sql("...where col is not null and col != ''")
具体可参考:
task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。
或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。
spark.speculation true
spark.speculation.interval 100 - 检测周期,单位毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比时启动推测
spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。
堆内存溢出
java.lang.OutOfMemoryError: Java heap space
内存不够,数据太多就会抛出OOM的Exeception,主要有driver OOM和executor OOM两种
driver OOM
一般是使用了collect操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可。
executor OOM
可以按下面的内存优化的方法增加code使用内存空间
增加executor内存总量,也就是说增加spark.executor.memory的值
增加任务并行度(大任务就被分成小任务了),参考下面优化并行度的方法
5.task not serializable
org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException: ...
如果你在worker中调用了driver中定义的一些变量,Spark就会将这些变量传递给Worker,这些变量并没有被序列化,所以就会看到如上提示的错误了。
val x = new X()
dd.map{r =& x.doSomething(r) }.collect
除了上文的map,还有filter,foreach,foreachPartition等操作,还有一个典型例子就是在foreachPartition中使用数据库创建连接方法。这些变量没有序列化导致的任务报错。
下面提供三种解决方法:
将所有调用到的外部变量直接放入到以上所说的这些算子中,这种情况最好使用foreachPartition减少创建变量的消耗。
将需要使用的外部变量包括sparkConf,SparkContext,都用 @transent进行注解,表示这些变量不需要被序列化
将外部变量放到某个class中对类进行序列化。
6.driver.maxResultSize太小
Caused by: org.apache.spark.SparkException:
Job aborted due to stage failure: Total size of serialized
results of 374 tasks (1026.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
spark.driver.maxResultSize默认大小为1G
每个Spark action(如collect)所有分区的序列化结果的总大小限制,简而言之就是executor给driver返回的结果过大,报这个错说明需要提高这个值或者避免使用类似的方法,比如countByValue,countByKey等。
将值调大即可
spark.driver.maxResultSize 2g
7.taskSet too large
WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
这个WARN可能还会导致ERROR
Caused by: java.lang.RuntimeException: Failed to commit task
Caused by: org.apache.spark.executor.CommitDeniedException: attempt__0218_m_: Not committed because the driver did not authorize commit
如果你比较了解spark中的stage是如何划分的,这个问题就比较简单了。
一个Stage中包含的task过大,一般由于你的transform过程太长,因此driver给executor分发的task就会变的很大。
所以解决这个问题我们可以通过拆分stage解决。也就是在执行过程中调用cache.count缓存一些中间数据从而切断过长的stage。
8. driver did not authorize commit
9. 环境报错
driver节点内存不足
driver内存不足导致无法启动application,将driver分配到内存足够的机器上或减少driver-memory
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x7296, 0)
error=’Cannot allocate memory’ (errno=12)
hdfs空间不够
hdfs空间不足,event_log无法写入,所以 ListenerBus会报错 ,增加hdfs空间(删除无用数据或增加节点)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
File /tmp/spark-history/app-52-0072.inprogress
could only be replicated to 0 nodes instead of minReplication (=1)
ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
spark编译包与hadoop版本不一致
下载对应hadoop版本的spark包或自己编译。
java.io.InvalidClassException: org.apache.spark.rdd.RDD;
local class incompatible: stream classdesc serialVersionUID
driver机器端口使用过多
在一台机器上没有指定端口的情况下,提交了超过15个任务。
16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI
java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
提交任务时指定app web ui端口号解决:
--conf spark.ui.port=xxxx
使用write.csv等方法写出到hdfs的文件,中文乱码。JVM使用的字符集如果没有指定,默认会使用系统的字符集,因为各个节点系统字符集并不都是UTF8导致,所以会出现这个问题。直接给JVM指定字符集即可。
spark-defaults.conf
spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
三. 一些python错误
1.python版本过低
java.io.UIException: Cannot run program "python2.7": error=2,没有那个文件或目录
spark使用的python版本为2.7,centOS默认python版本为2.6,升级即可。
2.python权限不够
部分节点上有错误提示
java.io.IOExeception: Cannot run program "python2.7": error=13, 权限不够
新加的节点运维装2.7版本的python,python命令是正确的,python2.7却无法调用,只要改改环境变量就好了。
3.pickle使用失败
TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)',
&type 'sklearn.tree._tree.Tree'&, (10, array([1], dtype=int32), 1,
&sklearn.tree._tree.RegressionCriterion object at 0x&,
50.0, 2, 1, 0.1, 10, 1, &mtrand.RandomState object at 0x10a55da08&))
该pickle文件是在0.17版本的scikit-learn下训练出来的,有些机器装的是0.14版本,版本不一致导致,升级可解决,记得将老版本数据清理干净,否则会报各种Cannot import xxx的错误。
4.python编码错误
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
str(u'中国')
str(u'中国'.encode('utf-8'))
四. 一些优化
1. 部分Executor不执行任务
有时候会发现部分executor并没有在执行任务,为什么呢?
(1) 任务partition数过少,
要知道每个partition只会在一个task上执行任务。改变分区数,可以通过 repartition 方法,即使这样,在 repartition 前还是要从数据源读取数据,此时(读入数据时)的并发度根据不同的数据源受到不同限制,常用的大概有以下几种:
hdfs - block数就是partition数
mysql - 按读入时的分区规则分partition
es - 分区数即为 es 的 分片数(shard)
(2) 数据本地性的副作用
taskSetManager在分发任务之前会先计算数据本地性,优先级依次是:
process(同一个executor) -& node_local(同一个节点) -& rack_local(同一个机架) -& any(任何节点)
Spark会优先执行高优先级的任务,任务完成的速度很快(小于设置的spark.locality.wait时间),则数据本地性下一级别的任务则一直不会启动,这就是Spark的延时调度机制。
举个极端例子:运行一个count任务,如果数据全都堆积在某一台节点上,那将只会有这台机器在长期计算任务,集群中的其他机器则会处于等待状态(等待本地性降级)而不执行任务,造成了大量的资源浪费。
判断的公式为:
curTime – lastLaunchTime &= localityWaits(currentLocalityIndex)
其中 curTime 为系统当前时间,lastLaunchTime 为在某优先级下最后一次启动task的时间
如果满足这个条件则会进入下一个优先级的时间判断,直到 any,不满足则分配当前优先级的任务。
数据本地性任务分配的源码在 taskSetManager.scala 。
如果存在大量executor处于等待状态,可以降低以下参数的值(也可以设置为0),默认都是3s。
spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
当你数据本地性很差,可适当提高上述值,当然也可以直接在集群中对数据进行balance。
2. spark task 连续重试失败
有可能哪台worker节点出现了故障,task执行失败后会在该 executor 上不断重试,达到最大重试次数后会导致整个 application 执行失败,我们可以设置失败黑名单(task在该节点运行失败后会换节点重试),可以看到在源码中默认设置的是 0,
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
在 spark-default.sh 中设置
spark.scheduler.executorTaskBlacklistTime 30000
当 task 在该 executor 运行失败后会在其它 executor 中启动,同时此 executor 会进入黑名单30s(不会分发任务到该executor)。
如果你的任务shuffle量特别大,同时rdd缓存比较少可以更改下面的参数进一步提高任务运行速度。
spark.storage.memoryFraction - 分配给rdd缓存的比例,默认为0.6(60%),如果缓存的数据较少可以降低该值。
spark.shuffle.memoryFraction - 分配给shuffle数据的内存比例,默认为0.2(20%)
剩下的20%内存空间则是分配给代码生成对象等。
如果任务运行缓慢,jvm进行频繁gc或者内存空间不足,或者可以降低上述的两个值。
"press","true" - 默认为false,压缩序列化的RDD分区,消耗一些cpu减少空间的使用
spark.default.parallelism
发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会造成很多小任务,增加启动任务的开销,太小,运行大数据量的任务时速度缓慢。
spark.sql.shuffle.partitions
sql聚合操作(发生shuffle)时的并行度,默认为200,如果该值太小会导致OOM,executor丢失,任务执行时间过长的问题
相同的两个任务:
spark.sql.shuffle.partitions=300:
spark.sql.shuffle.partitions=500:
速度变快主要是大量的减少了gc的时间。
但是设置过大会造成性能恶化,过多的碎片task会造成大量无谓的启动关闭task开销,还有可能导致某些task hang住无法执行。
修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)来操作。
5. shuffle
8.数据本地性
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:237133次
积分:3219
积分:3219
排名:第9871名
原创:95篇
评论:66条
邮箱:breeze_
简书博客: /users/dba7cda12069
(1)(1)(1)(1)(1)(5)(6)(1)(5)(5)(1)(6)(7)(6)(10)(13)(5)(1)(2)(5)(1)(1)(6)(4)(6)(3)

我要回帖

更多关于 大赢家足球即时比分中 的文章

 

随机推荐