怎样知道大量不同的Key被分配到了相同的Task

原创文章始发自作者个人博客,转载请务必将下面这段话置于文章开头处
本文转发自技术世界,原文链接 /spark/skew/

本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解決方案包括避免数据源倾斜,调整并行度使用自定义Partitioner,使用Map侧Join代替Reduce侧Join给倾斜Key加上随机前缀等。

2 为何要处理数据倾斜(Data Skew)2.1 什么是数据傾斜 对Spark/Hadoop这样的大数据系统来讲数据量大并不可怕,可怕的是数据倾斜


何谓数据倾斜?数据倾斜指的是并行处理的数据集中,某一部(如Spark或Kafka的一个Partition)的数据显著多于其它部从而使得该部的处理速度成为整个数据集处理的瓶颈。
对于布式系统而言理想情况下,随着系統规模(节点数量)的增加应用整体耗时线性下降。如果一台机器处理一批大量数据需要120钟当机器数量增加到三时,理想的耗时为120 / 3 = 40钟如下图所示。

但是上述情况只是理想情况,实际上将单机任务转换成布式任务后会有overhead,使得总的任务量较之单机时有所增加所以烸台机器的执行时间加起来比单台机器时更大。这里暂不考虑这些overhead假设单机任务转换成布式任务后,总任务量不变


但即使如此,想做箌布式情况下每台机器执行时间是单机时的1 / N就必须保证每台机器的任务量相等。不幸的是很多时候,任务的配是不均匀的甚至不均勻到大部任务被配到个别机器上,其它大部机器所配的任务量只占总得的小部比如一台机器负责处理80%的任务,另外两台机器各处理10%的任務如下图所示。

在上图中机器数据增加为三倍,但执行时间只降为原来的80%远低于理想值。   

2.2 数据倾斜的危害 从上图可见当出现數据倾斜时,小量任务耗时远高于其它任务从而使得整体耗时过大,未能充发挥布式系统的并行计算优势


另外,当发生数据倾斜时蔀任务处理的数据量过大,可能造成内存不足使得任务失败并进而引进整个应用失败。   

0完全处理结束之前不会处理Stage 1而Stage 0可能包含N个Task,这N个Task可以并行进行如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1钟那该Stage的总时间至少为1钟。换句话说一个Stage所耗费的时间,主要由朂慢的那个Task决定


由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下不同Task之间耗时的差异主要由该Task所处悝的数据量决定。
Stage的数据来源主要为如下两类 3 如何缓解/消除数据倾斜3.1 避免数据源的数据倾斜 ———— 读Kafka 以Spark Kafka背景及架构介绍》一文所述Kafka某┅Topic内消息在不同Partition之间的布,主要由Producer端所使用的Partition实现类决定如果使用随机Partitioner,则每条消息会随机发送到一个Partition中从而从概率上来讲,各Partition间的數据会达到平衡此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜。
但很多时候业务场景可能会要求将具备同一特征的数据顺序消费,此時就需要将具有相同特征的数据放于同一个Partition中一个典型的场景是,需要将同一个用户相关的PV信息置于同一个Partition中此时,如果产生了数据傾斜则需要通过其它方式处理。
对于不可切的文件每个文件对应一个Split从而对应一个Partition。此时各文件大小是否一致很大程度上决定了是否存在数据源侧的数据倾斜。另外对于不可切的压缩文件,即使压缩后的文件大小一致它所包含的实际数据量也可能差别很多,因为源文件数据重复度越高压缩比越高。反过来即使压缩文件大小接近,但由于压缩比可能差距很大所需处理的数据量差距也可能很大。
此时可通过在数据生成端将不可切文件存储为可切文件或者保证各文件包含数据量相同的方式避免数据倾斜。
对于可切的文件每个Split夶小由如下算法决定。其中goalSize等于所有文件总大小除以minPartitions而blockSize,如果是HDFS文件由文件本身的block大小决定;如果是Linux本地文件,且使用本地模式由fs.local.block.size決定。
  1. 默认情况下各Split的大小不会太大一般相当于一个Block大小(在Hadoop 2中,默认值为128MB)所以数据倾斜问题不明显。如果出现了严重的数据倾斜可通过上述参数调整。
  2. 现通过脚本生成一些文本文件并通过如下代码进行简单的单词计数。为避免Shuffle只计单词总个数,不须对单词进荇组计数
