美白面膜是实惠的APP是做什么的?

基于Spark Streaming的实时日志处理平台设计与实现--《电信工程技术与标准化》2015年09期
基于Spark Streaming的实时日志处理平台设计与实现
【摘要】:本文基于Hadoop Yarn平台,通过Spark Streaming流数据处理技术实时处理Web日志、系统日志、监控日志等,很好地解决了目前存在的问题,并使用HBase(Hadoop No SQL Database)作为数据库存储,更好地适应日志数据规模的增长。
【作者单位】:
【关键词】:
【基金】:
【分类号】:TP311.13【正文快照】:
1引言目前大多数日志分析工具都只能用于单机的数据分析处理,而面对海量化日志处理以及对分析结果低延时反馈的实际需求,传统的日志处理方式已经远远无法满足人们的需要。Hadoop是Apache软件基金会(Apache SoftwareFoundation)组织下的一个开源项目,提供分布式计算环境下的可
欢迎:、、)
支持CAJ、PDF文件格式,仅支持PDF格式
【共引文献】
中国期刊全文数据库
郭荔荔;李敬兆;;[J];电脑知识与技术;2014年04期
陈磊;封朝永;;[J];广东工业大学学报;2014年03期
陈燕红;张太红;马健;;[J];计算机应用与软件;2014年06期
陈东辉;曾乐;梁中军;肖卫青;;[J];计算机应用;2014年09期
方艾芬;蔡岗;缪新顿;;[J];警察技术;2014年06期
黄翔;陈志刚;;[J];南方能源建设;2015年01期
闵信志;薛安荣;黄祖卫;;[J];软件导刊;2015年03期
中国博士学位论文全文数据库
高峰;[D];复旦大学;2013年
马莉;[D];西安科技大学;2014年
黄风华;[D];福建师范大学;2014年
中国硕士学位论文全文数据库
成飞龙;[D];南京理工大学;2013年
吴凡;[D];北京林业大学;2013年
许文龙;[D];湖南大学;2013年
封朝永;[D];广东工业大学;2014年
丁琛;[D];南京师范大学;2014年
史晓丽;[D];西安电子科技大学;2014年
赵玉龙;[D];内蒙古科技大学;2014年
王光耀;[D];浙江工业大学;2014年
张丽莹;[D];北京邮电大学;2014年
吴秀娟;[D];南京邮电大学;2014年
【相似文献】
中国期刊全文数据库
谭劲,余胜生,周敬利;[J];Journal of Shanghai U2004年01期
董海燕,芦汉生,李升才,侯山峰,高稚允;[J];Journal of Beijing Institute of Technology(English Edition);2005年02期
康亮;;[J];电信网技术;2011年06期
郭常杰,沈国斌,李世鹏,钟玉琢;[J];Tsinghua Science and T2003年06期
戢彦泓,郭常杰,钟玉琢,孙立峰;[J];Tsinghua Science and T2004年04期
马然,张兆扬,江涛,石旭利;[J];Journal of Shanghai U2005年04期
BOUAZIZI IHANNUKSELA Miska M;RAUF U;[J];Journal of Zhejiang University Science A(Science in Engineering);2006年S1期
;[J];Journal of Zhejiang University Science A(Science in Engineering);2006年10期
;[J];Journal of Zhejiang University Science A(Science in Engineering);2006年10期
;[J];Journal of Zhejiang University(Science A:An International Applied Physics & Engineering Journal);2007年08期
中国重要会议论文全文数据库
Xiaoyu Ma;Rui J;[A];Proceedings of 2011 International Conference on Computer Science and Information Technology(ICCSIT 2011)[C];2011年
Xudong SJianjun Lu;;[A];proceedings of 2010 3rd International Conference on Computer and Electrical Engineering (ICCEE 2010 no.1)[C];2012年
冯侦探;;[A];第二届中国科学院博士后学术年会暨高新技术前沿与发展学术会议程序册[C];2010年
;[A];第一届建立和谐人机环境联合学术会议(HHME2005)论文集[C];2005年
;[A];第一届建立和谐人机环境联合学术会议(HHME2005)论文集[C];2005年
;[A];第二届全国压电和声波理论及器件技术研讨会摘要集[C];2006年
Z.WANG;A.T.SORNBORGER;L.TAO;;[A];中国神经科学学会第十届全国学术会议论文摘要集[C];2013年
Li-fang Z;[A];proceedings of 2010 3rd International Conference on Computer and Electrical Engineering (ICCEE 2010 no.2)[C];2012年
;[A];Information Technology and Computer Science—Proceedings of 2012 National Conference on Information Technology and Computer Science[C];2012年
Li-jun SLi-ying Yu;Hao Xu;Liang SJian-bao Z;[A];第十届全国生物力学学术会议暨第十二届全国生物流变学学术会议论文摘要汇编[C];2012年
中国重要报纸全文数据库
董 权;[N];中国计算机报;2004年
中国硕士学位论文全文数据库
李云飞;[D];上海交通大学;2013年
陈达伟;[D];北京邮电大学;2014年
陈靖隆;[D];华南理工大学;2011年
谢顺华;[D];浙江大学;2003年
Salah Addin M[D];北京理工大学;2015年
李光耀;[D];电子科技大学;2012年
&快捷付款方式
&订购知网充值卡
400-819-9993
《中国学术期刊(光盘版)》电子杂志社有限公司
同方知网数字出版技术股份有限公司
地址:北京清华大学 84-48信箱 大众知识服务
出版物经营许可证 新出发京批字第直0595号
订购热线:400-819-82499
服务热线:010--
在线咨询:
传真:010-
京公网安备75号7.spark Streaming 技术内幕 : 从DSteam到RDD全过程解析
原创文章,转载请注明:转载自周岳飞博客(/zhouyf/)上篇博客讨论了Spark Streaming 程序动态生成Job的过程,并留下一个疑问:JobScheduler将动态生成的Job提交,然后调用了Job对象的run方法,最后run方法的调用是如何触发RDD的Action操作,从而真正触发Job的执行的呢?本文就具体讲解这个问题。一、DStream和RDD的关系
DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据,如下图所示:
从上图可以看出, 一个DStream 对应了时间维度上的多个RDD。DStream 作为Spark Stream的一个基本抽象,提供了高层的API来进行Spark Streaming 程序开发,先看一个简单的Spark Streaming的WordCount程序实例: object WordCount{ def main(args:Array[String]):Unit={ val sparkConf =newSparkConf().setMaster("local[4]").setAppName("WordCount") val ssc =newStreamingContext(sparkConf,Seconds(1))
val lines = ssc.socketTextStream("localhost",9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x =>(x,1)).reduceByKey(_+_) wordCounts.print() ssc.start() ssc.awaitTermination() } }我们会发现对DStream的操作和RDD的操作惊人的相似, 通过对DStream的不断转换,形成依赖关系。所以的DStream操作最终会转换成底层的RDD的操作,上面的例子中lines DStream转换成wods DSteam。 lines DStream的 flatMap操作会作用于其中每一个RDD去生成words DStream 中的RDD, 过程如下图所示:下面从源码角度看一下 DStream和RDD的关系:
DStream 中 有一个HashMap[Time,RDD[T]]类型的对象 generatedRDDs,其中Key为作业开始时间,RDD为该DStream对应的RDD,源码如下:
二、Dstream 的分类
Dstream 主要分为三大类:
1. Input DStream
2. Transformed DStream
3. Output DStream2.1 InputDStream 是DStream 最初诞生的地方,也是RDD最初诞生的地方,它是依据数据源创建的最初的DStream,如上面例子中的代码:val lines = ssc . socketTextStream ( "localhost" , 9999 )基于Socket数据源创建了 SocketInputDStream对象lines,下面从源码角度分析一下他是怎么生成RDD的, SocketInputDStream生成RDD的方法在 它的父类ReceiverInputDSteam中:ReceiverInputDSteam 的compute方法中调用了createBloackRDD方法基于Block信息创建了RDD :可以看到 ReceiverInputDSteam 的 createBloackRDD 方法new了BlockRDD对象,BlockRDD 是继承自RDD。至此,最初的RDD创建完成。2.2、 Transformed DStream 是由其他DStream 通过非Output算子装换而来的DStream 例如例子中的lines通过flatMap算子转换生成了FlatMappedDStream:
val words = lines.flatMap(_.split(" ")) 下面看一下flatMap的源码:
可以看到flatMap是DStream的方法,它创建了FlatMappeedDStream并返回,上面例子中words 就是 FlatMappeedDStream 对象,创建 FlatMappeedDStream对象时传入了 参数flatMapFunc,这里的flatMapFunc就是用户编写的业务逻辑,我们再进入FlatMappedDStream,查看其compute方法:可以惊喜的看到 FlatMappedDStream的compute方法调用了parent的getOrCompute方法获取父DStream的RDD.通过对 父DStream的RDD的flatMap算子生成新的RDD,转换的业务逻辑通过flatMapFunc参数传递给flatMap算子。这样对DStream的操作都转换成了对RDD的操作,同时DSream的依赖关系也与RDD之间依赖关系同时建立了起来。说明:这些RDD的创建是在Job动态生成时候发生的,Job生成最终会调用ForeachDStream的generateJob方法,源码如下其中的parent.getOrCompute方法会依据DStream之间的依赖关系,导致一系列的链式调用,从而创建所有的RDD,并形成RDD之间的依赖关系。3.3 Output DStream 是有其他DStream通过Output算子生成,它只存在于Output算子内部,并不会像Transformed Stream一样由算子返回, 他是触发Job执行的关键。
那么什么是Output 算子呢? Output 算子是让DStream中的数据被推送的外部系统,像数据库,文件系统(HDFS,GFS等)的算子。因为Output 算子是将转换后的数据推送到外部系统被使用的操作,所以他触发了前面转换操作的真正执行(类似于RDD的action操作)。
下面,我们看看有哪些Output算子:
Output Operation
Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
Python APIThis is calledpprint()in the Python API.
saveAsTextFiles(prefix, [suffix])
Save this DStream's contents as text files. The file name at each batch interval is generated based onprefixandsuffix:"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix])
Save this DStream's contents asSequenceFilesof serialized Java objects. The file name at each batch interval is generated based onprefixandsuffix:"prefix-TIME_IN_MS[.suffix]".
Python APIThis is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix])
Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based onprefixandsuffix:"prefix-TIME_IN_MS[.suffix]".
Python APIThis is not available in the Python API.
foreachRDD(func)
The most generic output operator that applies a function,func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the functionfuncis executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
下面,回到我们开头的例子:wordCounts . print ()其中pirnt算子就是Output算子,我们进入print的源码:print()方法调用了print(10),其实是调用了另一个print方法:print 方法中首先定义了一个函数foreachFunc,foreachFunc从rdd中出去num个元素打印出来。接下来print函数调用了foreachRDD,并将foreachFunc的处理逻辑作为参数传入。这里的foreachRDD也是一个Output算子(上面已经有说明),接下来看看 foreachRDD的源码。可以看到foreachRDD中创建了一个ForeachDStream对象,这就是我们期待已久的Output DStream。这里需要注意一个关键点:创建完ForeachRDD对象后,调用了该对象的register方法。register方法将当前对象注册给DStreamGraph。源码如下:注册的过程就是将当前对象加入graph的输出流outputStream中:这个过程很重要,在Job触发时候会用到outputStream。我们先在这里记住这个过程,下面的分析会用到这个内容。至此,DStream到RDD过程已经解析完毕。三 、由Dstream触发RDD的执行
Spark Stream的Job执行过程我在另一篇博客有详细介绍,具体细节请参考/zhouyf/p/5503682.html在生成Job的过程中会调用DStreamGraph的generate方法:其中,就调用了outputStream的generateJob方法,这里的outputStream就上面有output算子注册给DStreamGraph的输出流。就是我们实例中ForeachDStream 。ForeachDStream 的generateJob方法源码:可以看到它将我们的业务逻辑封装成jobFunc传递给了最终生成的Job对象。由上篇博客《 Spark streaming技术内幕 : Job动态生成原理与源码解析 》 我们知道在StreamContext启动会动态创建job,并且最终调用Job的run方法Job的run方法由JobScheduler的submitJobSet触发 :其中jobExecutor对象是一个线程池,JobHandler实现了 Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。在这里Job的run方法开始在线程中执行,JobHandler的run方法源码如下:其中的job就是封装了我们业务逻辑的Job对象,它的run方法会触发我们在foreachRDD方法中对RDD的操作(一般是action操作),到这里RDD的Action操作被触发,spark作业开始执行。总结: 1、在一个固定时间维度上,DStream和RDD是一一对应关系,可以将DStream看成是RDD在时间维度上封装。
2、Dstream 主要分为三大类:Input DStream,Transformed DStream,Output DStream,其中Output Dstream 对开发者是透明的,存在于Output 算子内部。
3、Spark Streaming应用程序最终会转化成对RDD操作的spark 程序,spark 程序由于执行了foreachRDD算子中的RDD操作被触发。原创文章,转载请注明: 转载自周岳飞博客( /zhouyf/) From WizNote
最新教程周点击榜
微信扫一扫Spark Streaming快速状态流处理 - 博客频道 - CSDN.NET
茄肥猫的窝
分类:扯淡
许多复杂流处理流水线程序必须将状态保持一段时间,例如,如果你想实时了解网站用户行为,你需要将网站上各“用户会话(user session)”信息保存为持久状态并根据用户的行为对这一状态进行持续更新。这种有状态的流计算可以在Spark Streaming中使用updateStateByKey 方法实现。
在Spark 1.6 中,我们通过使用新API mapWithState极大地增强对状态流处理的支持。该新的API提供了通用模式的内置支持,而在以前使用updateStateByKey 方法来实现这一相同功能(如会话超时)需要进行手动编码和优化。因此,mapWithState 方法较之于updateStateByKey方法,有十倍之多的性能提升。在本博文当中,我们将对mapWithState方法进行深入讲解,同时提前感受后续新版本中将加入的特性。
使用mapWithState方法进行状态流处理
Spark Streaming中最强大的特性之一是简单的状态流处理API及相关联的本地化、可容错的状态管理能力。开发人员仅需要指定状态的结构和更新逻辑,Spark Streaming便能够接管集群中状态的分发、管理,在程序出错时自动进行恢复并提供端到端的容错保障。尽管现有DStream中updateStateByKey方法能够允许用户执行状态计算,但使用mapWithState方法能够让用户更容易地表达程序逻辑,同时让性能提升10倍之多。让我们通过一个例子对mapWithState方法的优势进行阐述。&
假设我们要根据用户历史动作对某一网站的用户行为进行实时分析,对各个用户,我们需要保持用户动作的历史信息,然后根据这些历史信息得到用户的行为模型并输出到下游的数据存储当中。
在Spark Streaming中构建此应用程序时,我们首先需要获取用户动作流作为输入(例如通过Kafka或Kinesis),然后使用mapWithState 方法对输入进行转换操作以生成用户模型流,最后将处理后的数据流保存到数据存储当中。
在Spark Streaming中使用状态流处理进行用户会话维护
mapWithState方法可以通过下面的抽象方式进行理解,假设它是将用户动作和当前用户会话作为输入的一个算子(operator),基于某个输入动作,该算子能够有选择地更新用户会话,然后输出更新后的用户模型作为下游操作的输入。开发人员在定义mapWithState方法时可以指定该更新函数。
现在让我们转入到具体代码来说明,首先我们定义状态数据结构及状态更新函数:
def stateUpdateFunction(
userId: UserId,
newData: UserAction,
stateData: State[UserSession]): UserModel = {
val currentSession = stateData.get()// 获取当前会话数据
val updatedSession = ...
// 使用newData计算更新后的会话
stateData.update(updatedSession) // 更新会话数据
val userModel = ...
// 使用updatedSession计算模型
return userModel
// 将模型发送给下游操作
然后,在用户动作DStream上定义mapWithState 方法,通过创建StateSpec对象来实现,该对象中包含所有前述指定的操作。
// 用去动作构成的Stream,用户ID作为key
val userActions = ...
// key-value元组(UserId, UserAction)构成的stream
// 待提交的数据流
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
mapWithState的新特性和性能改进
通过前面的例子,我们已经明白其使用方式,现在让我们再深入理解使用该新的API所带来的特定优势。
1. 原生支持会话超时
许多基于会话的应用程序要求具备超时机制,当某个会话在一定的时间内(如用户没有显式地注销而结束会话)没有接收到新数据时就应该将其关闭,与使用updateStateByKey方法时需要手动进行编码实现所不同的是,开发人员可以通过mapWithState方法直接指定其超时时间。
userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))
除超时机制外,开发人员也可以设置程序启动时的分区模式和初始状态信息。
2. 任意数据都能够发送到下游
与updateStateByKey方法不同,任意数据都可以通过状态更新函数将数据发送到下游操作,这一点已经在前面的例子中有说明(例如通过用户会话状态返回用户模型),此外,最新状态的快照也能够被访问。
val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()
变量userSessionSnapshots 为一个DStream,其中各个RDD为各批(batch)数据处理后状态更新会话的快照,该DStream与updateStateByKey方法返回的DStream是等同的。
3. 更高的性能
最后,与updateStateByKey方法相比,使用mapWithState方法能够得到6倍的低延迟同时维护的key状态数量要多10倍,这一性能提升和扩展性可从后面的基准测试结果得到验证,所有的结果全部在时间间隔为1秒的batch和相同大小的集群中生成。
下图比较的是mapWithState 方法和updateStateByKey 方法处理1秒的batch所消耗的平均时间,在本例中,我们为同样数量(从0.25~1百万)的key保存其状态,然后以同样的速率(30k个更新/s)对其进行更新,如下图所示,mapWithState方法比updateStateByKey方法的处理时间快8倍,从而允许更低的端到端延迟。
mapWithState方法比updateStateByKey方法的批处理时间(例如延迟)快8倍
此外,更快的处理速度使得mapWithState 方法能够比updateStateByKey 方法管理多10倍的key(批处理间隔、集群大小、更新频率全部相同)。
mapWithState方法比updateStateByKey方法管理的key数量多10倍
Spark Streaming中其它的改进
除mapWithState方法外,Spark 1.6中的Spark Streaming组件还有其它几项更进,部分如下:
Streaming UI的更进[,&]:Job失败和其它一些详细信息可以显示在Streaming
UI当中以便于程序调试。Kinesis集成API改进[,&]:Kinesis流已经升级到可以使用KCL
1.4.0同时支持对KPL聚合记录进行解聚合操作,另外,在确定什么样的数据要保存到内存中之前,任意的函数现在都可以作用于Kinesis接收器中的某个Kinesis记录。Python Streaming监听器API[]—获取Streaming的统计信息(调度延迟、批处理时间等)支持S3写时提前写日志(Write Ahead Logs ,WALs)[,&]:Spark
Streaming使用提前写日志确保接收数据的容错性。Spark 1.6中允许WAL应用到S3及其它不支持文件flush操作的存储上,详细信息请参见
如果你想试用这些新特性,你可以在Databricks官网上使用Spark 1.6,在使用时可以保留更老版本的Spark。
lively1982
排名:第5356名
(5)(125)(6)(0)(0)(2)(5)(223)(11)(0)(0)(30)(0)(17)(2)(5)(16)(2)(0)(2)(8)(6)(3)(2)(27)(9)(1)(5)(2)

我要回帖

更多关于 翡翠玉器是实惠的 的文章

 

随机推荐