With JobControl, the user only needs to declare job dependencies through the addDependingJob() interface; JobControl then schedules the jobs according to those dependencies. The code looks like this:
JobConf extractJobConf = new JobConf();
JobConf classPriorJobConf = new JobConf();
JobConf conditionalProbilityJobConf = new JobConf();
JobConf predictJobConf = new JobConf();
... // configure each JobConf
// Create the Job objects. Note that JobControl requires each job to be wrapped in a Job object.
Job extractJob = new Job(extractJobConf);
Job classPriorJob = new Job(classPriorJobConf);
Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
Job predictJob = new Job(predictJobConf);
// Declare the dependencies, forming a DAG of jobs
classPriorJob.addDependingJob(extractJob);
conditionalProbilityJob.addDependingJob(extractJob);
predictJob.addDependingJob(classPriorJob);
predictJob.addDependingJob(conditionalProbilityJob);
// Create the JobControl object that monitors and schedules the jobs
JobControl JC = new JobControl("Naive Bayes");
JC.addJob(extractJob); // add all four jobs to the JobControl
JC.addJob(classPriorJob);
JC.addJob(conditionalProbilityJob);
JC.addJob(predictJob);
JC.run(); // run the DAG of jobs (blocks until stopped; see the threaded driver sketch below)
At run time, extractJob, which depends on no other job, is scheduled first. As soon as it finishes, classPriorJob and conditionalProbilityJob are scheduled together, and once both of them complete, predictJob is scheduled. Comparing the two approaches leads to a simple conclusion: writing a DAG of jobs with JobControl is easier, and it lets jobs with no dependency on each other run in parallel.
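JobControl.run() is the monitoring loop itself, so it blocks until the thread is told to stop. A common pattern, shown below as a sketch rather than as part of the original example, is to run the monitor on its own thread and poll allFinished():
// Sketch: run the JobControl monitor on a separate thread (JobControl implements Runnable).
Thread controllerThread = new Thread(JC, "jobcontrol");
controllerThread.setDaemon(true);
controllerThread.start();
while (!JC.allFinished()) {
    try {
        Thread.sleep(5000);               // poll the job group every 5 seconds
    } catch (InterruptedException ie) {
    }
}
System.out.println("failed jobs: " + JC.getFailedJobs().size());
JC.stop();                                // lets the monitoring loop exit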
JobControl design and internals
JobControl consists of two classes: Job and JobControl. The Job class wraps a MapReduce job together with its dependencies. It is mainly responsible for monitoring the state of the jobs it depends on and updating its own state accordingly; its state transition diagram is shown in the figure. A job starts in the WAITING state. If it has no depending jobs, or all of its depending jobs have finished, it moves to READY. Once READY, the job can be submitted to the Hadoop cluster and enters RUNNING. From RUNNING it moves, depending on how the run goes, to SUCCESS or FAILED. Note that if any job a given job depends on fails, the job itself fails as well, producing a domino effect: every downstream job fails too.
JobControl wraps a set of MapReduce jobs and their dependencies. It keeps jobs in different hash tables according to their state and moves them between tables following the state transitions in the figure until all jobs have finished. Internally, JobControl runs a thread that periodically monitors and updates the state of each job, promotes jobs whose dependencies have all completed, and submits jobs that are in the READY state. It also exposes an API to suspend, resume, and stop that thread.
A closer look at the Job class
The Job class starts by defining a set of fields, including the job's state and other bookkeeping information:
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

// A job will be in one of the following states
final public static int SUCCESS = 0;
final public static int WAITING = 1;
final public static int RUNNING = 2;
final public static int READY = 3;
final public static int FAILED = 4;
final public static int DEPENDENT_FAILED = 5; // a job it depends on has failed

private JobConf theJobConf;
private int state;
private String jobID;          // assigned and used by the JobControl class
private JobID mapredJobID;     // the job ID assigned by the map/reduce framework
private String jobName;        // external name, assigned/used by the client app
private String message;        // some useful information for the user,
                               // e.g. the reason the job failed
private ArrayList<Job> dependingJobs; // the list of jobs this job depends on
private JobClient jc = null;   // the map/reduce job client
Two constructors follow:
/**
 * Construct a job.
 * @param jobConf a mapred job configuration representing a job to be executed.
 * @param dependingJobs an array of jobs the current job depends on
 */
public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
    this.theJobConf = jobConf;
    this.dependingJobs = dependingJobs;
    this.state = Job.WAITING;
    this.jobID = "unassigned";
    this.mapredJobID = null; // not yet assigned
    this.jobName = "unassigned";
    this.message = "just initialized";
    this.jc = new JobClient(jobConf);
}

/**
 * Construct a job.
 * @param jobConf mapred job configuration representing a job to be executed.
 * @throws IOException
 */
public Job(JobConf jobConf) throws IOException {
    this(jobConf, null);
}
Next, the class overrides the toString() method inherited from Object.
This is followed by a long run of get/set accessor methods.
While a Job is in the WAITING state, jobs it depends on can be added to its dependency list:
/**
 * Add a job to this job's dependency list. Dependent jobs can only be added while a Job
 * is waiting to run, not during or afterwards.
 * @param dependingJob Job that this Job depends on.
 * @return <tt>true</tt> if the Job was added.
 */
public synchronized boolean addDependingJob(Job dependingJob) {
    if (this.state == Job.WAITING) { // only allowed to add jobs when waiting
        if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
        }
        return this.dependingJobs.add(dependingJob);
    } else {
        return false;
    }
}
There are also predicates that report whether the job is in a completed state or in the READY state:
/**
 * @return true if this job is in a complete state
 */
public boolean isCompleted() {
    return this.state == Job.FAILED ||
           this.state == Job.DEPENDENT_FAILED ||
           this.state == Job.SUCCESS;
}

/**
 * @return true if this job is in READY state
 */
public boolean isReady() {
    return this.state == Job.READY;
}
checkRunningState() checks a running job and, if it has finished, records whether it succeeded or failed:
/**
 * Check the state of this running job. The state may
 * remain the same, become SUCCESS or FAILED.
 */
private void checkRunningState() {
    RunningJob running = null;
    try {
        running = jc.getJob(this.mapredJobID);
        if (running.isComplete()) {
            if (running.isSuccessful()) {
                this.state = Job.SUCCESS;
            } else {
                this.state = Job.FAILED;
                this.message = "Job failed!";
                try {
                    running.killJob();
                } catch (IOException e1) {
                }
                try {
                    this.jc.close();
                } catch (IOException e2) {
                }
            }
        }
    } catch (IOException ioe) {
        this.state = Job.FAILED;
        this.message = StringUtils.stringifyException(ioe);
        try {
            if (running != null)
                running.killJob();
        } catch (IOException e1) {
        }
        try {
            this.jc.close();
        } catch (IOException e1) {
        }
    }
}
checkState() checks and updates the job's state:
/**
 * Check and update the state of this job. The state changes
 * depending on its current state and the states of the depending jobs.
 */
synchronized int checkState() {
    if (this.state == Job.RUNNING) {
        checkRunningState();
    }
    if (this.state != Job.WAITING) {
        return this.state;
    }
    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
        this.state = Job.READY;
        return this.state;
    }
    Job pred = null;
    int n = this.dependingJobs.size();
    for (int i = 0; i < n; i++) {
        pred = this.dependingJobs.get(i);
        int s = pred.checkState();
        if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, stay in WAITING
        }
        if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
                + pred.getJobID() + " failed. " + pred.getMessage();
            break;
        }
        // pred must be in SUCCESS state
        if (i == n - 1) {
            this.state = Job.READY;
        }
    }
    return this.state;
}
Finally, the submit() method hands the job to the MapReduce framework:
/**
 * Submit this job to mapred. The state becomes RUNNING if submission
 * is successful, FAILED otherwise.
 */
protected synchronized void submit() {
    try {
        if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
                if (!fs.exists(inputPaths[i])) {
                    try {
                        fs.mkdirs(inputPaths[i]);
                    } catch (IOException e) {
                    }
                }
            }
        }
        RunningJob running = jc.submitJob(theJobConf);
        this.mapredJobID = running.getID();
        this.state = Job.RUNNING;
    } catch (IOException ioe) {
        this.state = Job.FAILED;
        this.message = StringUtils.stringifyException(ioe);
    }
}
The fragments above together make up the complete Job class.
A closer look at the JobControl class
The JobControl class likewise starts with a set of fields, including the state of its scheduling thread and other bookkeeping information:
// The thread can be in one of the following states
private static final int RUNNING = 0;
private static final int SUSPENDED = 1;
private static final int STOPPED = 2;
private static final int STOPPING = 3;
private static final int READY = 4;
private int runnerState;                  // the thread state

private Map<String, Job> waitingJobs;
private Map<String, Job> readyJobs;
private Map<String, Job> runningJobs;
private Map<String, Job> successfulJobs;
private Map<String, Job> failedJobs;

private long nextJobID;
private String groupName;
It has a single constructor:
/**
 * Construct a job control for a group of jobs.
 * @param groupName a name identifying this group
 */
public JobControl(String groupName) {
    this.waitingJobs = new Hashtable<String, Job>();
    this.readyJobs = new Hashtable<String, Job>();
    this.runningJobs = new Hashtable<String, Job>();
    this.successfulJobs = new Hashtable<String, Job>();
    this.failedJobs = new Hashtable<String, Job>();
    this.nextJobID = -1;
    this.groupName = groupName;
    this.runnerState = JobControl.READY;
}
A small helper, toArrayList(), converts a map of jobs into an ArrayList:
private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
    ArrayList<Job> retv = new ArrayList<Job>();
    synchronized (jobs) {
        for (Job job : jobs.values()) {
            retv.add(job);
        }
    }
    return retv;
}
Naturally, the class has a set of getters (similar accessors exist for the waiting, ready, and running jobs):
/**
 * @return the jobs in the success state
 */
public ArrayList<Job> getSuccessfulJobs() {
    return JobControl.toArrayList(this.successfulJobs);
}

public ArrayList<Job> getFailedJobs() {
    return JobControl.toArrayList(this.failedJobs);
}

private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
}
Jobs are inserted into the queue that matches their state:
private static void addToQueue(Job aJob, Map<String, Job> queue) {
    synchronized (queue) {
        queue.put(aJob.getJobID(), aJob);
    }
}

private void addToQueue(Job aJob) {
    Map<String, Job> queue = getQueue(aJob.getState());
    addToQueue(aJob, queue);
}
Matching the insertion path, getQueue() returns the queue that corresponds to a given job state:
private Map<String, Job> getQueue(int state) {
    Map<String, Job> retv = null;
    if (state == Job.WAITING) {
        retv = this.waitingJobs;
    } else if (state == Job.READY) {
        retv = this.readyJobs;
    } else if (state == Job.RUNNING) {
        retv = this.runningJobs;
    } else if (state == Job.SUCCESS) {
        retv = this.successfulJobs;
    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
        retv = this.failedJobs;
    }
    return retv;
}
New jobs are added with addJob() and addJobs():
/**
 * Add a new job.
 * @param aJob the new job
 */
synchronized public String addJob(Job aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setState(Job.WAITING);
    this.addToQueue(aJob);
    return id;
}

/**
 * Add a collection of jobs.
 * @param jobs
 */
public void addJobs(Collection<Job> jobs) {
    for (Job job : jobs) {
        addJob(job);
    }
}
The scheduling thread can be queried, stopped, suspended, and resumed:
/** @return the thread state */
public int getState() {
    return this.runnerState;
}
/**
 * Set the thread state to STOPPING so that the
 * thread will stop when it wakes up.
 */
public void stop() {
    this.runnerState = JobControl.STOPPING;
}
/** Suspend the running thread. */
public void suspend() {
    if (this.runnerState == JobControl.RUNNING) {
        this.runnerState = JobControl.SUSPENDED;
    }
}
/** Resume the suspended thread. */
public void resume() {
    if (this.runnerState == JobControl.SUSPENDED) {
        this.runnerState = JobControl.RUNNING;
    }
}
The running and waiting jobs are checked periodically, any job whose state has changed is moved to the matching queue, and ready jobs are submitted:
synchronized private void checkRunningJobs() {
    Map<String, Job> oldJobs = this.runningJobs;
    this.runningJobs = new Hashtable<String, Job>();
    for (Job nextJob : oldJobs.values()) {
        int state = nextJob.checkState();
        if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
                nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
        this.addToQueue(nextJob);
    }
}

synchronized private void checkWaitingJobs() {
    Map<String, Job> oldJobs = this.waitingJobs;
    this.waitingJobs = new Hashtable<String, Job>();
    for (Job nextJob : oldJobs.values()) {
        int state = nextJob.checkState();
        if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
                nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
        this.addToQueue(nextJob);
    }
}

synchronized private void startReadyJobs() {
    Map<String, Job> oldJobs = this.readyJobs;
    this.readyJobs = new Hashtable<String, Job>();
    for (Job nextJob : oldJobs.values()) {
        // System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
        nextJob.submit();
        // System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
        this.addToQueue(nextJob);
    }
}
allFinished() reports whether every job in the group has finished:
synchronized public boolean allFinished() {
    return this.waitingJobs.size() == 0 &&
           this.readyJobs.size() == 0 &&
           this.runningJobs.size() == 0;
}
The run() method is the thread's main loop: it checks the running jobs, updates the waiting jobs, and submits the jobs that are ready:
/**
 * The main loop for the thread.
 * The loop does the following:
 *   Check the states of the running jobs
 *   Update the states of waiting jobs
 *   Submit the jobs in ready state
 */
public void run() {
    this.runnerState = JobControl.RUNNING;
    while (true) {
        while (this.runnerState == JobControl.SUSPENDED) {
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
            }
        }
        checkRunningJobs();
        checkWaitingJobs();
        startReadyJobs();
        if (this.runnerState != JobControl.RUNNING &&
            this.runnerState != JobControl.SUSPENDED) {
            break;
        }
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
        }
        if (this.runnerState != JobControl.RUNNING &&
            this.runnerState != JobControl.SUSPENDED) {
            break;
        }
    }
    this.runnerState = JobControl.STOPPED;
}
The complete JobControl class (with the Apache license header and class-level Javadoc) is simply the assembly of the fragments above; it lives in the org.apache.hadoop.mapred.jobcontrol package.
A motivating example: Naive Bayes classification
Naive Bayes classification is a statistical classification method based on probability theory. It consists of two steps: training on samples and classification.
Its implementation is made up of several MapReduce jobs, as shown in the figure. The training step can be built from three Ma…
TotalOrderPartitioner provides a range-based partitioning scheme and is usually used for total ordering of a data set. In MapReduce, the obvious way to produce a totally ordered output is merge sort: each Map Task sorts its data locally in the map phase, and a single Reduce Task performs the global sort in the reduce phase. Because the job can have only one Reduce Task, the reduce phase becomes the bottleneck. To make total ordering faster and more scalable, MapReduce provides TotalOrderPartitioner. It splits the data into several ordered ranges (partitions) such that every record in a range is greater than every record in the previous range, which turns total ordering into the following steps.
Step 1: sampling. The client samples the input to obtain the split points between ranges. Hadoop ships with several samplers, such as IntervalSampler, RandomSampler, and SplitSampler (see the InputSampler class in the org.apache.hadoop.mapred.lib package).
Here is an example.
Sampled data: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr
If there are 4 Reduce Tasks, the quartile points of the sampled data are abd, bcd, and mnk, and these three strings become the split points.
Step 2: the map phase. Two components are involved here, the Mapper and the Partitioner. The Mapper can be IdentityMapper, which simply passes its input through, but the Partitioner must be TotalOrderPartitioner. It stores the split points obtained in step 1 in a trie so that the range of any record can be located quickly. Each Map Task thus produces R (the number of Reduce Tasks) ranges, and the ranges themselves are ordered. TotalOrderPartitioner looks up the Reduce Task number of each record in the trie.
As shown in the figure, the split points are stored in a trie of depth 2. If the input contains the strings "abg" and "mnz", then "abg" falls into partition 1, i.e. the second Reduce Task, and "mnz" falls into partition 3, i.e. the fourth Reduce Task.
Step 3: the reduce phase. Each Reducer sorts its assigned range locally, which yields a totally ordered result.
As these steps show, the efficiency of a TotalOrderPartitioner-based total sort depends directly on the key distribution and on the sampler: the more uniform the keys and the more representative the sample, the better the Reduce Tasks are balanced and the faster the sort. TotalOrderPartitioner has two well-known applications: TeraSort and HBase bulk loading. TeraSort is an example application that ships with Hadoop; it once won first place in a terabyte-scale sort benchmark, and TotalOrderPartitioner was extracted from it. HBase is a NoSQL data store built on top of Hadoop. It partitions its data into Regions; the data inside a Region is sorted by key, and the Regions themselves are ordered. Clearly, the R output files of a MapReduce total-sort job map directly onto R HBase Regions.
The Partitioner in the new API
The class diagram of the new-API Partitioner is shown in the figure. It no longer implements the JobConfigurable interface. If a Partitioner needs to be initialized from a JobConf object, it can implement the Configurable interface itself, for example:
public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable
Where the Partitioner sits
The Partitioner's job is to send each map output record to the right reducer, which places two requirements on it (a small sketch follows the list):
1) Load balance: spread the work as evenly as possible across the reducers.
2) Efficiency: the assignment must be fast.
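To illustrate these two requirements, here is a hedged sketch of a new-API custom partitioner (the class name and the first-character scheme are invented for illustration, not taken from the article):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: route records by the first character of a Text key.
// The lookup is cheap (requirement 2); how balanced it is (requirement 1)
// depends on how evenly first characters are distributed in the data.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}
It would be enabled with job.setPartitionerClass(FirstCharPartitioner.class).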
Partitioners provided by MapReduce
The partitioner class hierarchy
1. Partitioner<K,V> is the base type; a custom partitioner must derive from it as well. The old-API source is as follows:
package org.apache.hadoop.mapred;

/**
 * Partitions the key space.
 *
 * <p><code>Partitioner</code> controls the partitioning of the keys of the
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the
 * record) is sent for reduction.</p>
 *
 * @see Reducer
 * @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
 */
@Deprecated
public interface Partitioner<K2, V2> extends JobConfigurable {
  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on all or a subset of the key.</p>
   *
   * @param key the key to be partitioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  int getPartition(K2 key, V2 value, int numPartitions);
}
2. HashPartitioner<K,V> is the default partitioner in MapReduce. Its (new-API) source is:
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
3. BinaryPartitioner extends Partitioner<BinaryComparable, V> and is a specialization of Partitioner<K,V> for binary keys. It exposes a leftOffset and a rightOffset, and when choosing the target reducer it hashes only the bytes of the key in the [leftOffset, rightOffset] range:
reducer = (hash & Integer.MAX_VALUE) % numReduceTasks
4. KeyFieldBasedPartitioner<K2, V2> is also a hash-based partitioner. Unlike BinaryPartitioner, it can hash several key fields (ranges). When the number of fields is 0, KeyFieldBasedPartitioner degenerates into HashPartitioner. Its source is:
package org.apache.hadoop.mapred.lib;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;

/**
 * Defines a way to partition keys based on certain key fields (also see
 * {@link KeyFieldBasedComparator}).
 * The key specification supported is of the form -k pos1[,pos2], where
 * pos is of the form f[.c][opts], where f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character posns are numbered
 * starting with 1; a character position of zero in pos2 indicates the
 * field's last character. If '.c' is omitted from pos1, it defaults to 1
 * (the beginning of the field); if omitted from pos2, it defaults to 0
 * (the end of the field).
 */
public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {

  private static final Log LOG = LogFactory.getLog(
      KeyFieldBasedPartitioner.class.getName());
  private int numOfPartitionFields;
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();

  public void configure(JobConf job) {
    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
    if (job.get("num.key.fields.for.partition") != null) {
      LOG.warn("Using deprecated num.key.fields.for.partition. " +
          "Use mapred.text.key.partitioner.options instead");
      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
      keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
    } else {
      String option = job.getKeyFieldPartitionerOption();
      keyFieldHelper.parseOption(option);
    }
  }

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    byte[] keyBytes;

    List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
    if (allKeySpecs.size() == 0) {
      return getPartition(key.toString().hashCode(), numReduceTasks);
    }

    try {
      keyBytes = key.toString().getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not " +
          "support UTF-8 encoding!", e);
    }
    // return 0 if the key is empty
    if (keyBytes.length == 0) {
      return 0;
    }

    int[] lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
        keyBytes.length);
    int currentHash = 0;
    for (KeyDescription keySpec : allKeySpecs) {
      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length,
          lengthIndicesFirst, keySpec);
      // no key found! continue
      if (startChar < 0) {
        continue;
      }
      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
          lengthIndicesFirst, keySpec);
      currentHash = hashCode(keyBytes, startChar, endChar, currentHash);
    }
    return getPartition(currentHash, numReduceTasks);
  }

  protected int hashCode(byte[] b, int start, int end, int currentHash) {
    for (int i = start; i <= end; i++) {
      currentHash = 31 * currentHash + b[i];
    }
    return currentHash;
  }

  protected int getPartition(int hash, int numReduceTasks) {
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }
}
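A hedged usage sketch for this class with the old mapred API (the tab-separated key layout and the option string are assumptions made for illustration):
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;

// Sketch: map output keys look like "country<TAB>city<TAB>...";
// partition on the second key field only.
JobConf conf = new JobConf();
conf.setPartitionerClass(KeyFieldBasedPartitioner.class);
conf.set("map.output.key.field.separator", "\t");
conf.setKeyFieldPartitionerOptions("-k2,2"); // hash only field 2, per the -k syntax above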
5. TotalOrderPartitioner can produce totally ordered output. Unlike the three partitioners above, it is not hash based. The rest of this section looks at it in detail.
The TotalOrderPartitioner class
Each reducer's output is sorted by default, but with unordered input the outputs of different reducers are not ordered relative to each other. To obtain totally ordered output, use TotalOrderPartitioner.
TotalOrderPartitioner needs a partition file. The file must contain a sorted list of keys (the split points) whose count equals the number of reducers minus 1. Why the file is needed, and what exactly goes into it, is discussed below.
TotalOrderPartitioner handles two kinds of key types differently:
1) For keys that are not BinaryComparable, TotalOrderPartitioner uses binary search to find the index of the range the key falls into.
For example, with 5 reducers the partition file supplies 4 split points, say [2, 4, 6, 8]. For the pair <4, "good">, binary search finds index = 1, and index + 1 = 2, so the record goes to partition 2. For a pair <4.5, "good">, binary search returns -3; adding 1 and negating also gives 2, the partition this record goes to.
For numeric keys, binary search costs O(log(reducer count)), which is fast.
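The index arithmetic described above matches what java.util.Arrays.binarySearch returns; the sketch below (added for illustration, using a plain int array rather than serialized keys) reproduces both cases:
import java.util.Arrays;

// 5 reducers, so the partition file supplies 4 split points.
int[] splitPoints = {2, 4, 6, 8};

// Key present among the split points: binarySearch(4) == 1, partition = 1 + 1 = 2.
int found = Arrays.binarySearch(splitPoints, 4) + 1;      // 2

// Key absent: binarySearch(5) returns -(insertionPoint) - 1 = -3;
// adding 1 and negating gives the same partition, 2.
int pos = Arrays.binarySearch(splitPoints, 5) + 1;        // -2
int partition = (pos < 0) ? -pos : pos;                   // 2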
2) For BinaryComparable keys (think of them as strings), the keys can be ordered lexicographically, so a set of split points can likewise route different string keys to different reducers, much as in the numeric case.
For example, with 5 reducers and the 4 split points ["abc", "bce", "eaa", "fhc"], the string "ab" is assigned to the first reducer because it is smaller than the first split point "abc".
Unlike numeric keys, however, string lookup and comparison cannot reuse the numeric binary search directly. MapReduce uses a trie to look up strings. The lookup costs O(m), where m is the depth of the tree, and the space cost is O(255^m-1); it is a classic space-for-time trade-off.
Building the trie
Suppose the maximum depth of the tree is 3 and the split points are [aaad, aaaf, aaaeh, abbx].
The trie in MapReduce is built from two kinds of nodes:
1) Inner trie nodes
An inner trie node in MapReduce holds a child slot for each of 255 possible characters; the figure's example shows only the 26 English letters.
2) Leaf nodes: unsplit trie nodes, single-split trie nodes, and leaf trie nodes.
An unsplit trie node is a leaf that contains no split point.
A single-split trie node is a leaf that contains exactly one split point.
A leaf trie node is a leaf that contains several split points. (This is rare; it only happens when the maximum tree depth is reached and is seldom seen in practice.)
Searching the trie
Continuing the example above:
1) For a key/value pair such as <aad, 10>, the search reaches the leaf trie node in the figure and then runs a binary search inside it, returning the index of aad in the split-point array, or the index of the closest split point if it is not present.
2) If the search reaches a single-split trie node, it returns that node's index when the key is equal to or smaller than its split point, and index + 1 when the key is larger.
3) If the search reaches an unsplit trie node, it returns the preceding index. For example, <zaa, 20> returns the index of abbx in the split-point array.
Open questions about TotalOrderPartitioner
We said above that a partitioner has two requirements: speed and load balance. The trie makes the lookup fast, but how do we obtain a partition file whose split points actually balance the load?
InputSampler
The input sampler class samples the data under the input directory and offers three sampling strategies.
The sampler class hierarchy is shown in the figure.
Comparison of the samplers:
SplitSampler<K,V>: samples the first n records of each split; parameters: total number of samples, number of splits to sample.
RandomSampler<K,V>: scans all the data and samples at random; parameters: sampling frequency, total number of samples, number of splits to sample.
IntervalSampler<K,V>: samples at fixed intervals; parameters: sampling frequency, number of splits to sample; well suited to sorted input.
The key method is writePartitionFile: it sorts the samples produced by one of the samplers, then picks (number of reducers - 1) of them and writes them to the partition file. Split points derived from a representative sample put roughly the same number of key/value pairs into every range, which is what produces the balanced load.
The source of SplitSampler:
* Samples the first n records from s splits.
* Inexpensive way to sample random data.
public static class SplitSampler&K,V& implements Sampler&K,V& {
private final int numS
private final int maxSplitsS
* Create a SplitSampler sampling &em&all&/em& splits.
* Takes the first numSamples / numSplits records from each split.
* @param numSamples Total number of samples to obtain from all selected
public SplitSampler(int numSamples) {
this(numSamples, Integer.MAX_VALUE);
* Create a new SplitSampler.
* @param numSamples Total number of samples to obtain from all selected
* @param maxSplitsSampled The maximum number of splits to examine.
public SplitSampler(int numSamples, int maxSplitsSampled) {
this.numSamples = numS
this.maxSplitsSampled = maxSplitsS
* From each split sampled, take the first numSamples / numSplits records.
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat&K,V& inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList&K& samples = new ArrayList&K&(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToS
int samplesPerSplit = numSamples / splitsToS
long records = 0;
for (int i = 0; i & splitsToS ++i) {
RecordReader&K,V& reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
if ((i+1) * samplesPerSplit &= records) {
reader.close();
return (K[])samples.toArray();
The source of RandomSampler:
* Sample from random points in the input.
* General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
* each split.
public static class RandomSampler&K,V& implements Sampler&K,V& {
private double
private final int numS
private final int maxSplitsS
* Create a new RandomSampler sampling &em&all&/em& splits.
* This will read every split at the client, which is very expensive.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
public RandomSampler(double freq, int numSamples) {
this(freq, numSamples, Integer.MAX_VALUE);
* Create a new RandomSampler.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* @param maxSplitsSampled The maximum number of splits to examine.
public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
this.freq =
this.numSamples = numS
this.maxSplitsSampled = maxSplitsS
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat&K,V& inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList&K& samples = new ArrayList&K&(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for (int i = 0; i & splits. ++i) {
InputSplit tmp = splits[i];
int j = r.nextInt(splits.length);
splits[i] = splits[j];
splits[j] =
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i & splitsToSample ||
(i & splits.length && samples.size() & numSamples); ++i) {
RecordReader&K,V& reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() &= freq) {
if (samples.size() & numSamples) {
samples.add(key);
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, key);
freq *= (numSamples - 1) / (double) numS
key = reader.createKey();
reader.close();
return (K[])samples.toArray();
The source of IntervalSampler:
* Sample from s splits at regular intervals.
* Useful for sorted data.
public static class IntervalSampler&K,V& implements Sampler&K,V& {
private final double
private final int maxSplitsS
* Create a new IntervalSampler sampling &em&all&/em& splits.
* @param freq The frequency with which records will be emitted.
public IntervalSampler(double freq) {
this(freq, Integer.MAX_VALUE);
* Create a new IntervalSampler.
* @param freq The frequency with which records will be emitted.
* @param maxSplitsSampled The maximum number of splits to examine.
* @see #getSample
public IntervalSampler(double freq, int maxSplitsSampled) {
this.freq =
this.maxSplitsSampled = maxSplitsS
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat&K,V& inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList&K& samples = new ArrayList&K&();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToS
long records = 0;
long kept = 0;
for (int i = 0; i & splitsToS ++i) {
RecordReader&K,V& reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if ((double) kept / records & freq) {
samples.add(key);
key = reader.createKey();
reader.close();
return (K[])samples.toArray();
The complete InputSampler class simply bundles the three samplers above as nested classes.
A TotalOrderPartitioner example
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
        implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat
                .setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf,
                CompressionType.BLOCK);

        conf.setPartitionerClass(TotalOrderPartitioner.class);

        InputSampler.Sampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));

        Path partitionFile = new Path(input, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        InputSampler.writePartitionFile(conf, sampler);

        // Add the partition file to the DistributedCache
        URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
                new SortByTemperatureUsingTotalOrderPartitioner(), args);
        System.exit(exitCode);
    }
}
The Partitioner in the old API
The Partitioner splits the intermediate results produced by the Mappers so that records of the same group are handed to the same Reducer; it directly affects…
Besides the key and the value, this function takes an OutputCollector and a Reporter parameter, used respectively to emit results and to update Counter values.
Mapper obtains a close() method by extending the Closeable interface (which in turn extends the java.io.Closeable interface); users can implement it to clean up after the Mapper.
MapReduce ships with many Mapper/Reducer implementations, most of them fairly simple, as shown in the figure. Their functions are (a small sketch using two of them follows this list):
ChainMapper/ChainReducer: support chained jobs.
IdentityMapper/IdentityReducer: pass the input key/value through unchanged.
InverseMapper: swap the key and the value.
RegexMapper: match strings with a regular expression.
TokenCountMapper: split a string into tokens (words); usable as the Mapper of WordCount.
LongSumReducer: sum the long values for each key.
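As a hedged illustration (not part of the original text), a word-count job can be assembled from two of these library classes alone, using the old mapred API; the input and output paths are placeholders:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

// Sketch: word count built only from library Mapper/Reducer implementations.
JobConf conf = new JobConf();
conf.setJobName("library-wordcount");
conf.setMapperClass(TokenCountMapper.class);   // emits <token, 1> for each word
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class);    // sums the counts per token
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(conf, new Path("in"));    // placeholder paths
FileOutputFormat.setOutputPath(conf, new Path("out"));
JobClient.runJob(conf);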
A MapReduce application does not even have to contain a Mapper. The framework offers an interface more general than Mapper, MapRunnable, shown in the figure. Users can implement it to customize how the Mapper is invoked or to handle the key/value processing themselves; Hadoop Pipes, for instance, implements its own MapRunnable and sends the data over a socket to another process for processing. Another benefit of this interface is that it allows multithreaded Mappers.
As the figure shows, MapReduce provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, with MapRunner being the default. MultithreadedMapRunner is a multithreaded MapRunnable: by default each Mapper runs 10 threads, and it is typically used for non-CPU-bound jobs to improve throughput.
2. Mapper/Reducer in the new API
As the figure shows, the new API differs from the old one in several ways:
Mapper is now a class rather than an interface, and it no longer implements the JobConfigurable and Closeable interfaces; instead it defines setup and cleanup methods for initialization and cleanup.
The parameters are wrapped in a Context object, which makes the interface easy to extend.
The MapRunnable interface is gone; Mapper gains a run method so users can customize how map() is invoked, and the default run implementation behaves like the old MapRunner.run.
In the new API, the Reducer iterates over values through a java.lang.Iterable, so users can traverse all the values with a foreach loop, as shown below:
void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {   // note the foreach-style traversal
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}
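A complete minimal Reducer in this style might look like the sketch below (added for illustration, not from the original text); it sums the IntWritable values of each key:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: sum all values of a key using the Iterable-based reduce() signature.
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {   // the new foreach-style traversal
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}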
The complete Mapper class looks like this:
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
* Maps input key/value pairs to a set of intermediate key/value pairs.
* &p&Maps are the individual tasks which transform input records into a
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.&/p&
* &p&The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* &code&Mapper&/code& implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
* &p&The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, Context)}
* for each key/value pair in the &code&InputSplit&/code&. Finally
* {@link #cleanup(Context)} is called.&/p&
* &p&All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.&/p&
* &p&The &code&Mapper&/code& outputs are partitioned per
* &code&Reducer&/code&. Users can control which keys (and hence records) go to
* which &code&Reducer&/code& by implementing a custom {@link Partitioner}.
* &p&Users can optionally specify a &code&combiner&/code&, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the &code&Mapper&/code& to the &code&Reducer&/code&.
* &p&Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the &code&Configuration&/code&.&/p&
* &p&If the job has zero
* reduces then the output of the &code&Mapper&/code& is directly written
* to the {@link OutputFormat} without sorting by keys.&/p&
* &p&Example:&/p&
* &p&&blockquote&&pre&
* public class TokenCounterMapper
extends Mapper&Object, Text, Text, IntWritable&{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.collect(word, one);
* &/pre&&/blockquote&&/p&
* &p&Applications may override the {@link #run(Context)} method to exert
* greater control on map processing e.g. multi-threaded &code&Mapper&/code&s
* etc.&/p&
* @see InputFormat
* @see JobContext
* @see Partitioner
* @see Reducer
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
      extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN, VALUEIN> reader,
                   RecordWriter<KEYOUT, VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}
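To tie setup(), map(), and cleanup() together, here is a hedged sketch of a user-defined Mapper (the class name and the filter.keyword property are invented for illustration):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: setup() reads a configuration value once per task,
// map() emits <keyword, 1> for every input line that contains it.
public class KeywordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private Text keywordText;
    private String keyword;

    @Override
    protected void setup(Context context) {
        keyword = context.getConfiguration().get("filter.keyword", "error");
        keywordText = new Text(keyword);
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        if (line.toString().contains(keyword)) {
            context.write(keywordText, ONE);
        }
    }
}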
As the code shows, Mapper defines a nested class Context that extends MapContext.
Here is the MapContext source:
package org.apache.hadoop.
import java.io.IOE
import org.apache.hadoop.conf.C
* The context that is given to the {@link Mapper}.
* @param &KEYIN& the key input type to the Mapper
* @param &VALUEIN& the value input type to the Mapper
* @param &KEYOUT& the key output type from the Mapper
* @param &VALUEOUT& the value output type from the Mapper
public class MapContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT&
extends TaskInputOutputContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT& {
private RecordReader&KEYIN,VALUEIN&
private InputS
public MapContext(Configuration conf, TaskAttemptID taskid,
RecordReader&KEYIN,VALUEIN& reader,
RecordWriter&KEYOUT,VALUEOUT& writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader =
this.split =
* Get the input split for this map.
public InputSplit getInputSplit() {
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
MapContext extends TaskInputOutputContext. Next, the TaskInputOutputContext source:
package org.apache.hadoop.
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.util.P
* A context object that allows input and output from the task. It is only
* supplied to the {@link Mapper} or {@link Reducer}.
* @param &KEYIN& the input key type for the task
* @param &VALUEIN& the input value type for the task
* @param &KEYOUT& the output key type for the task
* @param &VALUEOUT& the output value type for the task
public abstract class TaskInputOutputContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT&
extends TaskAttemptContext implements Progressable {
private RecordWriter&KEYOUT,VALUEOUT&
private StatusR
private OutputC
public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
RecordWriter&KEYOUT,VALUEOUT& output,
OutputCommitter committer,
StatusReporter reporter) {
super(conf, taskid);
this.output =
this.reporter =
this.committer =
* Advance to the next key, value pair, returning null if at end.
* @return the key object that was read into, or null if no more
public abstract
boolean nextKeyValue() throws IOException, InterruptedE
* Get the current key.
* @return the current key object or null if there isn't one
* @throws IOException
* @throws InterruptedException
public abstract
KEYIN getCurrentKey() throws IOException, InterruptedE
* Get the current value.
* @return the value object that was read into
* @throws IOException
* @throws InterruptedException
public abstract VALUEIN getCurrentValue() throws IOException,
InterruptedE
* Generate an output key/value pair.
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
public Counter getCounter(Enum&?& counterName) {
return reporter.getCounter(counterName);
public Counter getCounter(String groupName, String counterName) {
return reporter.getCounter(groupName, counterName);
public void progress() {
reporter.progress();
public void setStatus(String status) {
reporter.setStatus(status);
public OutputCommitter getOutputCommitter() {
TaskInputOutputContext extends TaskAttemptContext and implements the Progressable interface. First, the Progressable interface:
package org.apache.hadoop.util;

/**
 * A facility for reporting progress.
 *
 * <p>Clients and/or applications can use the provided <code>Progressable</code>
 * to explicitly report progress to the Hadoop framework. This is especially
 * important for operations which take a significant amount of time since,
 * in-lieu of the reported progress, the framework has to assume that an error
 * has occurred and time-out the operation.</p>
 */
public interface Progressable {
  /**
   * Report progress to the Hadoop framework.
   */
  public void progress();
}
The TaskAttemptContext class:
package org.apache.hadoop.
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.util.P
* The context for task attempts.
public class TaskAttemptContext extends JobContext implements Progressable {
private final TaskAttemptID taskId;
private String status = "";
public TaskAttemptContext(Configuration conf,
TaskAttemptID taskId) {
super(conf, taskId.getJobID());
this.taskId = taskId;
* Get the unique name for this task attempt.
public TaskAttemptID getTaskAttemptID() {
return taskId;
* Set the current status of the task to the given string.
public void setStatus(String msg) throws IOException {
* Get the last set status message.
* @return the current status message
public String getStatus() {
* Report progress. The subtypes actually do work in this method.
public void progress() {
TaskAttemptContext extends JobContext; finally, the JobContext source:
package org.apache.hadoop.
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.RawC
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.mapreduce.lib.partition.HashP
* A read-only view of the job that is provided to the tasks while they
* are running.
public class JobContext {
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
protected static final String INPUT_FORMAT_CLASS_ATTR =
"mapreduce.inputformat.class";
protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
protected static final String COMBINE_CLASS_ATTR = "bine.class";
protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
protected static final String OUTPUT_FORMAT_CLASS_ATTR =
"mapreduce.outputformat.class";
protected static final String PARTITIONER_CLASS_ATTR =
"mapreduce.partitioner.class";
protected final org.apache.hadoop.mapred.JobC
private final JobID jobId;
public JobContext(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
* Return the configuration for the job.
* @return the shared configuration object
public Configuration getConfiguration() {
* Get the unique ID for the job.
* @return the object with the job id
public JobID getJobID() {
return jobId;
* Get configured the number of reduce tasks for this job. Defaults to
* &code&1&/code&.
* @return the number of reduce tasks for this job.
public int getNumReduceTasks() {
return conf.getNumReduceTasks();
* Get the current working directory for the default file system.
* @return the directory name.
public Path getWorkingDirectory() throws IOException {
return conf.getWorkingDirectory();
* Get the key class for the job output data.
* @return the key class for the job output data.
public Class&?& getOutputKeyClass() {
return conf.getOutputKeyClass();
* Get the value class for job outputs.
* @return the value class for job outputs.
public Class&?& getOutputValueClass() {
return conf.getOutputValueClass();
* Get the key class for the map output data. If it is not set, use the
* (final) output key class. This allows the map output key class to be
* different than the final output key class.
* @return the map output key class.
public Class&?& getMapOutputKeyClass() {
return conf.getMapOutputKeyClass();
* Get the value class for the map output data. If it is not set, use the
* (final) output value class This allows the map output value class to be
* different than the final output value class.
* @return the map output value class.
public Class&?& getMapOutputValueClass() {
return conf.getMapOutputValueClass();
* Get the user-specified job name. This is only used to identify the
* job to the user.
* @return the job's name, defaulting to "".
public String getJobName() {
return conf.getJobName();
* Get the {@link InputFormat} class for the job.
* @return the {@link InputFormat} class for the job.
@SuppressWarnings("unchecked")
public Class&? extends InputFormat&?,?&& getInputFormatClass()
throws ClassNotFoundException {
return (Class&? extends InputFormat&?,?&&)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
* Get the {@link Mapper} class for the job.
* @return the {@link Mapper} class for the job.
@SuppressWarnings("unchecked")
public Class&? extends Mapper&?,?,?,?&& getMapperClass()
throws ClassNotFoundException {
return (Class&? extends Mapper&?,?,?,?&&)
conf.getClass(MAP_CLASS_ATTR, Mapper.class);
* Get the combiner class for the job.
* @return the combiner class for the job.
@SuppressWarnings("unchecked")
public Class&? extends Reducer&?,?,?,?&& getCombinerClass()
throws ClassNotFoundException {
return (Class&? extends Reducer&?,?,?,?&&)
conf.getClass(COMBINE_CLASS_ATTR, null);
* Get the {@link Reducer} class for the job.
* @return the {@link Reducer} class for the job.
@SuppressWarnings("unchecked")
public Class&? extends Reducer&?,?,?,?&& getReducerClass()
throws ClassNotFoundException {
return (Class&? extends Reducer&?,?,?,?&&)
conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
* Get the {@link OutputFormat} class for the job.
* @return the {@link OutputFormat} class for the job.
@SuppressWarnings("unchecked")
public Class&? extends OutputFormat&?,?&& getOutputFormatClass()
throws ClassNotFoundException {
return (Class&? extends OutputFormat&?,?&&)
conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
* Get the {@link Partitioner} class for the job.
* @return the {@link Partitioner} class for the job.
@SuppressWarnings("unchecked")
public Class&? extends Partitioner&?,?&& getPartitionerClass()
throws ClassNotFoundException {
return (Class&? extends Partitioner&?,?&&)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
* Get the {@link RawComparator} comparator used to compare keys.
* @return the {@link RawComparator} comparator used to compare keys.
public RawComparator&?& getSortComparator() {
return conf.getOutputKeyComparator();
* Get the pathname of the job's jar.
* @return the pathname
public String getJar() {
return conf.getJar();
* Get the user defined {@link RawComparator} comparator for
* grouping keys of inputs to the reduce.
* @return comparator set by the user for grouping values.
* @see Job#setGroupingComparatorClass(Class) for details.
public RawComparator&?& getGroupingComparator() {
return conf.getOutputValueGroupingComparator();
1. Mapper/Reducer in the old API
The Mapper and the Reducer encapsulate the application's data-processing logic. To keep the interface simple, MapReduce requires that all data stored on the underlying distributed file system…
The checkOutputSpecs method is normally called automatically by JobClient before the user's job is submitted to the JobTracker, to check that the output directory is valid.
The getRecordWriter method returns a RecordWriter object whose write method takes a key/value pair and writes it to a file. While a Task runs, the MapReduce framework passes the results produced by map() or reduce() into write; the main (simplified) code is shown below. Suppose the user's map() function is:
public void map(Text key, Text value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
    // derive a new pair <newKey, newValue> from the current key/value, then emit it
    output.collect(newKey, newValue);
}
Then output.collect(newKey, newValue) internally boils down to:
RecordWriter<K, V> out = job.getOutputFormat().getRecordWriter(...);
out.write(newKey, newValue);
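To make the collect-to-write path concrete, the following is a hedged sketch (not the article's code) of a stripped-down old-API OutputFormat whose RecordWriter joins key and value with a tab, in the spirit of TextOutputFormat:
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

// Sketch: every write() call appends "key<TAB>value\n" to the task's output file.
public class TabSeparatedOutputFormat extends FileOutputFormat<Text, Text> {
    @Override
    public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job,
            String name, Progressable progress) throws IOException {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        final FSDataOutputStream out = fs.create(file, progress);
        return new RecordWriter<Text, Text>() {
            public void write(Text key, Text value) throws IOException {
                out.write(key.getBytes(), 0, key.getLength());
                out.writeByte('\t');
                out.write(value.getBytes(), 0, value.getLength());
                out.writeByte('\n');
            }
            public void close(Reporter reporter) throws IOException {
                out.close();
            }
        };
    }
}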
Hadoop ships with many OutputFormat implementations, mirroring the InputFormat implementations, as shown in the figure. All file-based OutputFormat implementations derive from the base class FileOutputFormat, which has subclasses for text formats, binary formats, and multiple outputs.
To analyze how OutputFormat is implemented, we pick the representative FileOutputFormat class and follow the same approach as for InputFormat: first the base class FileOutputFormat, then its subclass TextOutputFormat. FileOutputFormat provides the functionality shared by all file-based OutputFormat implementations, which comes down to two things:
(1) Implementing checkOutputSpecs. This interface is called before the job runs; by default it checks whether the user-configured output directory already exists and throws an exception if it does, so that earlier output is not overwritten.
(2) Handling side-effect files. A task's side-effect file is not its final output but a task-specific file with a special purpose; the typical use is speculative execution. In Hadoop, because of aging hardware, network faults, and similar problems, some tasks of a job may run noticeably slower than the rest and drag down the whole job. To counter such slow tasks, Hadoop starts an identical task on another node, called a speculative task, and the result of whichever copy finishes first is taken as the result for that slice of data. To prevent the two copies from conflicting when writing to the same output file, FileOutputFormat creates a side-effect file for each Task, writes the task's output to it temporarily, and moves it to the final output directory only after the Task completes. Creating, deleting, and moving these files is the job of OutputCommitter. It is an interface with the default implementation FileOutputCommitter; users can write their own OutputCommitter and select it with the mapred.output.committer.class parameter. The OutputCommitter interface and the corresponding FileOutputCommitter behavior are summarized in the table.
Table: the OutputCommitter interface and the corresponding FileOutputCommitter implementation
setupJob (called at job initialization): creates the temporary directory ${mapred.out.dir}/_temporary.
commitJob (called when the job completes successfully): deletes the temporary directory and creates the empty marker file _SUCCESS under ${mapred.out.dir}.
abortJob (called when the job fails): deletes the temporary directory.
setupTask (called at task initialization): does nothing; the side-effect file is not created here but on demand (create on demand).
needsTaskCommit (decides whether the task result must be committed): returns true whenever a side-effect file exists.
commitTask (called when the task completes successfully): commits the result, i.e. moves the side-effect file into ${mapred.out.dir}.
abortTask (called when the task fails): deletes the task's side-effect file.
Note: by default, when a job completes successfully an empty file named _SUCCESS is created in the final output directory ${mapred.out.dir}. Its purpose is to signal job completion to higher-level applications; Oozie, for example, decides whether a job has finished by checking for this file in the output directory.
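A downstream client, in the spirit of the Oozie check just mentioned, could test for that marker roughly as in the sketch below; the output path is a placeholder:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: treat the job as finished only when the _SUCCESS marker exists.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path marker = new Path("/user/hadoop/output/_SUCCESS");   // placeholder output directory
boolean jobCompleted = fs.exists(marker);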
2. OutputFormat in the new API
As the figure shows, besides turning the interface into an abstract class, the new-API OutputFormat adds a new method, getOutputCommitter, which lets users plug in their own OutputCommitter implementation.
OutputFormat describes the format of the output data and writes the user-supplied key/value pairs into files of a particular format. This article looks at how Hadoop designs the OutputFormat interface, and a…
The getSplits method performs the data splitting: it tries to split the input data into numSplits InputSplits. An InputSplit has two notable properties. It is a logical split: the input is divided only logically, not physically cut into pieces on disk, and the InputSplit records only the metadata of the slice, such as its start offset, its length, and the list of nodes that hold it. It is serializable: in Hadoop, object serialization serves two purposes, interprocess communication and persistent storage.
Here, InputSplit supports serialization mainly for interprocess communication. Before a job is submitted to the JobTracker, the Client calls getSplits on the job's InputFormat and serializes the resulting InputSplits to a file. When the JobTracker initializes the job, it simply reads that file, deserializes all the InputSplits, and creates a MapTask for each of them.
The getRecordReader method returns a RecordReader object that parses an InputSplit into key/value pairs. While a MapTask runs, the MapReduce framework keeps calling methods of the RecordReader to fetch key/value pairs and feed them to map(); the main (simplified) code is:
// obtain RecordReader<K1, V1> input from the InputFormat's getRecordReader method
K1 key = input.createKey();
V1 value = input.createValue();
while (input.next(key, value)) {
    // invoke the user's map() function
    mapper.map(key, value, output, reporter);
}
input.close();
Having looked at the definition of the InputFormat interface, we now turn to the InputFormat implementations that ship with the system. To make MapReduce programming easier, Hadoop provides InputFormat implementations for databases and for files, as shown in the figure. Since the data users need to process is usually stored as files on HDFS, the discussion here focuses on the file-based InputFormat implementations.
As the figure shows, all file-based InputFormat implementations derive from the base class FileInputFormat, which has subclasses for text formats (TextInputFormat, KeyValueTextInputFormat, and NLineInputFormat) and for binary formats (SequenceFileInputFormat and others). The design of the file-based InputFormat hierarchy is that the common base class FileInputFormat splits the input files in a uniform way, for example into pieces of a fixed size, while each derived InputFormat supplies its own mechanism for parsing an InputSplit further. Concretely, the base class FileInputFormat provides the getSplits implementation, and the derived classes provide getRecordReader.
To understand how these InputFormats work, TextInputFormat and SequenceFileInputFormat are examined in detail.
First, the base class FileInputFormat. Its most important job is to provide a common getSplits implementation for all the InputFormats. The two core algorithms inside it are the file-splitting algorithm and the host-selection algorithm.
(1) The file-splitting algorithm
The file-splitting algorithm determines the number of InputSplits and the data range each one covers. FileInputFormat generates InputSplits file by file, and for each file the number of InputSplits is determined by three values:
goalSize: computed from the number of InputSplits the user asks for, i.e. totalSize/numSplits, where totalSize is the total input size and numSplits is the user-configured number of MapTasks (1 by default).
minSize: the minimum InputSplit size, set by the configuration parameter mapred.min.split.size (1 by default).
blockSize: the HDFS block size of the file, which can differ between files and is 64 MB by default. These three parameters determine the InputSplit size as follows (a numeric sketch follows):
splitSize = max{minSize, min{goalSize, blockSize}}
Once splitSize is fixed, FileInputFormat cuts the file into InputSplits of splitSize bytes each, and whatever is left over (less than splitSize) becomes one final InputSplit of its own.
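A quick numeric sketch of the formula, using the file sizes from the example below (this calculation is added for illustration and is not part of the original text):
// splitSize = max(minSize, min(goalSize, blockSize)), all defaults
long totalSize = (1 + 32 + 250) * 1024L * 1024L;   // file1 + file2 + file3 = 283 MB
long goalSize  = totalSize / 1;                    // numSplits defaults to 1
long minSize   = 1;                                // mapred.min.split.size default
long blockSize = 64 * 1024L * 1024L;
long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));   // 64 MB
// file3 (250 MB) becomes 3 full 64 MB InputSplits plus a 58 MB remainder: 4 InputSplits.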
Example: the input directory contains three files, file1, file2, and file3, of 1 MB, 32 MB, and 250 MB. With the default blockSize of 64 MB, the table shows how file3 is split under different minSize and goalSize settings (in all three cases, file1 and file2 each produce a single InputSplit).
Table: how minSize, goalSize, and splitSize determine the InputSplits
(The table lists, for each combination of minSize and goalSize, e.g. goalSize = totalSize with numSplits = 1, goalSize = totalSize/5, and goalSize = totalSize/2, the resulting splitSize, the number of InputSplits for file3, and the total number of InputSplits for the input directory; the exact figures are not recoverable from this copy.)
Together with the formula, the table shows that if you want InputSplits larger than a block, you simply increase the configuration parameter mapred.min.split.size.
(2) The host-selection algorithm. Once the InputSplit boundaries are fixed, the next step is to fill in each InputSplit's metadata, the four-tuple <file, start, length, hosts> giving the file it belongs to, its start offset, its length, and the list of hosts (nodes) it lives on. The first three are easy to determine; the hard part is choosing the host list.
The host list of an InputSplit directly affects data locality during execution. Files in HDFS are organized in blocks, and the blocks of a large file may be spread across the whole Hadoop cluster. Because the splitting algorithm may put several blocks into one InputSplit, and those blocks may sit on different nodes, Hadoop cannot always achieve perfect data locality. Instead, it ranks data locality by cost into three levels: node locality, rack locality, and datacenter locality (Hadoop has not implemented this last level). At scheduling time these levels are considered in order: an idle slot preferably processes data on its own node; failing that, data on the same rack; and in the worst case, data on another rack (but within the same data center).
Although the blocks of an InputSplit may be spread over many nodes, for scheduling efficiency not all of them are put into the InputSplit's host list. Instead, the nodes holding the largest share of the InputSplit's data are chosen (Hadoop keeps at most 10 and drops the rest) and used as the main evidence for deciding whether a task is local. FileInputFormat uses a simple, effective heuristic for this: sort the racks by how much of the InputSplit's data they hold, sort the nodes inside each rack the same way, and take the hosts of the first N nodes as the InputSplit's host list, where N is the block replication factor. When the scheduler later assigns the Task to any node in that host list, the Task counts as data-local.
Example: a Hadoop cluster has the network topology shown in the figure, the HDFS replication factor is 3, and an InputSplit consists of 3 blocks of sizes 100, 150, and 75. It is easy to work out that the four racks hold 175, 250, 150, and 75 units of this InputSplit's data, so node3 and node4 in rack2 and node1 in rack1 are added to the InputSplit's host list.
The host-selection algorithm implies that when an InputSplit is larger than a block, a Map Task cannot be fully data-local: some of its data always has to be read from remote nodes. The practical conclusion is that, when using a FileInputFormat-based InputFormat, keeping the InputSplit size equal to the block size gives the best Map Task data locality.
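For example (a sketch, not from the text), mapred.min.split.size is the knob in the splitting formula above; raising it beyond the block size produces fewer, larger InputSplits at the cost of the locality just discussed:
import org.apache.hadoop.mapred.JobConf;

// Sketch: ask for InputSplits of at least 128 MB (each may then span two HDFS blocks).
JobConf conf = new JobConf();
conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);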