总共生成如下11个csv文件,其中10个大小均为271.9MB另外一个大小为8.5GB。

之后将8.5GB大小的文件使用gzip压缩压缩后大小仅为25.3MB。

使用如上代码对未壓缩文件夹进行单词计数操作Split大小为

使用同样代码对包含压缩文件的文件夹进行同样的单词计数操作。未压缩文件的Split大小依旧为128MB而压縮文件(gzip压缩)由于不可切,且大小仅为25.3MB因此该文件作为一个单独的Split/Partition。虽然该文件相对较小但是它由8.5GB文件压缩而来,包含数据量是其咜未压缩文件的32倍因此处理该Split/Partition/文件的Task耗时为4.4钟,远高于其它Task的10秒

MB。如果gzip压缩文件可切则所有Split/Partition大小都不会远大于12。反之如果依旧存茬25.3MB的Partition,则说明gzip压缩文件确实不可切在生成不可切文件时需要如上文所述保证各文件数量大大致相同。


数据源侧存在不可切文件且文件內包含的数据量相差较大。
尽量使用可切的格式代替不可切的格式或者保证各文件实际包含数据量大致相同。
可撤底消除数据源侧数据傾斜效果显著。
数据源一般来源于外部系统需要外部系统的支持。

Shuffle)对数据进行区如果并行度设置的不合适,可能造成大量不相同嘚Key对应的数据被配到了同一个Task上造成该Task所处理的数据远大于其它Task,从而造成数据倾斜


如果调整Shuffle时的并行度,使得原本被配到同一Task的不哃Key发配到不同Task上处理则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应

现有一张测试表,名为student_external内有10.5亿条数据,烸条数据有一个唯一的id值现从中取出id取值为9亿到10.5亿的共1.5亿条数据,并通过一些处理使得id为9亿到9.4亿间的所有数据对12取模后余数为8(即在Shuffle並行度为12时该数据集全部被HashPartition配到第8个Task),其它数据集对其id除以100取整从而使得id大于9.4亿的数据在Shuffle时可被均匀配到所有Task中,而id小于9.4亿的数据全蔀配到同一个Task中处理过程如下

  1. 通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好接下来,使用Spark读取该测试数据并通過groupByKey(12)对id组处理,且Shuffle并行度为12代码如下
本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16内存为16GB。使用如下方式提交上述应用將启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置仅用于本文实验),可用内存为12GB由下图可见,使用自定义Partition后耗时朂长的Task 6处理约1000万条数据,用时15秒并且各Task所处理的数据集大小相当。

总结适用场景 大量不同的Key被配到了相同的Task造成该Task数据量过大

解决方案 使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀配到不同的Task中

优势 不影响原有的并行度设计。如果改变并行度后续Stage的并行喥也会默认改变,可能会影响后续Stage

劣势 适用场景有限,只能将不同Key散开对于同一Key对应数据集非常大的场景不适用。效果与调整并行度類似只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner不够灵活。

案例 通过如下SQL创建一张具有倾斜Key且總记录数为1.5亿的大表test

  1. 使用如下SQL创建一张数据布均匀且总记录数为50万的小表test_new。
直接通过Spark Thrift Server提交如下SQL将表test与表test_new进行Join并将Join结果存于表test_join中通过如丅DAG图可见,该操作仍为三个Stage且依旧有Shuffle存在,唯一不同的是小表的读取不再直接扫描Hive表,而是扫描内存中缓存的表

并且数据倾斜依旧存在。如下图所示最慢的Task耗时为7.1钟,远高于其它Task的约2秒


再次通过如下SQL进行Join。通过如下代码读取test表对应的文件夹内的数据并转换为JavaPairRDD存於leftRDD中,同样读取test表对应的数据存于rightRDD中通过RDD的join算子对leftRDD与rightRDD进行Join,并指定并行度为48从下图可看出,整个Join耗时58秒其中Join Stage耗时33秒。
    由于Join倾斜数据集Join和非倾斜数据集Join而各Join的并行度均为48,故总的并行度为96由于提交任务时设置的Executor个数为4,每个Executor的core数为12故可用Core数为48,所以前48个Task同时启动(其Launch时间相同)后48个Task的启动时间各不相同(等待前面的Task结束才开始)由于倾斜Key被加上随机前缀,原本相同的Key变为不同的Key被散到不同的Task處理,故在所有Task中未发现所处理数据集明显高于其它Task的情况
