请教Spark 中 combinebyKey 和map reduce combineByKey的传入函数参数的区别

请教Spark 中
combinebyKey 和 reduceByKey的传入函数参数的区别? - 知乎18被浏览1419分享邀请回答class PairRDDFunctions[K, V](...) {
def reduceByKey(func: (V, V) =& V): RDD[(K, V)]
def combineByKey[C](
createCombiner: V =& C,
mergeValue: (C, V) =& C,
mergeCombiners: (C, C) =& C): RDD[(K, C)]
可以看到 reduceByKey 的 func 参数的类型只依赖于 PairRDDFunction 的类型参数 V,在这个例子里也就是 Int。于是 func 的类型已经确定为 (Int, Int) =& Int,所以就不需要额外标识类型了。而 combineByKey 比 reduceByKey 更加通用,它允许各个 partition 在 shuffle 前先做 local reduce 得到一个类型为 C 的中间值,待 shuffle 后再做合并得到各个 key 对应的 C。以求均值为例,我们可以让每个 partiton 先求出单个 partition 内各个 key 对应的所有整数的和 sum 以及个数 count,然后返回一个 pair (sum, count)。在 shuffle 后累加各个 key 对应的所有 sum 和 count,再相除得到均值:val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) =& (0, 0L),
(pair: (Int, Long), value: Int) =&
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =&
(pair1._1 + part2._1, pair2._2 + pair2._2)
val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) =& 0D
case (sum, count) =& sum.toDouble / count
由于 C 这个 类型参数是任意的,并不能从 testData 的类型直接推导出来,所以必须明确指定。只不过题主的例子是最简单的用 reduceByKey 就可以搞定的情况,也就是 V 和 C 完全相同,于是就看不出区别了。114 条评论分享收藏感谢收起Spark(47)
import java.io.File
import scala.io.Source
class WorldCount {
&&var map = Map.empty[String, Int]
&&def scanDir(file: File) {
&&&&file.listFiles().foreach { f =& if (f.isFile()) readFile(f) }
&&def readFile(file: File) {
&&&&val f = Source.fromFile(file,&utf-8&)
&&&&for (line &- f.getLines()) {
&&&&&&for (word &- line.split(&\t|,|\\\\|\\+|\\-|\\(|\\)|\\[|\\]|!|:|\\.|&|&|\\{|\\}|\\?|\\*| |\\/|\&&)) {
&&&&&&&&if (map.contains(word))
&&&&&&&&&&map += (word -& (map(word) + 1))
&&&&&&&&else
&&&&&&&&&&map += (word -& 1)
object WorldCount {
&&def main(args: Array[String]): Unit = {
&&&&val wordCount = new WorldCount()
&&&&wordCount.scanDir(new File(&D:\\scalaDir&))
&&&&wordCount.map.foreach{x=&println(&world:&+x._1+&&&count:&+x._2)}
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:93312次
积分:1540
积分:1540
排名:千里之外
原创:19篇
转载:258篇
(1)(16)(26)(24)(35)(18)(7)(15)(6)(12)(32)(4)(15)(15)(13)(1)(11)(29)第11课:彻底解析wordcount运行原理【spark吧】_百度贴吧
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&签到排名:今日本吧第个签到,本吧因你更精彩,明天继续来努力!
本吧签到人数:0成为超级会员,使用一键签到本月漏签0次!成为超级会员,赠送8张补签卡连续签到:天&&累计签到:天超级会员单次开通12个月以上,赠送连续签到卡3张
关注:2,320贴子:
第11课:彻底解析wordcount运行原理收藏
本期内容:1. 从数据流动视角解密WordCount,即用Spark作单词计数统计,数据到底是怎么流动的。2. 从RDD依赖关系的视角解密WordCount。Spark中的一切操作皆RDD,后面的RDD对前面的RDD有依赖关系。3. DAG与Lineage的思考。依赖关系会形成DAG。1. 从数据流动视角解密WordCount(1)在IntelliJ IDEA中编写下面代码:package com.dt.spark/** * 使用Java的方式开发进行本地测试Spark的WordCount程序 *
DT大数据梦工厂 *
*/import org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject WordCount {
def main(args: Array[String]){
val conf = new SparkConf()
conf.setAppName(&Wow, My First Spark App!&)
conf.setMaster(&local&)
val sc = new SparkContext(conf)
val lines = sc.textFile(&D://tmp//helloSpark.txt&, 1)
val words = lines.flatMap { line =& line.split(& &) }
val pairs = words.map { word =& (word,1) }
val wordCounts = pairs.reduceByKey(_+_)
wordCounts.foreach(wordNumberPair =& println(wordNumberPair._1 + & : & + wordNumberPair._2))
}}(2)在D盘下地tmp文件夹下新建helloSpark.txt文件,内容如下:Hello Spark Hello ScalaHello HadoopHello FlinkSpark is awesome(3) 在WordCount代码区域点击右键选择Run 'WordCount'。可以得到如下运行结果:Flink : 1Spark : 2is : 1Hello : 4awesome : 1Hadoop : 1Scala : 1下面从数据流动的视角分析数据到底是怎么被处理的。说明:Spark有三大特点:1. 分布式。无论数据还是计算都是分布式的。默认分片策略:Block多大,分片就多大。但这种说法不完全准确,因为分片切分时有的记录可能跨两个Block,所以一个分片不会严格地等于Block的大小,例如HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。一般情况下,分片都不会完全与Block大小相等。分片不一定小于Block大小,因为如果最后一条记录跨两个Block的话,分片会把最后一条记录放在前一个分片中。2. 基于内存(部分基于磁盘)3. 迭代textFile源码(SparkContext中);def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair =& pair._2.toString)}可以看出在进行了hadoopFile之后又进行了map操作。HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。map的源码(RDD.scala中)def map[U: ClassTag](f: T =& U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) =& iter.map(cleanF))}读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。下一步:val words = lines.flatMap { line =& line.split(& &) }对每个Partition中的每行进行单词切分,并合并成一个大的单词实例的集合。FlatMap做的一件事就是对RDD中的每个Partition中的每一行的内容进行单词切分。这边有4个Partition,对单词切分就变成了一个一个单词,下面是FlatMap的源码(RDD.scala中)def flatMap[U: ClassTag](f: T =& TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) =& iter.flatMap(cleanF))}可以看出flatMap又产生了一个MapPartitionsRDD,此时的各个Partition都是拆分后的单词。下一步:
val pairs = words.map { word =& (word,1) }将每个单词实例变为形如word=&(word,1)map操作就是把切分后的每个单词计数为1。根据源码可知,map操作又会产生一个MapPartitonsRDD。此时的MapPartitionsRDD是把每个单词变成Array(&&Hello&,1),(&Spark&,1)等这样的形式。下一步:val wordCounts = pairs.reduceByKey(_+_)reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。至此都是stage1。Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。reduceByKey的源码:def reduceByKey(partitioner: Partitioner, func: (V, V) =& V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) =& v, func, func, partitioner)}/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * &combiner& in MapReduce. Output will be hash-partitioned with numPartitions partitions. */def reduceByKey(func: (V, V) =& V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)}可以看到reduceByKey内部有combineByKeyWithClassTag。combineByKeyWithClassTag的源码如下:def combineByKeyWithClassTag[C](
createCombiner: V =& C,
mergeValue: (C, V) =& C,
mergeCombiners: (C, C) =& C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, &mergeCombiners must be defined&) // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException(&Cannot use map-side combining with array keys.&)
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException(&Default partitioner cannot partition array keys.&)
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter =& {
val context = TaskContext.get()
new InterruptibleIterator(context, bineValuesByKey(iter, context))
}, preservesPartitioning = true)
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}}可以看出在combineByKeyWithClassTag内又new 了一个ShuffledRDD。ReduceByKey有两个作用:1. 进行Local级别的Reduce,减少网络传输。2. 把当前阶段的内容放到本地磁盘上供shuffle使用。下一步是shuffledRDD,产生Shuffle数据就需要进行分类,MapPartitionsRDD时其实已经分好类了,最简单的分类策略就是Hash分类。ShuffledRDD需要从每台机上抓取同一单词。reduceByKey发生在哪里?Stage2全部都是reduceByKey最后一步:保存数据到HDFS(MapPartitionsRDD)统计完的结果:(“Hello”,4)只是一个Value,而不是Key:&Hello&,value:4。但输出到文件系统时需要KV的格式,现在只有Value,所以需要造个KEY。saveAsTextFile的源码:def saveAsTextFile(path: String){this.map(x =& (NullWritable.get())),new Text(x.toStirng)).saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)}this.map把当前的值(x)变成tuple。tuple的Key是Null,Value是(“Hello”,4)。为什么要为样?因为saveAsHadoopFile时要求以这样的格式输出。Hadoop需要KV的格式!!map操作时把key舍去了,输出时就需要通过生成Key。第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD只有Collect 或saveAsTextFile会触发作业,其他的时候都没有触发作业(Lazy)
DT大数据梦工厂
登录百度帐号推荐应用Spark Core Aggregator – 驴和羊
本文要介绍的是Spark Core中的Aggregator这个类。这个类的用处非常大,为什么这么说呢?我们都知道Spark支持传统的MapReduce模型,并基于这种模型提供了比Hadoop更多更高层次的计算接口。比如Spark Core PairRDD中非常常用的:
reduceByKey
提供聚合函数,将k-v对集合将相同key值的value聚合,这个方法会先在map端执行减少shuffle量,然后在reduce端执行
aggregateByKey 与reduceByKey类似,不过会将k-v对集合聚合变形为新的k-u类型的RDD。需要提供两个方法:seqOp[(U,V)=&U]在单个分区内将原始V类型的值merge到U类型的汇总值方法,以及combineOp[(U,U)=&]在不同分区间将聚合结果merge的方法。
groupByKey 将原本的RDD,根据key值进行分组,返回RDD[K, Iterable[V]]结果。顺带说一嘴:这个方法不会做map端的combine操作(因为实际上数据结果并没有减少,反而因为需要插入到hash table中会增加老年代的内存压力)
而这几种方法底层都是依赖于combineByKeyWithClassTag(在Spark1.4中是combineByKey,新版本接口增加了ClassTag,支持原生类型)。这个函数的实现方式如以下代码所示:
combineByKeyWithClassTag方法实现
def combineByKeyWithClassTag[C](
createCombiner: V =& C,
mergeValue: (C, V) =& C,
mergeCombiners: (C, C) =& C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
... // 省略参数检查
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter =& {
val context = TaskContext.get()
new InterruptibleIterator(context, bineValuesByKey(iter, context))
}, preservesPartitioning = true)
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
123456789101112131415161718192021222324
def combineByKeyWithClassTag[C](&&&&&&createCombiner: V =& C,&&&&&&mergeValue: (C, V) =& C,&&&&&&mergeCombiners: (C, C) =& C,&&&&&&partitioner: Partitioner,&&&&&&mapSideCombine: Boolean = true,&&&&&&serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {&&&&... // 省略参数检查&&&&val aggregator = new Aggregator[K, V, C](&&&&&&self.context.clean(createCombiner),&&&&&&self.context.clean(mergeValue),&&&&&&self.context.clean(mergeCombiners))&&&&if (self.partitioner == Some(partitioner)) {&&&&&&self.mapPartitions(iter =& {&&&&&&&&val context = TaskContext.get()&&&&&&&&new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))&&&&&&}, preservesPartitioning = true)&&&&} else {&&&&&&new ShuffledRDD[K, V, C](self, partitioner)&&&&&&&&.setSerializer(serializer)&&&&&&&&.setAggregator(aggregator)&&&&&&&&.setMapSideCombine(mapSideCombine)&&&&}&&}
该函数依赖于六个参数,计算逻辑如图
createCombiner: V =& C 提供聚合过程的初始值的函数
mergeValue: (C, V) =& C 将V值merge到聚合值中的方法
mergeCombiners: (C, C) =& C 将聚合值进行聚合的方法
partitioner: Partitioner 指定分区方法
mapSideCombine: Boolean = true 告诉ShuffleRDD(其实是告诉它依赖的ShuffleDependency)是否需要做Map端的combine 过程
serializer: Serializer = null 序列化,一般不用指定
如图可以看出来,由于Spark懒式计算的原则,现在只是生成了MapPartitionsRDD 或者 ShuffledRDD,当遇到类似collect / count 等操作的时候这些RDD就会依赖DAGScheduler 的调度,递归的先执行依赖,然后一步步执行完成。当然,我们的主角Aggregator 在计算Shuffled Dependency的时候就会完全发挥出作用来了。具体它是怎么做的呢?
我们都知道Shuffle RDD 的行为是新创建一个stage,然后顺次执行它依赖的stage,并且读取最终执行完的结果。而从上面我们可以看到,我们将Aggregator 设置到了Shuffle Dependency 中了,所以我们猜测这个聚合操作就应该在读取执行结果的前后过程中。果然,在HashShuffleWriter 和SortShuffleWriter(后者用的更多些)的write方法中,在BlockStoreShuffleReader 这个类的read 方法中,我们找到了:
Writer逻辑
// SortShuffleWriter 的write方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
// 需要map端做聚合,则先做聚合,然后将结果迭代器传给外部排序器,完成聚合后排序
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
// 无聚合逻辑
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
sorter.insertAll(records)
... // 省略
// HashShuffleWriter 的write方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
map端的combine逻辑
dep.bineValuesByKey(records, context)
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
for (elem &- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId).write(elem._1, elem._2)
123456789101112131415161718192021222324252627282930313233343536
// SortShuffleWriter 的write方法&&override def write(records: Iterator[Product2[K, V]]): Unit = {&&&&sorter = if (dep.mapSideCombine) {&&&&&&// 需要map端做聚合,则先做聚合,然后将结果迭代器传给外部排序器,完成聚合后排序&&&&&&require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")&&&&&&new ExternalSorter[K, V, C](&&&&&&&&context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)&&&&} else {&&&& // 无聚合逻辑&&&&&&new ExternalSorter[K, V, V](&&&&&&&&context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)&&&&}&&&&sorter.insertAll(records)&&&&&... // 省略&&}&// HashShuffleWriter 的write方法&&&&override def write(records: Iterator[Product2[K, V]]): Unit = {&&&&val iter = if (dep.aggregator.isDefined) {&&&&&&if (dep.mapSideCombine) {&&&&&&&&//&&map端的combine逻辑&&&&&&&&dep.aggregator.get.combineValuesByKey(records, context)&&&&&&} else {&&&&&&&&records&&&&&&}&&&&} else {&&&&&&require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")&&&&&&records&&&&}&&&&&for (elem &- iter) {&&&&&&val bucketId = dep.partitioner.getPartition(elem._1)&&&&&&shuffle.writers(bucketId).write(elem._1, elem._2)&&&&}&&}
Reader逻辑
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val blockFetcherItr = new ShuffleBlockFetcherIterator(
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
... //省略无关逻辑
// 如果结果需要进行聚合
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// 将map端combine的结果再次进行聚合
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.bineCombinersByKey(combinedKeyValuesIterator, context)
// map端并没有进行聚合
// 直接根据Value做聚合操作,这里V的类型是Nothing,代表此处并不关心原始的RDD
// 的Value类型是什么,直接传入combineValuesByKey里面,进行聚合,返回最后我
// 们需要的[K, C]类型迭代器即可
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.bineValuesByKey(keyValuesIterator, context)
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
// 省略了判断结果是否需要排序
// 如果结果需要排序的话 则将聚合的结果迭代器传递给排序器,使用外部排序收集结果,与SortShuffleWriter类似
// 如果不排序直接返回聚合结果
12345678910111213141516171819202122232425262728293031323334
&&/** Read the combined key-values for this reduce task */&&override def read(): Iterator[Product2[K, C]] = {&&&&val blockFetcherItr = new ShuffleBlockFetcherIterator(&&&&&&context,&&&&&&blockManager.shuffleClient,&&&&&&blockManager,&&&&&&mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),&&&&&&SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)&&&&&... //省略无关逻辑&&&&&// 如果结果需要进行聚合&&&&val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {&&&&&&if (dep.mapSideCombine) {&&&&&&&&// 将map端combine的结果再次进行聚合&&&&&&&&val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]&&&&&&&&dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)&&&&&&} else {&&&&&&&&// map端并没有进行聚合&&&&&&&&// 直接根据Value做聚合操作,这里V的类型是Nothing,代表此处并不关心原始的RDD&&&&&&&&// 的Value类型是什么,直接传入combineValuesByKey里面,进行聚合,返回最后我&&&&&&&&// 们需要的[K, C]类型迭代器即可&&&&&&&&val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]&&&&&&&&dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)&&&&&&}&&&&} else {&&&&&&require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")&&&&&&interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]&&&&}&&&&&... &&&&// 省略了判断结果是否需要排序&&&&// 如果结果需要排序的话 则将聚合的结果迭代器传递给排序器,使用外部排序收集结果,与SortShuffleWriter类似&&&&// 如果不排序直接返回聚合结果
在这里我们看到了,通过调用Aggregator的以下两个方法就完成了Shuffle过程中map端和reduce端的聚合操作:
def combineValuesByKey(iter: Iterator[_ &: Product2[K, V]], context: TaskContext): Iterator[(K, C)] 直接对依赖RDD 的值进行聚合
def combineCombinersByKey(iter: Iterator[_ &: Product2[K, C]], context: TaskContext): Iterator[(K, C)] 对依赖RDD 产出的聚合结果进行再次聚合
这两个函数的逻辑非常相似,我只把第一个函数贴出来分析里面进行的操作
聚合操作的逻辑
def combineValuesByKey(iter: Iterator[_ &: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = {
// 使用了ExternalAppendOnlyMap 进行聚合操作
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
&&def combineValuesByKey(iter: Iterator[_ &: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = {&&&&// 使用了ExternalAppendOnlyMap 进行聚合操作&&&&val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)&&&&combiners.insertAll(iter)&&&&updateMetrics(context, combiners)&&&&combiners.iterator&&}
我们看到,这里Spark使用了一个ExternalAppendOnlyMap 这样的Map 结构,将所有的数据insertAll 到这个结构里面来,结果调用iterator 方法就可以获得最终结果了。我们可以理解,如果是一个Map 的话,我们可以将所有的key 作为map 的key,然后迭代取上一轮的结果,判断key 如果存在于map 中的话,将value 取出,跟新的数据做聚合,然后写回map中即可。原来Aggregator 的实现也很简单嘛,跟我们单机版的并没有什么不同。但现在有一个最重要的问题:如果数据量太大,内存中存储不下了怎么办?
这种情况在大数据场景下几乎是必然发生的问题。别担心,ExternalAppendOnlyMap 这个名字中的External 就告诉我们它肯定是可以向磁盘进行Spill 的,它也是一个Spark Spill 到磁盘的机制很典型的例子。我们来看看它是如何优雅的处理这个问题的。
如左图,我们可以看出ExternalAppendOnlyMap 持续地读取数据,然后在SizeTrackingAppendOnlyMap (使用开放寻址和二次探测的方式实现的不可删除的HashMap,为什么要这么实现?留作一个问题,大家请自己思考。)中不断做聚合操作。如果数据一旦超出规定的阈值,就将currentMap 按照hash 值排序后spill 到磁盘上(按照hash 排序很重要!!!),然后创建一个新的map继续重复这样的操作。
但大家就会问了,这样子的话,一个key 可能会存在在内存中、多个DiskIterator 中,那其实并没有完成真正的数据聚合啊?这时候,当insertAll 完成之后,我们将会调用iterator 方法,在这里是真正完成聚合的关键所在(右图所示)。iterator 返回了一个基于内存中currentMap 和DiskIterator 两部分数据的多路归并迭代器。这个迭代器,每次在调用next 方法的时候都会在内部的优先级队列(按每个迭代器最小hash值作为比较对象的堆结构),寻找最小的hash值且key值相等的所有元素(因为我们每个map 都是排序过的,所以这总能实现),进行merge,将所有符合要求的元素merge完成后返回。这样便完成了最终的聚合操作。
所以我们总结下:
groupByKey/ reduceByKey/ aggregateByKey/ foldByKey 等都是用Aggregator 实现的。其中groupByKey 没有做Map 端的combine,且分组操作比较重,如果只是要做聚合操作,那建议用后三种操作。
Aggregator 作用的位置是ShuffleWriter 和ShuffleReader 的write 和read 过程,分别完成map 端的combine 和 reduce端的聚合。
Aggregator 内部实现考虑到了内存不足,进行磁盘spill 的场景,它采用多个基于开放寻址的不可删除SizeTrackingAppendOnlyMap 进行聚合,然后内存超过阈值是进行spill,最后迭代器中多路归并完成聚合操作。
对我们的优化的意义:
不要进行groupByKey.map(_._2.size) 类似这样的操作来统计每个key的count数,因为groupByKey 操作非常重,这种情况用其它聚合方式
Aggregator 使用的还是基于近于java的HashMap的方式进行内存中的聚合的,这个方式是比较消耗内存的,所以在这种过程中很容易发生多次磁盘Spill,容易在老年代生成很多对象,容易发生GC,导致性能问题。所以在这种情况下就要求
合理进行分区,要对自己数据进行更多的测试,分区数量要足够,否则很容易出现性能问题
如果Shuffle数据量太大的话,建议不要使用这种方式,可以使用repartitionAndSortWithinPartition这种函数做特异性优化。

我要回帖

更多关于 combinelatest reduce 的文章

 

随机推荐