spark sparkstreaming kafka怎么关闭

Spark Streaming程序的停止可以是强制停止、异常停止或其他方式停止。首先我们看StreamingContext的stop()方法def stop(
stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
): Unit = synchronized {
stop(stopSparkContext, false)
这里定义了两个参数,stopSparkContext可以通过配置文件定义,接着看两个参数的stop方法,代码如下/**
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
will be stopped regardless of whether this StreamingContext has been
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
received data to be completed
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
if (AsynchronousListenerBus.withinListenerThread.value) {
throw new SparkException("Cannot stop StreamingContext within listener thread of" +
" AsynchronousListenerBus")
synchronized {
state match {
case INITIALIZED =&
logWarning("StreamingContext has not been started yet")
case STOPPED =&
logWarning("StreamingContext has already been stopped")
case ACTIVE =&
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
logInfo("StreamingContext stopped successfully")
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
if (shutdownHookRefToRemove != null) {
ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
注释中说停止程序时需要所以接收的数据被处理完成后再停止,需要我们传入的stopGracefully参数为true
在StreamingContext里面有一个stopOnShutdown()方法,代码如下private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
stopOnShutdown()方法是什么意思呢,在我们的程序退出时,不管是正常退出或异常退出,stopOnShutdown()方法都会被回调,然后调用stop方法。stopGracefully 可以通过配置项spark.streaming.stopGracefullyOnShutdown配置,生产环境需要配置为true.
stopOnShutdown()方法是怎样被调用的呢?在StreamingContext的start方法中有一行代码
shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
添加stopOnShutdown函数到ShutdownHookManager中,addShutdownHook代码如下
def addShutdownHook(priority: Int)(hook: () =& Unit): AnyRef = {
shutdownHooks.add(priority, hook)
看SparkShutdownHookManager 里都有什么,看注释
private [util] class SparkShutdownHookManager {
// 优先级队列,优先级越大,越优先执行
private val hooks = new PriorityQueue[SparkShutdownHook]()
@volatile private var shuttingDown = false
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
// 这里实例化一个线程,添加到jvm的关闭钩子中,等到jvm退出时才会被调用
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =&
val fsPriority = classOf[FileSystem]
.getField("SHUTDOWN_HOOK_PRIORITY")
.get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
case Failure(_) =&
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
// jvm退出时钩子回调此函数
def runAll(): Unit = {
shuttingDown = true
var nextHook: SparkShutdownHook = null
//循环从优先级队列取数据执行,优先级越大,越优先执行
while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
Try(Utils.logUncaughtExceptions(nextHook.run()))
def add(priority: Int, hook: () =& Unit): AnyRef = {
hooks.synchronized {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
def remove(ref: AnyRef): Boolean = {
hooks.synchronized { hooks.remove(ref) }
看到这里就明白了,把stopOnShutdown()函数放入SparkShutdownHookManager 中的优化级队列hooks中,默认优先级为51,jvm退出时退出线程,runAll()方法被调用,然后从hooks队列中一个一个取数据(函数),然后执行,就用了stopOnShutdown()函数,我们的应用程序就可以优雅的执行停止工作了。
热点阅读:
小主,按键盘右方向键 → 翻页可以跳过片头呢
本文标题:
原文链接:
和本文相似的内容:
编辑推荐 &&
妖怪研究所系列其一:山童? 李家怪事绵延大青山,茫茫烟苍几百里,这里的树木相当繁茂,遮天蔽日,林中更是鹿鸣呦呦,空山绝响,充满了无限的生机和神秘。大青山下大青庄,有山民100余户。俗话说,坐山吃山,这里的人们多为猎户,平日里青壮男子外出打猎【实战篇】如何优雅的停止你的 Spark Streaming Application - 简书
<div class="fixed-btn note-fixed-download" data-toggle="popover" data-placement="left" data-html="true" data-trigger="hover" data-content=''>
写了75444字,被370人关注,获得了171个喜欢
【实战篇】如何优雅的停止你的 Spark Streaming Application
Spark 1.3及其前的版本
你的一个 spark streaming application 已经好好运行了一段时间了,这个时候你因为某种原因要停止它。你应该怎么做?直接暴力 kill 该 application 吗?这可能会导致数据丢失,因为 receivers 可能已经接受到了数据,但该数据还未被处理,当你强行停止该 application,driver 就没办法处理这些本该处理的数据。
所以,我们应该使用一种避免数据丢失的方式,官方建议调用 StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean),将 stopGracefully 设置为 true,这样可以保证在 driver 结束前处理完所有已经接受的数据。
一个 streaming application 往往是长时间运行的,所以存在两个问题:
应该在什么时候去调用 StreamingContext#stop
当 streaming application 已经在运行了该怎么去调用 StreamingContext#stop
通过 Runtime.getRuntime().addShutdownHook 注册关闭钩子, JVM将在关闭之前执行关闭钩子中的 run 函数(不管是正常退出还是异常退出都会调用),所以我们可以在 driver 代码中加入以下代码:
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() {
log("Shutting down streaming app...")
streamingContext.stop(true, true)
log("Shutdown of streaming app complete.")
这样就能保证即使 application 被强行 kill 掉,在 driver 结束前,streamingContext.stop(true, true)也会被调用,从而保证已接收的数据都会被处理。
Spark 1.4及其后的版本
上一小节介绍的方法仅适用于 1.3及以前的版本,在 1.4及其后的版本中不仅不能保证生效,甚至会引起死锁等线程问题。在 1.4及其后的版本中,我们只需设置 spark.streaming.stopGracefullyOnShutdown 为 true 即可达到上一小节相同的效果。
下面来分析为什么上一小节介绍的方法在 1.4其后的版本中不能用。首先,需要明确的是:
当我们注册了多个关闭钩子时,JVM开始启用其关闭序列时,它会以某种未指定的顺序启动所有已注册的关闭钩子,并让它们同时运行
万一不止一个关闭钩子,它们将并行地运行,并容易引发线程问题,例如死锁
综合以上两点,我们可以明确,如果除了我们注册的关闭钩子外,driver 还有注册了其他钩子,将会引发上述两个问题。
在 StreamingContext#start 中,会调用
ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
该函数最终注册一个关闭钩子,并会在 run 方法中调用 stopOnShutdown,
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
从 stopOnShutdown 中会根据 stopGracefully 的值来决定是否以优雅的方式结束 driver,而 stopGracefully 的值由 spark.streaming.stopGracefullyOnShutdown 决定。结合上文,也就能说明为什么 spark.streaming.stopGracefullyOnShutdown能决定是否优雅的结束 application 和为什么上一小节的方法不适用与 1.4及其后版本。
请我吃一根辣条吧~
打开微信“扫一扫”,打开网页后点击屏幕右上角分享按钮
被以下专题收入,发现更多相似内容:
如果你是程序员,或者有一颗喜欢写程序的心,喜欢分享技术干货、项目经验、程序员日常囧事等等,欢迎投稿《程序员》专题。
专题主编:小...
· 242096人关注
大数据,收录大数据相关技术的文章。
· 3380人关注
Spark深入学习专题旨在通过高质量的文章对Spark相关技术进行研究学习
· 720人关注
请我吃一根辣条吧~
选择支付方式:Spark Streaming快速状态流处理 - 推酷
Spark Streaming快速状态流处理
许多复杂流处理流水线程序必须将状态保持一段时间,例如,如果你想实时了解网站用户行为, 你需要将网站上各“用户会话(user session)”信息保存为持久状态并根据用户的行为对这一状态进行持续更新。这种有状态的流计算可以在Spark Streaming中使用updateStateByKey方法实现。
在Spark 1.6中,我们通过使用新API mapWithState极大地增强对状态流处理的支持。该新的API提供了通用模式的内置支持,而在以前使用updateStateByKey方法来实 现这一相同功能(如会话超时)需要进行手动编码和优化。因此,mapWithState方法较之于updateStateByKey方法,有十倍之多的性 能提升。在本博文当中,我们将对mapWithState方法进行深入讲解,同时提前感受后续新版本中将加入的特性。
使用mapWithState方法进行状态流处
Spark Streaming中最强大的特性之一是简单的状态流处理API及相关联的本地化、可容错的状态管理能力。开发人员仅需要指定状态的结构和更新逻 辑,Spark Streaming便能够接管集群中状态的分发、管理,在程序出错时自动进行恢复并提供端到端的容错保障。尽管现有DStream中 updateStateByKey方法能够允许用户执行状态计算,但使用mapWithState方法能够让用户更容易地表达程序逻辑,同时让性能提升 10倍之多。让我们通过一个例子对mapWithState方法的优势进行阐述。
假设我们要根据用户历史动作对某一网站的用户行为进行实时分析,对各个用户,我们需要保持用户动作的历史信息,然后根据这些历史信息得到用户的行为模型并输出到下游的数据存储当中。
在Spark Streaming中构建此应用程序时,我们首先需要获取用户动作流作为输入(例如通过Kafka或Kinesis),然后使用mapWithState方法对输入进行转换操作以生成用户模型流,最后将处理后的数据流保存到数据存储当中。
在Spark Streaming中使用状态流处理进行用户会话维护
mapWithState 方法可以通过下面的抽象方式进行理解,假设它是将用户动作和当前用户会话作为输入的一个算子(operator),基于某个输入动作,该算子能够有选择地 更新用户会话,然后输出更新后的用户模型作为下游操作的输入。开发人员在定义mapWithState方法时可以指定该更新函数。
现在让我们转入到具体代码来说明,首先我们定义状态数据结构及状态更新函数:
def stateUpdateFunction(
userId: UserId,
newData: UserAction,
stateData: State[UserSession]): UserModel = {
val currentSession = stateData.get()// 获取当前会话数据
val updatedSession = … &// 使用newData计算更新后的会话
stateData.update(updatedSession) // 更新会话数据
val userModel = … &// 使用updatedSession计算模型
return userModel & // 将模型发送给下游操作
然后,在用户动作DStream上定义mapWithState方法,通过创建StateSpec对象来实现,该对象中包含所有前述指定的操作。
// 用去动作构成的Stream,用户ID作为key
val userActions = … &// key-value元组(UserId, UserAction)构成的stream
// 待提交的数据流
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
mapWithState的新特性和性能改进
通过前面的例子,我们已经明白其使用方式,现在让我们再深入理解使用该新的API所带来的特定优势。
1. 原生支持会话超时
许 多基于会话的应用程序要求具备超时机制,当某个会话在一定的时间内(如用户没有显式地注销而结束会话)没有接收到新数据时就应该将其关闭,与使用 updateStateByKey方法时需要手动进行编码实现所不同的是,开发人员可以通过mapWithState方法直接指定其超时时间。
userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))
除超时机制外,开发人员也可以设置程序启动时的分区模式和初始状态信息。
2. 任意数据都能够发送到下游
与updateStateByKey方法不同,任意数据都可以通过状态更新函数将数据发送到下游操作,这一点已经在前面的例子中有说明(例如通过用户会话状态返回用户模型),此外,最新状态的快照也能够被访问。
val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()
变量userSessionSnapshots为一个DStream,其中各个RDD为各批(batch)数据处理后状态更新会话的快照,该DStream与updateStateByKey方法返回的DStream是等同的。
3. 更高的性能
最后,与updateStateByKey方法相比,使用mapWithState方法能够得到6倍的低延迟同时维护的key状态数量要多10倍,这一性能提升和扩展性可从后面的基准测试结果得到验证,所有的结果全部在时间间隔为1秒的batch和相同大小的集群中生成。
下 图比较的是mapWithState方法和updateStateByKey方法处理1秒的batch所消耗的平均时间,在本例中,我们为同样数量(从 0.25~1百万)的key保存其状态,然后以同样的速率(30k个更新/s)对其进行更新,如下图所示,mapWithState方法比 updateStateByKey方法的处理时间快8倍,从而允许更低的端到端延迟。
mapWithState方法比updateStateByKey方法的批处理时间(例如延迟)快8倍
此外,更快的处理速度使得mapWithState方法能够比updateStateByKey方法管理多10倍的key(批处理间隔、集群大小、更新频率全部相同)。
mapWithState方法比updateStateByKey方法管理的key数量多10倍
Spark Streaming中其它的改进
除mapWithState方法外,Spark 1.6中的Spark Streaming组件还有其它几项更进,部分如下:
Streaming UI的更进[SPARK-10885,SPARK-11742]:Job失败和其它一些详细信息可以显示在Streaming UI当中以便于程序调试。
Kinesis 集成API改进[SPARK-11198, SPARK-10891]:Kinesis流已经升级到可以使用KCL 1.4.0同时支持对KPL聚合记录进行解聚合操作,另外,在确定什么样的数据要保存到内存中之前,任意的函数现在都可以作用于Kinesis接收器中的 某个Kinesis记录。
Python Streaming监听器API[SPARK-6328]—获取Streaming的统计信息(调度延迟、批处理时间等)
支 持S3写时提前写日志(Write Ahead Logs ,WALs)[SPARK-11324,SPARK-11141]:Spark Streaming使用提前写日志确保接收数据的容错性。Spark 1.6中允许WAL应用到S3及其它不支持文件flush操作的存储上,详细信息请参见programming guide
如果你想试用这些新特性,你可以在Databricks官网上使用Spark 1.6,在使用时可以保留更老版本的Spark。
注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&Spark Streaming程序的优雅停止_Spark技术日报_传送门
Spark Streaming程序的优雅停止
Spark技术日报
Spark Streaming程序的停止可以是强制停止、异常停止或其他方式停止。首先我们看StreamingContext的stop()方法
stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
): Unit = synchronized {
stop(stopSparkContext, false)
这里定义了两个参数,stopSparkContext可以通过配置文件定义,接着看接收两个参数的stop方法,代码如下
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
will be stopped regardless of whether this StreamingContext has been
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
received data to be completed
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
if (AsynchronousListenerBus.withinListenerThread.value) {
throw new SparkException("Cannot stop StreamingContext within listener thread of" +
" AsynchronousListenerBus")
synchronized {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
logInfo("StreamingContext stopped successfully")
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
if (shutdownHookRefToRemove != null) {
ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
注释中说明要停止程序时,正确的方式是需要所有接收的数据被处理完成后再停止,那么就需要我们传入的stopGracefully参数为true,然后停止时会等待所有任务执行完成2. Spark Streaming提供了一个优雅停止的方法,在StreamingContext里面有一个stopOnShutdown()方法,代码如下
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
stopOnShutdown()方法是什么意思呢,在我们的程序退出时,不管是正常退出或异常退出,stopOnShutdown()方法都会被回调,然后调用stop方法。stopGracefully 可以通过配置项spark.streaming.stopGracefullyOnShutdown配置,生产环境需要配置为true.3.
stopOnShutdown()方法是怎样被调用的呢?在StreamingContext的start方法中有一行代码
shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
添加stopOnShutdown函数到ShutdownHookManager中,addShutdownHook代码如下
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
看SparkShutdownHookManager 里都有什么,看代码注释了解SparkShutdownHookManager的功能,不一一介绍
private [util] class SparkShutdownHookManager {
// 优先级队列,优先级越大,越优先执行
private val hooks = new PriorityQueue[SparkShutdownHook]()
@volatile private var shuttingDown = false
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
// 这里实例化一个线程,添加到jvm的关闭钩子中,等到jvm退出时才会被调用
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
val fsPriority = classOf[FileSystem]
.getField("SHUTDOWN_HOOK_PRIORITY")
.get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
case Failure(_) =>
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
// jvm退出时钩子回调此函数
def runAll(): Unit = {
shuttingDown = true
var nextHook: SparkShutdownHook = null
//循环从优先级队列取数据执行,优先级越大,越优先执行
while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
Try(Utils.logUncaughtExceptions(nextHook.run()))
def add(priority: Int, hook: () => Unit): AnyRef = {
hooks.synchronized {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
def remove(ref: AnyRef): Boolean = {
hooks.synchronized { hooks.remove(ref) }
}4. 看到这里就明白了,把stopOnShutdown()函数放入SparkShutdownHookManager 中的优化级队列hooks中,默认优先级为51,jvm退出时启动一个线程,调用runAll()方法,然后从hooks队列中一个一个取数据(函数),然后执行,就调用了stopOnShutdown()函数,接着调用stop()函数,我们的应用程序就可以优雅的执行停止工作了。来源:
觉得不错,分享给更多人看到
Spark技术日报 微信二维码
分享这篇文章
Spark技术日报 最新文章
Spark技术日报 热门文章

我要回帖

更多关于 spark streaming 入门 的文章

 

随机推荐