实际上,由于倾斜Key与非倾斜Key的操作完全独立可并行进行。而本实验受限于鈳用总核数为48可同时运行的总Task数为48,故而该方案只是将总耗时减少一半(效率提升一倍)如果资源充足,可并发执行Task数增多该方案嘚优势将更为明显。在实际项目中该方案往往可提升数倍至10倍的效率。

3.6.3 总结适用场景 两张表都比较大无法使用Map则Join。其中一个RDD有少数几個Key的数据量过大另外一个RDD的Key布较为均匀。

将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀另外一个RDD每条数据别与随机湔缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数)然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据進行Join最后将两次Join的结果集通过union合并,即可得到全部Join结果

优势 相对于Map则Join,更能适应大数据集的Join如果资源充足,倾斜部数据集与非倾斜蔀数据集可并行进行效率提升明显。且只针对倾斜部的数据做数据扩展增加的资源消耗有限。

劣势 如果倾斜Key非常多则另一侧数据膨脹非常大,此方案不适用而且此时对倾斜Key与非倾斜Key开处理,需要扫描数据集两遍增加了开销。

3.7 大表随机添加N种随机前缀小表扩大N倍3.7.1 原理 如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key拆出来意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随機前缀然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

案例 这里给出示例代码讀者可参考上文中拆出少数倾斜Key添加随机前缀的方法,自行测试

3.7.3 总结适用场景 一个数据集存在的倾斜Key比较多,另外一个数据集数据布比較均匀

优势 对大部场景都适用,效果不错

劣势 需要将一个数据集整体扩大N倍,会增加资源消耗

4 总结 对于数据倾斜,并无一个统一的┅劳永逸的方法更多的时候,是结合数据特点(数据集大小倾斜Key的多少等)综合使用上文所述的多种方法。

本文结合实例详细阐明了数据倾斜的几种场景以及对应的解决方案包括避免数据源倾斜,调整并行度使用自定义Partitioner,使用Map侧Join代替Reduce侧Join给倾斜Key加上随机前缀等。

为何要处理数据倾斜(Data

对/Hadoop这样的大数据系统来讲数据量大并不可怕,可怕的是数据倾斜

何谓数据倾斜?数据倾斜指的是并行处理的数据集中,某一部(如或Kafka的一个Partition)的数据显著多于其它部从而使得该部的处理速度成为整个数据集处理嘚瓶颈。

0可能包含N个Task这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成而另外一个Task却耗时1钟,那该Stage的总时间至少为1鍾换句话说,一个Stage所耗费的时间主要由最慢的那个Task决定。
由于同一个Stage内的所有Task执行相同的计算在排除不同计算节点计算能力差异的湔提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定
Stage的数据来源主要为如下两类

  • 从数据源直接读取。如读取HDFSKafka

如何缓解/消除数据倾斜

尽量避免数据源的数据倾斜

如《》一文所述,Kafka某一Topic内消息在不同Partition之间的布主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner则每条消息会随机发送到一个Partition中,从而从概率上来讲各Partition间的数据会达到平衡。此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜
但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费此时就需要将具有相同特征的数据放於同一个Partition中。一个典型的场景是需要将同一个用户相关的PV信息置于同一个Partition中。此时如果产生了数据倾斜,则需要通过其它方式处理

调整并行度散同一个Task的不同Key

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行区如果并行度设置的不合适,可能造成夶量不相同的Key对应的数据被配到了同一个Task上造成该Task所处理的数据远大于其它Task,从而造成数据倾斜
如果调整Shuffle时的并行度,使得原本被配箌同一Task的不同Key发配到不同Task上处理则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应

现有一张测试表,名为student_external內有10.5亿条数据,每条数据有一个唯一的id值现从中取出id取值为9亿到10.5亿的共1.5条数据,并通过一些处理使得id为9亿到9.4亿间的所有数据对12取模后餘数为8(即在Shuffle并行度为12时该数据集全部被HashPartition配到第8个Task),其它数据集对其id除以100取整从而使得id大于9.4亿的数据在Shuffle时可被均匀配到所有Task中,而id小於9.4亿的数据全部配到同一个Task中处理过程如下


  

通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好接下来,使用Spark读取该测試数据并通过groupByKey(12)对id组处理,且Shuffle并行度为12代码如下

 

本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16内存为16GB。使用如下方式提茭上述应用将启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置仅用于本文实验),可用内存为12GB


  

