平果县安装App要怎么安?

当前位置: →
→ hadoop代码笔记hadoop作业提交之汇总
hadoop代码笔记hadoop作业提交之汇总
& 作者及来源: douba - 博客园 &
&收藏到→_→:
摘要: 【hadoop代码笔记】hadoop作业提交之汇总
"hadoop代码笔记hadoop作业提交之汇总"::
在本篇博文中,试图通过代码了解hadoop job执行的整个流程。即用户提交的mapreduce的jar文件、输入提交到hadoop的集群,并在集群中运行。重点在代码的角度描述整个流程,有些细节描述的并不那么详细。
汇总的代码流程图附件:&hadoop_mapreduce_jobsubmit
二、主要流程
&jobclient通过rpc方式调用到jobtracker的submitjob方法提交作业,包括作业的jar、分片和作业描述。
jobtracker的submitjob方法吧job加入到内存队列中,由独立的线程取出每个jobinprogress的对象调用其 inittasks方法,根据传入的作业分片创建对应数量的taskinprogress类型的maptask和指定数量的reduce task。
tasktracker的offerservice定时调用jobtracker的heartbeat发心跳给jobtracker报告状态并获取要执行的task。在haeartbeat中其实是通过配置的taskscheduler来分配task的。
&tasktracker初始化时,会初始化并启动两个tasklauncher类型的线程,maplauncher,reducelauncher。在tasktracker从jobtracher获取到任务后,对应的会把任务添加到两个 tasklauncher的queue中。tasklauncher线程一直会定时检查tasktracher上面有slot可以运行新的task,则启动task。
先把task运行需要的文件解压到本地,并创建根据task类型(map或者reduce)创建一个taskrunner线程,在 taskrunner中jvmmanager调用jvmmanagerfortype、jvmrunner来启动一个java进程来执行map或 reduce任务。在taskrunner线程执行中,会构造一个java &d** child address port tasked这 样第一个java命令,单独启动一个java进程。在child的main函数中通过taskumbilicalprotocol协议,从 tasktracker获得需要执行的task,并调用task的run方法来执行。
对于maptask的的run方法会通过java反射机制构造根据配置 mapper,inputformat,mappercontext等对象,然后调用构造的mapper的run方法执行mapper操作。
对于reducetask,由reducecopier对象的不同线程来获取map输出地址,拷贝输出,merge输出等操作。并利用反射机制根据配置的reducer类构造一个reducer实例和运行的上下文。并调用reducer的run方法来执行到用户定义的reduce操作。
mapreduce 作业提交和执行
三、详细流程、
一)&jobtracker等相关功能模块初始化(详细)
本来按照流程,第一步骤应该是jobclient向jobtracker发起作业提交的请求。为了更好的理解jobtracker是如何接收从 jobclient提交的作业,有必要了解jobtracker相关的服务(和功能模块)的初始化过程。即jobtracker作为一个服务启动起来,包括其附属的其他服务(和功能模块)。以接受jobclient的作业提交,初始化作业,向tasktracker分配任务。
jobtracker等相关功能模块初始化
jobtracker 的main函数中调用其starttracker方法。
在main函数中调用offerservice,启动各个子服务项(大部分形态都此文来自: 马开东博客
转载请注明出处 网址:
是线程,有些是其他的初始化,如taskscheduler)
&在starttracker中调用其构造函数,在构造函数中对其中重要的属性根据配置进行初始化。(个人感觉再构造中设置scheduler,在stattracker调用构造的下一句有给scheduler传jobtracker的引用,有点不自然)。scheduler和 jobtracker实例间,scheduler包含jobtracker(实际上就是tasktrackermanager)对象,通过 tasktrackermanager对象获取hadoop集群的一些信息,如slot总数,queuemanager对象,这些都是调度器中调度算法输入的指标;jobtracker中要包含scheduler对象,使用scheduler来为tasktracker分配task。
在offerservice()中启动taskschedulerexpiretrackersthread retirejobsthread expirelaunchingtaskthread completedjobsstorethread intertrackerserver等几个线程来共同完成服务。同时调用taskscheduler的start方法进行初始化。
在fairscheduler调度器的start方法中调用eagertaskinitializationlistenerr的start方法来初始化eagertaskinitializationlistener
&在fairscheduler调度器的start方法中调用defaulttaskselector的start方法来初始化defaulttaskselector,因为该类实现的taskselector太简单,start方法里也没有做任何事情。
二)客户端作业提交(详细)
jobclient使用内置的jobsubmissionprotocol 实例jobsubmitclient 和jobtracker交互。向jobtracker请求一个新的作业id,计算作业的输入分片,并将运行作业所需的资源(包括作业jar文件,配置文件和计算所得此文来自: 马开东博客
转载请注明出处 网址:
的输入分片)复制到jobtracker的文件系统中一个以作业id命名的目录下。
通过调用jobtracker的getnewjobid()向jobtracker请求一个新的作业id
获取job的jar、输入分片、作业描述等几个路径信息,以jobid命名。
其中getsystemdir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitjarfile、分片文件submitsplitfile、作业描述文件submitjobfile
检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.fileoutputformat的checkoutputspecs方法。如果没有指定,则抛出invalidjobconfexception,文件已经存在则抛出filealreadyexistsexception
计算作业的输入分片。通过inputformat的getsplits(job)方法获得作业的split并将split序列化封装为rawsplit。返回split数目,也即代表有多个分片有多少个map。详细参见inputformat获取split的方法。
writenewsplits 方法把输入分片写到jobtracker的job目录下。
将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业id命名的目录下。
使用句柄jobsubmissionprotocol通过rpc远程调用的submitjob()方法,向jobtracker提交作业。jobtracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。jobtracker创建job成功后会给jobclient传回一个jobstatus对象用于记录job的状态信息,如执行时间、map和reduce任务完成的比例等。jobclient会根据这个jobstatus对象创建一个 networkedjob的runningjob对象,用于定时从jobtracker获得执行过程的统计数据来监控并打印到用户的控制台。
&三)jobtracker接收作业(详细)
jobtracker根据接收到的submitjob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例,从共享文件系统中获取jobclient计算好的输入分片信息,为每个分片创建一个map任务,根据mapred.reduce.task设置来创建指定数量的 reduce任务。
jobtracker接收作业提交
&jobclient通过rpc的方式向jobtracker提交作业;
调用jobtracker的submitjob方法。该方法是jobtracker向外提供的供调用的提交作业的接口。
submit方法中调用jobtracker的addjob方法。
在addjob方法中会把作业加入到集合中供调度,并会触发注册的jobinprogresslistener的jobadded事件。由上篇博文的jobtracker相关服务和功能的初始化的fairscheduler的start方法中看到,这里注册的是两个jobinprogresslistener。分别是fairscheduler的内部类joblistener和eagertaskinitializationlistener。
&fairscheduler的内部类joblistener响应jobadded事件事件。只是为每个加入的job创建一个用于fairscheduler调度用的jobinfo对象,并将其和job的对应的存储在map&jobinprogress, jobinfo& infos集合中。
&eagertaskinitializationlistener响应jobadded事件事件。jobadded 只是简单的把job加入到一个list&jobinprogress&类型的 jobinitqueue中。并不直接对其进行初始化,对其中的job的处理由另外线程jobinitmanager来做。该线程,一直检查 jobinitqueue是否有作业,有则拿出来从线程池中取一个线程initjob处理。关于作业的初始化过程专门在下一篇文章中介绍。
&四)job初始化(详细)
jobtracker响应作业提交请求,将提交的作业加入到一个列表中,由单独的线程来对列表中的job进行初始化。至此在jobtracker一端对提交的job的准备工作就完毕了。
hadoop job初始化
eagertaskinitializationlistener的 jobadded方法把jobinprogress类型的job放到list&jobinprogress&类型的 jobinitqueue中,有个单独的线程会对此文来自: 马开东博客
转载请注明出处 网址:
新加入的每个job进行初始化,其初始化调用的方法就是jobinprogress的方法 inittasks。
在jobinprogress的方法inittasks方法中,会根据传入的作业分片创建对应数量的taskinprogress类型的maptask,同时会创建taskinprogress类型的指定数量的reducetask。
taskinprogress的初始化是由其构造函数和构造函数中调用的init方法完成的。有构造maptask的构造函数和构造 reducetask的构造函数。分别是如下。其主要区别在于构造maptask是要传入输入分片信息的rawsplit,而reduce task则不需要。两个构造函数都要调用init方法,进行其他的初始化。
五)& tasktracker获取task,即jobtracker派发task(详细)
tasktracker定时发心跳给jobtracker,并从jobtracker获取要执行的task。jobtracker在分配map任务会考虑数据本地化,对于reduce任务不用考虑本地化。
tasktracker获取task
tasktracker在run中调用offerservice()方法一直死循环的去连接jobtracker,先jobtracker发送心跳,发送自身状态,并从jobtracker获取任务指令来执行。
在jobtracker的heartbeat方法中,对于来自每一个tasktracker的心跳请求,根据一定的作业调度策略调用assigntasks方法选择一定task
scheduler调用对应的loadmanager的canassignmap方法和canassignreduce方法以决定是否可以给 tasktracker分配任务。默认的是capbasedload,全局平均分配。即根据全局的任务槽数,全局的map任务数的比值得到一个load系 数,该系数乘以待分配任务的tasktracker的最大map任务数,即是该tasktracker能分配得到的任务数。如果太tracker当前运行 的任务数小于可运行的任务数,则任务可以分配新作业给他。
scheduler的调用taskselector的obtainnewmaptask或者obtainnewreducetask选择task。
在defaulttaskselector中选择task的方法其实只是封装了jobinprogress的对应方法。根据待派发task的 tasktracker根据集群中的tasktracker数量(clustersize),运行trasktracker的服务器数(numuniquehosts),该job中map task的平均进度(avgprogress),可以调度map的最大水平(距离其实),选择一个task执行。考虑到map的本地化,选择 reducetask时,不用考虑本地化。
jobtracker根据得到的task构造tasktrackeraction设置到到heartbeatresponse返回给tasktracker。
&tasktracker中将来自jobtracker的任务加入到taskqueue中等待执行。
&六)tasktracker启动task(详细)
tasktracker初始化时,会初始化并启动两个tasklauncher类型的线程,maplauncher,reducelauncher。在tasktracker从jobtracher获取到任务后,对应的会把任务添加到两个 tasklauncher的queue中,其实是tasklauncher维护的一个列表list&taskinprogress& taskstolaunch。 tasklauncher线程一直会定时检查tasktracher上面有slot开业运行新的task,则启动 task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据task类型(map或者reduce)创建一个taskrunner线程, 在taskrunner中jvmmanager调用jvmmanagerfortype、jvmrunner来启动一个java进程来执行map或reduce任务。
tasktracker启动task
tasktracker的offerservice方法获取到要执行的task后调用addtotaskqueue方法,其实是调用taskrunner的addtotaskqueue方法
tasklauncher内部维护了一个list&taskinprogress& taskstolaunch,只是把task加入到该集合中
tasklauncher是一个线程,在其run方法中从taskstolaunch集合中取出task来执行,调用tasktracker的startnewtask方法启动task。
startnewtask方法中调用localizejob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskinprogress的launchtask方法来启动task。
taskinprogress的launchtask方法先调用localizetask(task把task相关的配置信息获取到本地。然后创建一个taskrunner线程来启动task。
在taskrunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是child。然后调用jvmmanager 的launchjvm方法来调用。
jvmmanager 进而调用 jvmmanagerfortype的reapjvm,和spawnnewjvm 方法,发起调用。
在jvmmanagerfortype的spawnnewjvm 方法中创建了一个jvmrunner线程类执行调用。
jvmrunner线程的run调用runchild方法来执行 一个命令行的调用。
七)tasktracker运行map任务(详细)
taskrunner线程执行中,会构造一个java &d** child address port tasked这 样第一个java命令,单独启动一个java进程。在child的main函数中通过taskumbilicalprotocol协议,从 tasktracker获得需要执行的task,并调用task的run方法来执行,而task的run方法会通过java反射机制构造 mapper,inputformat,mappercontext,然后调用构造的mapper的run方法执行mapper操作。
child启动map任务
child类根据前面输入的三个参数,即tasktracher的地址、端口、taskid。通过taskumbilicalprotocol协议,从tasktracker获得需要执行的task,在child的main函数中调用执行。
在chilld中,执行task的run方法。task 的run方法。是真正执行用户定义的map或者reduce任务的入口,通过taskumbilicalprotocol向tasktracker上报执行进度。
在maptask的run中执行runmapper方法来调用mapper定义的方法。
在runnewmapper方法中构造mapper实例和mapper执行的配置信息。并执行mapper.run方法来调用到用户定义的mapper的方法。
mapper的run方法中,从输入数据中逐一取出调用map方法来处理每一条数据
mapper的map方法是真正用户定义的处理数据的类。也是用户唯一需要定义的方法。
八)&tasktracker运行reduce任务(详细)
taskrunner线程执行中,会构造一个java &d** child address port tasked这样第一个java命令,单独启动一个java进程。在child的main函数中通过taskumbilicalprotocol协议,从 tasktracker获得需要执行的task,并调用task的run方法来执行。在reducetask而task的run方法会通过java反射机制构造reducer,reducer.context,然后调用构造的reducer的run方法执行reduce操作。不同于map任务,在执行 reduce任务前,需要把map的输出从map运行的tasktracker上拷贝到reducer运行的tasktracker上。 reduce需要集群上若干个map任务的输出作为其特殊的分区文件。每个map任务完成的时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。其实是启动若干个mapoutputcopier线程来复制完所有map输出。在复制完成后 reduce任务进入排序阶段。这个阶段将由localfsmerger或inmemfsmergethread合并map输出,维持其顺序排序。【即对有序的几个文件进行归并,采用归并排序】在reduce阶段,对已排序输出的每个键都要调用reduce函数,此阶段的输出直接写到文件系统,一般为 hdfs上。(如果采用hdfs,由于tasktracker节点也是datanoe,所以第一个块副本将被写到本地磁盘。 即数据本地化) map 任务完成后,会通知其父tasktracker状态更新,然后tasktracker通知jobtracker。通过心跳机制来完成。因此 jobtracker知道map输出和tasktracker之间的映射关系。reducer的一个getmapcompletionevents线程定期询问jobtracker以便获取map输出位置。
child启动reduce任务
1. 在reducetak中 构建reducecopier对象,调用其fetchoutputs方法。
2.& 在reducecopier的fetchoutputs方法中分别构造几个独立的线程。相互配合,并分别独立的完成任务。
2.1& getmapeventsthread线程通过rpc询问tasktracker,对每个完成的event,获取maptask所在的服务器地址,即maptask输出的地址,构造url,加入到maplocations,供copier线程获取。
2.2 构造并启动若干个mapoutputcopier线程,通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。
2.搜索此文相关文章:此文来自: 马开东博客
网址: 站长QQ
hadoop代码笔记hadoop作业提交之汇总_博客园相关文章
博客园_总排行榜
博客园_最新
博客园_月排行榜
博客园_周排行榜
博客园_日排行榜Map-Reduce入门1
我的图书馆
Map-Reduce入门1
1、Map-Reduce的逻辑过程
假设我们需要处理一批有关天气的数据,其格式如下:
按照ASCII码存储,每行一条记录
每一行字符从0开始计数,第15个到第18个字符为年
第25个到第29个字符为温度,其中第25位是符号+/-
9991950051507+0000+
9991950051512+0022+
9991950051518-0011+
9991949032412+0111+
9991949032418+0078+
9991937051507+0001+
9991937051512-0002+
9991945051518+0001+
9991945032412+0002+
9991945032418+0078+
现在需要统计出每年的最高温度。
Map-Reduce主要包括两个步骤:Map和Reduce
每一步都有key-value对作为输入和输出:
map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:
(0, 9991950051507+0000+)
(33, 9991950051512+0022+)
(66, 9991950051518-0011+)
(99, 9991949032412+0111+)
(132, 9991949032418+0078+)
(165, 9991937051507+0001+)
(198, 9991937051512-0002+)
(231, 9991945051518+0001+)
(264, 9991945032412+0002+)
(297, 9991945032418+0078+)
在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 22)
(1950, -11)
(1949, 78)
(1937, -2)
(1945, 78)
在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
(1950, 22)
(1945, 78)
其逻辑过程可用如下图表示:
2、编写Map-Reduce程序
编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。
一般遵循以下格式:
map: (K1, V1)& -&& list(K2, V2)
public interface Mapper&K1, V1, K2, V2& extends JobConfigurable, Closeable {
& void map(K1 key, V1 value, OutputCollector&K2, V2& output, Reporter reporter)
& throws IOE
reduce: (K2, list(V))& -&& list(K3, V3)&
public interface Reducer&K2, V2, K3, V3& extends JobConfigurable, Closeable {
& void reduce(K2 key, Iterator&V2& values,
&&&&&&&&&&&&& OutputCollector&K3, V3& output, Reporter reporter)
&&& throws IOE
对于上面的例子,则实现的mapper如下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper&LongWritable, Text, Text, IntWritable& {
&&& @Override
&&& public void map(LongWritable key, Text value, OutputCollector&Text, IntWritable& output, Reporter reporter) throws IOException {
&&&&&&& String line = value.toString();
&&&&&&& String year = line.substring(15, 19);
&&&&&&& int airT
&&&&&&& if (line.charAt(25) == '+') {
&&&&&&&&&&& airTemperature = Integer.parseInt(line.substring(26, 30));
&&&&&&& } else {
&&&&&&&&&&& airTemperature = Integer.parseInt(line.substring(25, 30));
&&&&&&& output.collect(new Text(year), new IntWritable(airTemperature));
实现的reducer如下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer&Text, IntWritable, Text, IntWritable& {
&&& public void reduce(Text key, Iterator&IntWritable& values, OutputCollector&Text, IntWritable& output, Reporter reporter) throws IOException {
&&&&&&& int maxValue = Integer.MIN_VALUE;
&&&&&&& while (values.hasNext()) {
&&&&&&&&&&& maxValue = Math.max(maxValue, values.next().get());
&&&&&&& output.collect(key, new IntWritable(maxValue));
欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:
输入的数据,也即需要处理的数据
Map-Reduce程序,也即上面实现的Mapper和Reducer
此任务的配置项JobConf
欲配置JobConf,需要大致了解Hadoop运行job的基本原理:
Hadoop将Job分成task进行处理,共两种task:map task和reduce task
Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker
JobTracker协调整个job的运行,将task分配到不同的TaskTracker上
TaskTracker负责运行task,并将结果返回给JobTracker
Hadoop将输入数据分成固定大小的块,我们称之input split
Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record)
Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。
Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。
在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition
partition的规则为:(K2, V2) –& Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。
public interface Partitioner&K2, V2& extends JobConfigurable {
& int getPartition(K2 key, V2 value, int numPartitions);
下图大概描述了Map-Reduce的Job运行的基本原理:
下面我们讨论JobConf,其有很多的项可以进行配置:
setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
setMapperClass:设置Mapper,默认为IdentityMapper
setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
setReducerClass:设置Reducer,默认为IdentityReducer
setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在
当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:
public class MaxTemperature {
&&& public static void main(String[] args) throws IOException {
&&&&&&& if (args.length != 2) {
&&&&&&&&&&& System.err.println("Usage: MaxTemperature &input path& &output path&");
&&&&&&&&&&& System.exit(-1);
&&&&&&& JobConf conf = new JobConf(MaxTemperature.class);
&&&&&&& conf.setJobName("Max temperature");
&&&&&&& FileInputFormat.addInputPath(conf, new Path(args[0]));
&&&&&&& FileOutputFormat.setOutputPath(conf, new Path(args[1]));
&&&&&&& conf.setMapperClass(MaxTemperatureMapper.class);
&&&&&&& conf.setReducerClass(MaxTemperatureReducer.class);
&&&&&&& conf.setOutputKeyClass(Text.class);
&&&&&&& conf.setOutputValueClass(IntWritable.class);
&&&&&&& JobClient.runJob(conf);
3、Map-Reduce数据流(data flow)
Map-Reduce的处理过程主要涉及以下四个部分:
客户端Client:用于提交Map-reduce任务job
JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件
3.1、任务提交
JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。
向JobTracker请求一个新的job ID
检测此job的output配置
计算此job的input splits
将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
通知JobTracker此Job已经可以运行了
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。
3.2、任务初始化
当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job运行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。
其为每个input split创建一个map task。
每个task被分配一个ID。
3.3、任务分配
TaskTracker周期性的向JobTracker发送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。
在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。
TaskTracker有固定数量的位置来运行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。
3.4、任务执行
TaskTracker被分配了一个task,下面便要运行此task。
首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。
TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。
其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。
其三,其创建一个TaskRunner来运行task。
TaskRunner创建一个新的JVM来运行task。
被创建的child JVM和TaskTracker通信来报告运行进度。
3.4.1、Map的过程
MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。
map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,背景线程会将数据按照key在内存中排序。
每次从内存向硬盘flush数据,都生成一个新的spill文件。
当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。
reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。
3.4.2、Reduce的过程
当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。
对于一个job,JobTracker知道TaskTracer和map输出的对应关系。
reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。
reduce task需要其对应的partition的所有的map输出。
reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。
reduce task中有多个copy线程,可以并行拷贝map输出。
当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。
当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。
3.5、任务结束
当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。
当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。
发表评论:
TA的最新馆藏

我要回帖

更多关于 平果怎么安装两个微信 的文章

 

随机推荐