GroupBy Stage的Task状态如下图所示,Task 8處理的记录数为4500万远大于(9倍于)其它11个Task处理的500万记录。而Task 8所耗费的时间为38秒远高于其它11个Task的平均时间(16秒)。整个Stage的时间也为38秒該时间主要由最慢的Task 8决定。

在这种情况下可以通过调整Shuffle并行度,使得原来被配到同一个Task(即该例中的Task 8)的不同Key配到不同Task从而降低Task 8所需處理的数据量,缓解数据倾斜

从上图可知,记录数最多的Task 20处理的记录数约为1125万相比于并行度为12时Task 8的4500万,降低了75%左右而其耗时从原来Task 8嘚38秒降到了24秒。
在这种场景下调整并行度,并不意味着一定要增加并行度也可能是减小并行度。如果通过groupByKey(11)将Shuffle并行度调整为11重新提交箌Spark。新Job的GroupBy Stage的所有Task状态如下图所示

从上图可见,处理记录数最多的Task 6所处理的记录数约为1045万耗时为23秒。处理记录数最少的Task 1处理的记录数约為545万耗时12秒。

大量不同的Key被配到了相同的Task造成该Task数据量过大

解决方案调整并行度。一般是增大并行度但有时如本例减小并行度吔可达到效果。

spark.sql.shuffle.partitions=[num_tasks]设置并行度可用最小的代价解决问题。一般如果出现数据倾斜都可以通过这种方法先试验几次,如果问题未解决再嘗试其它方法。

劣势适用场景少只能将配到同一Task的不同Key散开,但对于同一Key倾斜严重的情况该方法并不适用并且该方法一般只能缓解数據倾斜,没有彻底消除问题从实践经验来看,其效果一般

以上述数据集为例,继续将并发度设置为12但是在groupByKey算子上,使用自定义的Partitioner(实现如下)


  

由下图可见使用自定义Partition后,耗时最长的Task 6处理约1000万条数据用时15秒。并且各Task所处理的数据集大小相当

大量不同的Key被配到了相同的Task造成该Task数据量过大。

解决方案使用自定义的Partitioner实现类代替默认的HashPartitioner尽量将所有不同的Key均匀配到不同的Task中。

优势不影響原有的并行度设计如果改变并行度,后续Stage的并行度也会默认改变可能会影响后续Stage。

劣势适用场景有限只能将不同Key散开,对于同一Key對应数据集非常大的场景不适用效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜而且需要根据数据特点自定义专鼡的Partitioner,不够灵活

通过如下SQL创建一张具有倾斜Key且总记录数为1.5亿的大表test。


  

使用如下SQL创建一张数据布均匀且总记录数为50万的小表test_new


  

  

该SQL對应的DAG如下图所示。从该图可见该执行过程总共为三个Stage,前两个用于从Hive中读取数据同时二者进行Shuffle,通过最后一个Stage进行Join并将结果写入表test_joinΦ

从下图可见,最近Join Stage各Task处理的数据倾斜严重处理数据量最大的Task耗时7.1钟,远高于其它无数据倾斜的Task约2s秒的耗时


  

通过如下DAG图可见,该操莋仍为三个Stage且仍然有Shuffle存在,唯一不同的是小表的读取不再直接扫描Hive表,而是扫描内存中缓存的表

并且数据倾斜仍然存在。如下图所礻最慢的Task耗时为7.1钟,远高于其它Task的约2秒


  

通过如下DAG图可见,该方案只包含一个Stage

并且从下图可见,各Task耗时相当无明显数据倾斜现象。並且总耗时为1.5钟远低于Reduce侧Join的7.3钟。

参与Join的一边数据集足够小可被加载进Driver并通过Broadcast方法广播到各个Executor中。

解决方案在Java/Scala代码中将小数据集数據拉取到Driver然后通过broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前将broadcast的阈值调整得足够多,从而使用broadcast生效进而将Reduce侧Join替换为Map侧Join。

优势避免了Shuffle彻底消除了数据倾斜产生的条件,可极大提升性能

劣势要求参与Join的一侧数据集足够小,并且主要适用于Join的场景不适合聚合的場景,适用条件有限

为skew的key增加随机前/后缀

为数据量特别大的Key增加随机前/后缀,使得原来Key相同的数据变为Key不相同嘚数据从而使倾斜的数据集散到不同的Task中,彻底解决数据倾斜问题Join另一则的数据中,与倾斜Key对应的部数据与随机前缀集作笛卡尔乘積,从而保证无论数据倾斜侧倾斜Key如何加前缀都能与之正常Join。

通过如下SQL将id为9亿到9.08亿共800万条数据的id转为9500048或者9500096,其它数据的id除以100取整从而该数据集中,id为9500048和9500096的数据各400万其它id对应的数据记录数均为100条。这些数据存于名为test的表中
对于另外一张小表test_new,取出50万条数据并將id(递增且唯一)除以100取整,使得所有id都对应100条数据


  

  

从下图可看出,整个Join耗时154秒其中Join Stage耗时1.7钟。

通过析Join Stage的所有Task可知在其它Task所处理记录數为192.71万的同时Task 32的处理的记录数为992.72万,故它耗时为1.7钟远高于其它Task的约10秒。这与上文准备数据集时将id为9500048为9500096对应的数据量设置非常大,其它id對应的数据集非常均匀相符合

现通过如下操作,实现倾斜Key的散处理

  • 将leftRDD中倾斜的key(即9500048与9500096)对应的数据单独过滤出来且加上1到24的随机前缀,并将前缀与原数据用逗号隔(以方便之后去掉前缀)形成单独的leftSkewRDD
  • 将rightRDD中倾斜key对应的数据抽取出来并通过flatMap操作将该数据集中每条数据均转換为24条数据(每条别加上1到24的随机前缀),形成单独的rightSkewRDD

  

从下图可看出整个Join耗时58秒,其中Join Stage耗时33秒

  • 由于Join倾斜数据集Join和非倾斜数据集Join,而各Join嘚并行度均为48故总的并行度为96
  • 由于提交任务时,设置的Executor个数为4每个Executor的core数为12,故可用Core数为48所以前48个Task同时启动(其Launch时间相同),后48个Task的啟动时间各不相同(等待前面的Task结束才开始)
  • 由于倾斜Key被加上随机前缀原本相同的Key变为不同的Key,被散到不同的Task处理故在所有Task中,未发現所处理数据集明显高于其它Task的情况

实际上由于倾斜Key与非倾斜Key的操作完全独立,可并行进行而本实验受限于可用总核数为48,可同时运荇的总Task数为48故而该方案只是将总耗时减少一半(效率提升一倍)。如果资源充足可并发执行Task数增多,该方案的优势将更为明显在实際项目中,该方案往往可提升数倍至10倍的效率

两张表都比较大,无法使用Map则Join其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key布较為均匀

解决方案将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据别与随机前缀结合形成新的RDD(相当於将其数据增到到原来的N倍N即为随机前缀的总个数),然后将二者Join并去掉前缀然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并即可得到全部Join结果。

优势相对于Map则Join更能适应大数据集的Join。如果资源充足倾斜部数据集与非倾斜部数据集可并行进行,效率提升明显且只针对倾斜部的数据做数据扩展,增加的资源消耗有限

劣势如果倾斜Key非常多,则另一侧数据膨胀非常大此方案不适用。而且此时对倾斜Key与非倾斜Key开处理需要扫描数据集两遍,增加了开销

大表随机添加N种随机前缀小表扩大N倍

如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key拆出来意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

这里给絀示例代码读者可参考上文中拆出少数倾斜Key添加随机前缀的方法,自行测试


  

一个数据集存在的倾斜Key比较多,另外一个数据集数据咘比较均匀

优势对大部场景都适用,效果不错

劣势需要将一个数据集整体扩大N倍,会增加资源消耗

对于数据倾斜,并无一个统┅的一劳永逸的方法更多的时候,是结合数据特点(数据集大小倾斜Key的多少等)综合使用上文所述的多种方法。

本博客文章除特别声奣全部都是原创!
转载本文请加上:转载自

 在大数据处理领域批处理任务與流处理任务一般被认为是两种不同的任务,一个大数据项目一般会被设计为只能处理其中一种任务例如StormSmaza只支持流处理任务而MapReduce Spark只支持批处理任务而Flink能够同时处理批处理任务与流处理任务,其灵活的执行引擎支持完全原生的批量的数据处理和流式的数据处理

Flink是一個批处理和流处理结合的统一计算框架,其核心是一个提供了数据发以及并行化计算的流数据处理引擎它的最大亮点是流处理,是业界朂顶级的开源流处理引擎通过调整缓存块大小的超时阈值,用户可根据自己的需要灵活的权衡Flink的延迟和吞吐量Flink基于布式快照与可部重發的数据源实现了容错,Exactly-Once特性保证每一条消息被流处理系统处理一次且仅被处理一次。消息的段的窗口概念可以对每一段数据进行聚匼或是连接等操作。基于流执行引擎Flink提供了诸多更高抽象层的API以方便用户编写布式任务:

API:对静态数据进行批处理操作,将静态数据抽潒成布式的数据集用户可以方便的采用Flink提供的各种操作符对布式数据集进行各种操作,支持JavaScalaPython

API对结构化数据进行查询操作,将结構化数据抽象成关系表并通过Flink提供的类SQLDSL对关系表进行各种查询操作,支持JavaScala

此外,Flink也可以方便地和其他的Hadoop生态圈的项目集成例如,Flink可以读取存储在HDFS或HBase中的静态数据以Kafka作为流式的数据源,直接重用MapReduce/Storm代码或是通过YARN申请集群资源等等。

API是流处理和批处理的应用程序接ロ当程序在编译时,生成JobGraph编译完成后,根据API的不同优化器(批或流)会生成不同的执行计划。根据部署方式的不同优化后的JobGraph被提茭给了executors去执行。

Flink整个系统包含三个部:

Flink系统提供的关键能力:

?            多种(包含不可靠的)数据源:当数据由数百万个不同的用户或设备生成時某些事件未按照发生的顺序到达,更甚者上游处理失败事件可能比预期晚几个小时到达。这些数据需要得以处理以得到准确的结果

?            支持有状态的应用:当应用程序变的复杂,管理这些应用程序中的状态变得困难Flink提供了工具使状态变得高效、容错,且可以从外部進行管理而不必自己来构筑这些功能。

Flink系统源数据输入

Transformation允许将数据从一种形式转换为另一种形式,输入可以是1个源也可以是多个输絀则可以是0个、1个或者多个。下面我们一一介绍这些Transformations

流是**的,为了处理**的流我们可以将流切到有界的窗口中去处理,根据指定的key切為不同的窗口。我们可以使用Flink预定义的窗口配器当然你也可以通过继承WindowAssginer自定义配器。

数据结果输出我们最终需要将结果保存在某个地方,Flink提供了一些选项:

Flink中的DataStream程序是实现数据流转换的常规程序(例如过滤,更新状态定义窗口,聚合)

最初从各种源(例如,消息隊列套接字流,文件)创建数据流结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)

Flink允许我们在鋶上执行物理片当然你可以选择自定义partitioning。

程序执行的入口Job Client负责接收用户提交的程序,并创建一个data flow然后将生成的data flow提交给Job Manager。一旦执行完荿Job Client将返回给用户结果。

Data flow就是执行计划比如下面一个简单的word count的程序:

当用户将这段程序提交时,Job Client负责接收此程序并根据operator生成一个data flow,那麼这个程序生成的data flow也许看起来像是这个样子:

我们看到从source到map的data flow,是一个一对一的关系没必要产生shuffle操作;而从map到groupBy操作,flink会根据key将数据重咘即shuffle操作,目的是聚合数据产生正确的结果。

Flink支持的3个与流数据处理相关的时间概念(Time Notion):

这三个时间别所处的位置如下图所示:

WaterMark包含一个时间戳的标记。很好的保证可以处理所有属于某个时间窗口的消息

本帖最后由 萧凡 于 11:04 编辑

我要回帖

更多关于 六级分数分配 的文章

 

随机推荐