求下图代码或fgo彩色名字代码

下载数量:197264
开发者:上海巨升网络科技有限公司
手机扫码,快速下载!
球球大作战名字颜色代码大全
球球大作战名字颜色代码大全,球球大作战名字颜色更换方法介绍,是不是在游戏中看到很多五颜六色的昵称呢,不用羡慕今天小编就给大家送上球球大作战名字颜色代码大全,让你每一轮的名字颜色都不重复,而且绝对炫酷装哔。下面为大家送上所有的颜色代码大全具体的效果就是下图这样啦,如果能登上排行榜,自然是有很多人能看到你炫酷名字的颜色。
球球大作战名字颜色代码大全,以上就是119手游网小编葫芦娃为您带来的详细介绍!
球球大作战怎样改变字体颜色方法介绍,球球大作战最有趣的地方不仅仅是可以进行团战等等,还可以给你的名字改颜色,实在是非常吸引人,那么接下来小编就要带大家一起来看一...
北京新鱼网络科技有限公司 Copyright (C) 2015.www.119you.com ALL Rights Reserved
京网文[8号
最新手机游戏排行榜热门游戏下载就在119手游网
违法和不良信息举报电话:维护中
举报邮箱:维护中求下图识别码和名字。谢谢!_百度知道
求下图识别码和名字。谢谢!
我有更好的答案
片吗。。。
我看看能不能找到
就是不知道名字……
那你发图片?
兄弟建议你少撸→_→
为您推荐:
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。求下图游戏的游戏名称_百度知道
求下图游戏的游戏名称
我有更好的答案
しこぶるっ!DeluxeOccasion
采纳率:74%
来自团队:
为您推荐:
其他类似问题
您可能关注的内容
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。1运行环境说明
1.1硬软件环境
l主机操作系统:Windows 64 bit,双核4线程,主频2.2G,6G内存
l虚拟软件:VMware& Workstation 9.0.0 build-812388
l虚拟机操作系统:CentOS 64位,单核,1G内存
lJDK:1.7.0_55 64 bit
lHadoop:1.1.2
1.2机器网络环境
集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。节点IP地址和主机名分布如下:
10.88.147.221
NN、SNN、JobTracer
10.88.147.222
DN、TaskTracer
10.88.147.223
DN、TaskTracer
所有节点均是CentOS6.5 64bit系统,防火墙均禁用,所有节点上均创建了一个hadoop用户,用户主目录是/usr/hadoop。所有节点上均创建了一个目录/usr/local/hadoop,并且拥有者是hadoop用户。
2书面作业1:计算员工相关
2.1书面作业1内容
(本题10选2)把作业素材demo.txt中的两个表数据用适当的方式导入hadoop(来自Oracle数据库的样板表,可考虑分成2个文件存放,注意空值的处理)
书写Map-Reduce程序,求出以下结果
1) 求各个部门的总工资
2) 求各个部门的人数和平均工资
3) 求每个部门最早进入公司的员工姓名
4) 求各个城市的员工的总工资
5) 列出工资比上司高的员工姓名及其工资
6) 列出工资比公司平均工资要高的员工姓名及其工资
7) 列出名字以J开头的员工姓名及其所属部门名称
8) 列出工资最高的头三名员工姓名及其工资
9) 将全体员工按照总收入(工资+提成)从高到低排列,要求列出姓名及其总收入
10) 如果每位员工只能和他的直接上司,直接下属,同一部门的同事交流,求任何两名员工之间若要进行信息传递所需要经过的中间节点数。请评价一下这个问题是否适合使用map-reduce解决
2.2实现过程
2.2.1准备测试数据
2.2.1.1拆分文件
把提供的测试数据第7-8周作业素材.txt按照要求拆分成两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:
dept文件内容:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp文件内容:
7369,SMITH,CLERK,月-80,800,,20
7499,ALLEN,SALESMAN,月-81,
7521,WARD,SALESMAN,月-81,
7566,JONES,MANAGER,月-81,2975,,20
7654,MARTIN,SALESMAN,月-81,
7698,BLAKE,MANAGER,月-81,2850,,30
7782,CLARK,MANAGER,月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,月-81,
7900,JAMES,CLERK,月-81,950,,30
7902,FORD,ANALYST,月-81,3000,,20
7934,MILLER,CLERK,月-82,1300,,10
2.2.1.2上传测试文件
使用SSH工具(参见第1、2周2.1.3.1Linux文件传输工具所描述)把dept和emp两个文件上传到本地目录/usr/local/hadoop-1.1.2/input中,然后使用eclipse的HDFS插件工具上传该文件到/usr/hadoop/in目录中,如下图所示:
2.2.2问题1:求各个部门的总工资
2.2.2.1问题分析
MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
在下面代码中,将会把数据量小的表(部门dept )缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
2.2.2.2处理流程图
2.2.2.3编写代码
Q1SumDeptSalary.java代码:
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.util.HashM
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q1SumDeptSalary extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& // 用于缓存 dept文件中的数据
&&&&&&& private Map&String, String& deptMap = new HashMap&String, String&();
&&&&&&& private String[] kv;
&&&&&&& // 此方法会在Map方法执行之前执行且执行一次
&&&&&&& @Override
&&&&&&& protectedvoid setup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& BufferedReader in = null;
&&&&&&&&&&& try {
&&&&&&&&&&&&&&& // 从当前作业中获取要缓存的文件
&&&&&&&&&&&&&&& Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
&&&&&&&&&&&&&&& String deptIdName = null;
&&&&&&&&&&&&&&& for (Path path : paths) {
&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&& if (path.toString().contains("dept")) {
&&&&&&&&&&&&&&&&&&&&&&& in = new BufferedReader(new FileReader(path.toString()));
&&&&&&&&&&&&&&&&&&&&&&& while (null != (deptIdName = in.readLine())) {
&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 其中Map中key为部门编号,value为所在部门名称
&&&&&&&&&&&&&&&&&&&&&&&&&&& deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&& } finally {
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& if (in != null) {
&&&&&&&&&&&&&&&&&&&&&&& in.close();
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& kv = value.toString().split(",");
&&&&&&&&&&& // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
&&&&&&&&&&& if (deptMap.containsKey(kv[7])) {
&&&&&&&&&&&&&&& if (null != kv[5] && !"".equals(kv[5].toString())) {
&&&&&&&&&&&&&&&&&&& context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&& publicstaticclass Reduce extends Reducer&Text, Text, Text, LongWritable& {
publicvoid reduce(Text key, Iterable&Text& values, Context context) throws IOException, InterruptedException {
&&&&&&&&&&& // 对同一部门的员工工资进行求和
&&& &&&&&&& long sumSalary = 0;
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& sumSalary += Long.parseLong(val.toString());
&&&&&&&&&&& }
&&&&&&&&&&& // 输出key为部门名称和value为该部门员工工资总和
&&&&&&&&&&& context.write(key, new LongWritable(sumSalary));
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称、Mapper和Reduce类
&&&&&&& Job job = new Job(getConf(), "Q1SumDeptSalary");
&&&&&&& job.setJobName("Q1SumDeptSalary");
&&&&&&& job.setJarByClass(Q1SumDeptSalary.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&& DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
&&&&&&& System.exit(res);
2.2.2.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q1SumDeptSalary运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q1
2.2.2.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q1目录,打开part-r-00000文件,可以看到运行结果:
ACCOUNTING8750
RESEARCH6775
SALES& 9400
2.2.3问题2:求各个部门的人数和平均工资
2.2.3.1问题分析
求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。
2.2.3.2处理流程图
2.2.3.3编写代码
Q2DeptNumberAveSalary.java代码:
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.util.HashM
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q2DeptNumberAveSalary extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& // 用于缓存 dept文件中的数据
&&&&&&& private Map&String, String& deptMap = new HashMap&String, String&();
&&&&&&& private String[] kv;
&&&&&&& // 此方法会在Map方法执行之前执行且执行一次
&&&&&&& @Override
&&&&&&& protectedvoid setup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& BufferedReader in = null;
&&&&&&&&&&& try {
&&&&&&&&&&&&&&& // 从当前作业中获取要缓存的文件
&&&&&&&&&&&&&&& Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
&&&&&&&&&&&&&&& String deptIdName = null;
&&&&&&&&&&&&&&& for (Path path : paths) {
&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&& if (path.toString().contains("dept")) {
&&&&&&&&&&&&&&&&&&&&&&& in = new BufferedReader(new FileReader(path.toString()));
&&&&&&&&&&&&&&&&&&&&&&& while (null != (deptIdName = in.readLine())) {
&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 其中Map中key为部门编号,value为所在部门名称
&&&&&&&&&&&&&&&&&&&&&&&&&&& deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&& } finally {
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& if (in != null) {
&&&&&&&&&&&&&&&&&&&&&&& in.close();
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& kv = value.toString().split(",");
&&&&&&&&&&& // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
&&&&&&&&&&& if (deptMap.containsKey(kv[7])) {
&&&&&&&&&&&&&&& if (null != kv[5] && !"".equals(kv[5].toString())) {
&&&&&&&&&&&&&&&&&&& context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&& publicstaticclass Reduce extends Reducer&Text, Text, Text, Text& {
&&&&&&& publicvoid reduce(Text key, Iterable&Text& values, Context context) throws IOException, InterruptedException {
&&&&&&&&&&& long sumSalary = 0;
&&&&&&&&&&& int deptNumber = 0;
&&&&&&&&&&& // 对同一部门的员工工资进行求和
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& sumSalary += Long.parseLong(val.toString());
&&&&&&&&&&&&&&& deptNumber++;
&&&&&&&&&&& }
&&&&&&&&&&& // 输出key为部门名称和value为该部门员工工资平均值
&&&&&&&&&&& context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称、Mapper和Reduce类
&&&&&&& Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
&&&&&&& job.setJobName("Q2DeptNumberAveSalary");
&&&&&&& job.setJarByClass(Q2DeptNumberAveSalary.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
&&&&&&& System.exit(res);
2.2.3.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q2DeptNumberAveSalary运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q2
2.2.3.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q2目录,打开part-r-00000文件,可以看到运行结果:
ACCOUNTINGDept Number:3,Ave Salary:2916
RESEARCHDept Number:3,Ave Salary:2258
SALES& Dept Number:6,Ave Salary:1566
2.2.4问题3:求每个部门最早进入公司的员工姓名
2.2.4.1问题分析
求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所有员工,找出最早进入公司的员工并输出。
2.2.4.2处理流程图
2.2.4.3编写代码
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.text.DateF
import java.text.ParseE
import java.text.SimpleDateF
import java.util.D
import java.util.HashM
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q3DeptEarliestEmp extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& // 用于缓存 dept文件中的数据
&&&&&&& private Map&String, String& deptMap = new HashMap&String, String&();
&&&&&&& private String[] kv;
&&&&&&& // 此方法会在Map方法执行之前执行且执行一次
&&&&&&& @Override
&&&&&&& protectedvoid setup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& BufferedReader in = null;
&&&&&&&&&&& try {
&&&&&&&&&&&&&&& // 从当前作业中获取要缓存的文件
&&&&&&&&&&&&&&& Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
&&&&&&&&&&&&&&& String deptIdName = null;
&&&&&&&&&&&&&&& for (Path path : paths) {
&&&&&&&&&&&&&&&&&&& if (path.toString().contains("dept")) {
&&&&&&&&&&&&&&&&&&&&&&& in = new BufferedReader(new FileReader(path.toString()));
&&&&&&&&&&&&&&&&&&&&&&& while (null != (deptIdName = in.readLine())) {
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 其中Map中key为部门编号,value为所在部门名称
&&&&&&&&&&&&&&&&&&&&&&&&&&& deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&& } finally {
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& if (in != null) {
&&&&&&&&&&&&&&&&&&&&&&& in.close();
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& kv = value.toString().split(",");
&&&&&&&&&&& // map join: 在map阶段过滤掉不需要的数据
&&&&&&&&&&& // 输出key为部门名称和value为员工姓名+","+员工进入公司日期
&&&&&&&&&&& if (deptMap.containsKey(kv[7])) {
&&&&&&&&&&&&&&& if (null != kv[4] && !"".equals(kv[4].toString())) {
&&&&&&&&&&&&&&&&&&& context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim() &&&&&&&&&& &&&&&&& + "," + kv[4].trim()));
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&& publicstaticclass Reduce extends Reducer&Text, Text, Text, Text& {
&&&&&&& publicvoid reduce(Text key, Iterable&Text& values, Context context) throws IOException, && &&& InterruptedException {
&&&&&&&&&&& // 员工姓名和进入公司日期
&&&&&&&&&&& String empName = null;
&&&&&&&&&&& String empEnterDate = null;
&&&&&&&&&&& // 设置日期转换格式和最早进入公司的员工、日期
&&&&&&&&&&& DateFormat df = new SimpleDateFormat("dd-MM月-yy");
&&&&&&&&&&& Date earliestDate = new Date();
&&&&&&&&&&& String earliestEmp = null;
&&&&&&&&&&& // 遍历该部门下所有员工,得到最早进入公司的员工信息
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& empName = val.toString().split(",")[0];
&&&&&&&&&&&&&&& empEnterDate = val.toString().split(",")[1].toString().trim();
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& System.out.println(df.parse(empEnterDate));
&&&&&&&&&&&&&&&&&&& if (df.parse(empEnterDate).compareTo(earliestDate) & 0) {
&&&&&&&&&&&&&&&&&&&&&&& earliestDate = df.parse(empEnterDate);
&&&&&&&&&&&&&&&&&&&&&&& earliestEmp = empName;
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (ParseException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&&&&&& // 输出key为部门名称和value为该部门最早进入公司员工
&&&&&&&&&&& context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter &&&&&& &&& date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q3DeptEarliestEmp");
&&&&&&& job.setJobName("Q3DeptEarliestEmp");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q3DeptEarliestEmp.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&& &&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& returnjob.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& intres = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
&&&&&&& System.exit(res);
2.2.4.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q3DeptEarliestEmp运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q3
2.2.4.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q3目录,打开part-r-00000文件,可以看到运行结果:
ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:
RESEARCHThe earliest emp of dept:SMITH, Enter date:
SALES& The earliest emp of dept:ALLEN, Enter date:
2.2.5问题4:求各个城市的员工的总工资
2.2.5.1问题分析
求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和问题1类似在Mapper的Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资总数并输出。
2.2.5.2处理流程图
2.2.5.3编写代码
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.util.HashM
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q4SumCitySalary extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& // 用于缓存 dept文件中的数据
&&&&&&& private Map&String, String& deptMap = new HashMap&String, String&();
&&&&&&& private String[] kv;
&&&&&&& // 此方法会在Map方法执行之前执行且执行一次
&&&&&&& @Override
&&&&&&& protectedvoid setup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& BufferedReader in = null;
&&&&&&&&&&& try {
&&&&&&&&&&&&&&& // 从当前作业中获取要缓存的文件
&&&&&&&&&&&&&&& Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
&&&&&&&&&&&&&&& String deptIdName = null;
&&&&&&&&&&&&&&& for (Path path : paths) {
&&&&&&&&&&&&&&&&&&& if (path.toString().contains("dept")) {
&&&&&&&&&&&&&&&&&&&&&&& in = new BufferedReader(new FileReader(path.toString()));
&&&&&&&&&&&&&&&&&&&&&&& while (null != (deptIdName = in.readLine())) {
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 其中Map中key为部门编号,value为所在城市名称
&&&&&&&&&&&&&&&&&&&&&&&&&&& deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]);
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&& } finally {
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& if (in != null) {
&&&&&&&&&&&&&&&&&&&&&&& in.close();
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& kv = value.toString().split(",");
&&&&&&&&&&& // map join: 在map阶段过滤掉不需要的数据,输出key为城市名称和value为员工工资
&&&&&&&&&&& if (deptMap.containsKey(kv[7])) {
&&&&&&&&&&&&&&& if (null != kv[5] && !"".equals(kv[5].toString())) {
&&&&&&&&&&&&&&&&&&& context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&& publicstaticclass Reduce extends Reducer&Text, Text, Text, LongWritable& {
&&&&&&& publicvoid reduce(Text key, Iterable&Text& values, Context context) throws IOException, && &&& InterruptedException {
&&&&&&&&&&& // 对同一城市的员工工资进行求和
&&&&&&&&&&& long sumSalary = 0;
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& sumSalary += Long.parseLong(val.toString());
&&&&&&&&&&& }
&&&&&&&&&&& // 输出key为城市名称和value为该城市工资总和
&&&&&&&&&&& context.write(key, new LongWritable(sumSalary));
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q4SumCitySalary");
&&&&&&& job.setJobName("Q4SumCitySalary");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q4SumCitySalary.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
&&&&&&& System.exit(res);
2.2.5.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q4SumCitySalary运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q4
2.2.5.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q4目录,打开part-r-00000文件,可以看到运行结果:
CHICAGO& 9400
DALLAS&&&& 6775
NEW YORK&&&& 8750
2.2.6问题5:列出工资比上司高的员工姓名及其工资
2.2.6.1问题分析
求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为员工编号、value为"M,该员工工资",员工对应经理表数据key为经理编号、value为"E,该员工姓名,该员工工资";然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。
2.2.6.2处理流程图
2.2.6.3编写代码
import java.io.IOE
import java.util.HashM
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q5EarnMoreThanManager extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& String[] kv = value.toString().split(",");
&&&&&&&&&&& // 输出经理表数据,其中key为员工编号和value为M+该员工工资
&&&&&&&&&&& context.write(new Text(kv[0].toString()), new Text("M," + kv[5]));
&&&&&&&&&&& // 输出员工对应经理表数据,其中key为经理编号和value为(E,该员工姓名,该员工工资)
&&&&&&&&&&& if (null != kv[3] && !"".equals(kv[3].toString())) {
&&&&&&&&&&&&&&& context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5]));
&&&&&&&&&&& }
&&& publicstaticclass Reduce extends Reducer&Text, Text, Text, Text& {
&&&&&&& publicvoid reduce(Text key, Iterable&Text& values, Context context) throws IOException, && &&& InterruptedException {
&&&&&&&&&&& // 定义员工姓名、工资和存放部门员工Map
&&&&&&&&&&& String empN
&&&&&&&&&&& long empSalary = 0;
&&&&&&&&&&& HashMap&String, Long& empMap = new HashMap&String, Long&();
&&&&&&&&&&&
&&&&&&&&&&& // 定义经理工资变量
&&&&&&&&&&& long mgrSalary = 0;
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& if (val.toString().startsWith("E")) {
&&&&&&&&&&&&&&&&&&& // 当是员工标示时,获取该员工对应的姓名和工资并放入Map中
&&&&&&&&&&&&&&&&&&& empName = val.toString().split(",")[1];
&&&&&&&&&&&&&&&&&&& empSalary = Long.parseLong(val.toString().split(",")[2]);
&&&&&&&&&&&&&&&&&&& empMap.put(empName, empSalary);
&&&&&&&&&&&&&&& } else {
&&&&&&&&&&&&&&&&&&& // 当时经理标志时,获取该经理工资
&&&&&&&&&&&&&&&&&&& mgrSalary = Long.parseLong(val.toString().split(",")[1]);
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&&&&&& // 遍历该经理下属,比较员工与经理工资高低,输出工资高于经理的员工
&&&&&&&&&&& for (java.util.Map.Entry&String, Long& entry : empMap.entrySet()) {
&&&&&&&&&&&&&&& if (entry.getValue() & mgrSalary) {
&&&&&&&&&&&&&&&&&&& context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
&&&&&&&&&&&&&&& }
&&& &&&&&&& }
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q5EarnMoreThanManager");
&&&&&&& job.setJobName("Q5EarnMoreThanManager");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q5EarnMoreThanManager.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
&&&&&&& System.exit(res);
2.2.6.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q5EarnMoreThanManager运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q5
2.2.6.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q5目录,打开part-r-00000文件,可以看到运行结果:
FORD& 3000
2.2.7问题6:列出工资比公司平均工资要高的员工姓名及其工资
2.2.7.1问题分析
求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,从而能先求出平均工资,然后进行比较得出结果。
在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为0、value为"该员工姓名 ,员工工资";然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。
2.2.7.2处理流程图
2.2.7.3编写代码
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q6HigherThanAveSalary extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, IntWritable, Text& {
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& String[] kv = value.toString().split(",");
&&&&&&&&&&& // 获取所有员工数据,其中key为0和value为该员工工资
&&&&&&&&&&& context.write(new IntWritable(0), new Text(kv[5]));
&&&&&&&&&&& // 获取所有员工数据,其中key为0和value为(该员工姓名 ,员工工资)
&&&&&&&&&&& context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
&&& publicstaticclass Reduce extends Reducer&IntWritable, Text, Text, Text& {
&&&&&&& // 定义员工工资、员工数和平均工资
&&&&&&& privatelongallSalary = 0;
&&&&&&& privateintallEmpCount = 0;
&&&&&&& privatelongaveSalary = 0;
&&&&&&& // 定义员工工资变量
&&&&&&& privatelongempSalary = 0;
&&& &&& publicvoid reduce(IntWritable key, Iterable&Text& values, Context context) throws &&& IOException, InterruptedException {
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& if (0 == key.get()) {
&&&&&&&&&&&&&&&&&&& // 获取所有员工工资和员工数
&&&&&&&&&&&&&&&&&&& allSalary += Long.parseLong(val.toString());
&&&&&&&&&&&&&&&&&&& allEmpCount++;
&&&&&&&&&&&&&&&&&&& System.out.println("allEmpCount = " + allEmpCount);
&&&&&&&&&&&&&&& } elseif (1 == key.get()) {
&&&&&&&&&&&&&&&&&&& if (aveSalary == 0) {
&&&&&&&&&&&&&&&&&&&&&&& aveSalary = allSalary / allEmpCount;
&&&&&&&&&&&&&&&&&&&&&&& context.write(new Text("Average Salary = "), new Text("" + aveSalary));
&&&&&&&&&&&&&&&&&&& &&& context.write(new Text("Following employees have salarys higher than &&&&&&&&&& &&&&&&&&&&& Average:"), new Text(""));
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& // 获取员工的平均工资
&&&&&&&&&&&&&&&&&&& System.out.println("Employee salary = " + val.toString());
&&&&&&&&&&&&&&&&&&& aveSalary = allSalary / allEmpCount;
&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&& // 比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资
&&&&&&&&&&&&&&&&&&& empSalary = Long.parseLong(val.toString().split(",")[1]);
&&&&&&&&&&&&&&&&&&& if (empSalary & aveSalary) {
&&&&&&&&&&&&&&&&&&&&&&& context.write(new Text(val.toString().split(",")[0]), new Text("" + &&&&&&&&&&& &&&&&&&&&&& empSalary));
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q6HigherThanAveSalary");
&&&&&&& job.setJobName("Q6HigherThanAveSalary");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q6HigherThanAveSalary.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 必须设置Reduce任务数为1 # -D mapred.reduce.tasks = 1
&&&&&&& // 这是该作业设置的核心,这样才能够保证各reduce是串行的
&&&&&&& job.setNumReduceTasks(1);
&&&&&&& // 设置输出格式类
&&&&&&& job.setMapOutputKeyClass(IntWritable.class);
&&&&&&& job.setMapOutputValueClass(Text.class);
&&&&&&& // 设置输出键和值类型
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(LongWritable.class);
&&&&&&& // 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
&&&&&&& System.exit(res);
2.2.7.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q6HigherThanAveSalary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q6
2.2.7.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q6目录,打开part-r-00000文件,可以看到运行结果:
Average Salary = 2077
Following employees have salarys higher than Average:&&&&
FORD& 3000
KING&& 5000
BLAKE& 2850
2.2.8问题7:列出名字以J开头的员工姓名及其所属部门名称
2.2.8.1问题分析
求名字以J开头的员工姓名机器所属部门名称,只需判断员工姓名是否以J开头。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段判断员工姓名是否以J开头,如果是抽取出员工姓名和员工所在部门编号,利用缓存部门数据把部门编号对应为部门名称,转换后输出结果。
2.2.8.2处理流程图
2.2.8.3编写代码
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.util.HashM
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q7NameDeptOfStartJ extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, Text, Text& {
&&&&&&& // 用于缓存 dept文件中的数据
&&&&&&& private Map&String, String& deptMap = new HashMap&String, String&();
&&&&&&& private String[] kv;
&&&&&&& // 此方法会在Map方法执行之前执行且执行一次
&&&&&&& @Override
&&&&&&& protectedvoid setup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& BufferedReader in = null;
&&&&&&&&&&& try {
&&&&&&&&&&&&&&& // 从当前作业中获取要缓存的文件
&&&&&&&&&&&&&&& Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
&&&&&&&&&&&&&&& String deptIdName = null;
&&&&&&&&&&&&&&& for (Path path : paths) {
&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&& if (path.toString().contains("dept")) {
&&&&&&&&&&&&&&&&&&&&&&& in = new BufferedReader(new FileReader(path.toString()));
&&&&&&&&&&&&&&&&&&&&&&& while (null != (deptIdName = in.readLine())) {
&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 对部门文件字段进行拆分并缓存到deptMap中
&&&&&&&&&&&&&&&&&&&&&&&&&&& // 其中Map中key为部门编号,value为所在部门名称
&&&&&&&&&&&&&&&&&&&&&&&&&&& deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&& } finally {
&&&&&&&&&&&&&&& try {
&&&&&&&&&&&&&&&&&&& if (in != null) {
&&&&&&&&&&&&&&&&&&&&&&& in.close();
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& } catch (IOException e) {
&&&&&&&&&&&&&&&&&&& e.printStackTrace();
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& kv = value.toString().split(",");
&&&&&&&&&&& // 输出员工姓名为J开头的员工信息,key为员工姓名和value为员工所在部门名称
&&&&&&&&&&& if (kv[1].toString().trim().startsWith("J")) {
&&&&&&&&&&&&&&& context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim())));
&&&&&&&&&&& }
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q7NameDeptOfStartJ");
&&&&&&& job.setJobName("Q7NameDeptOfStartJ");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q7NameDeptOfStartJ.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
&&&&&&& DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);
&&&&&&& System.exit(res);
2.2.8.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q7NameDeptOfStartJ运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q7
2.2.8.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q7目录,打开part-r-00000文件,可以看到运行结果:
JAMESSALES
JONESRESEARCH
2.2.9问题8:列出工资最高的头三名员工姓名及其工资
2.2.9.1问题分析
求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为"员工姓名,员工工资";最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。
2.2.9.2处理流程图
2.2.9.3编写代码
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q8SalaryTop3Salary extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, IntWritable, Text& {
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& String[] kv = value.toString().split(",");
&&&&&&&&&&& // 输出key为0和value为员工姓名+","+员工工资
&&&&&&&&&&& context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim()));
&&& publicstaticclass Reduce extends Reducer&IntWritable, Text, Text, Text& {
&&&&&&& publicvoid reduce(IntWritable key, Iterable&Text& values, Context context) throws &&& IOException, InterruptedException {
&&&&&&&&&&& // 定义工资前三员工姓名
&&&&&&&&&&& String empN
&&&&&&&&&&& String firstEmpName = "";
&&&&&&&&&&& String secondEmpName = "";
&&&&&&&&&&& String thirdEmpName = "";
&&&&&&&&&&&
&&&&&&&&&&& // 定义工资前三工资
&&&&&&&&&&& long empSalary = 0;
&&&&&&&&&&& long firstEmpSalary = 0;
&&&&&&&&&&& long secondEmpSalary = 0;
&&&&&&&&&&& long thirdEmpSalary = 0;
&&&&&&&&&&& // 通过冒泡法遍历所有员工,比较员工工资多少,求出前三名
&&&&&&&&&&& for (Text val : values) {
&&&&&&&&&&&&&&& empName = val.toString().split(",")[0];
&&&&&&&&&&&&&&& empSalary = Long.parseLong(val.toString().split(",")[1]);
&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&& if(empSalary & firstEmpSalary) {
&&&&&&&&&&&&&&&&&&& thirdEmpName = secondEmpN
&&&&&&&&&&&&&&&&&&& thirdEmpSalary = secondEmpS
&&&&&&&&&&&&&&&&&&& secondEmpName = firstEmpN
&&&&&&&&&&&&&&&&&&& secondEmpSalary = firstEmpS
&&&&&&&&&&&&&&&&&&& firstEmpName = empN
&&&&&&&&&&&&&&&&&&& firstEmpSalary = empS
&&&&&&&&&&&&&&& } elseif (empSalary & secondEmpSalary) {
&&&&&&&&&&&&&&&&&&& thirdEmpName = secondEmpN
&&&&&&&&&&&&&&&&&&& thirdEmpSalary = secondEmpS
&&&&&&&&&&&&&&&&&&& secondEmpName = empN
&&&&&&&&&&&&&&&&&&& secondEmpSalary = empS
&&&&&&&&&&&&&&& } elseif (empSalary & thirdEmpSalary) {
&&&&&&&&&&&&&&&&&&& thirdEmpName = empN
&&&&&&&&&&&&&&&&&&& thirdEmpSalary = empS
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&&&&&&
&&&&&&&&&&& // 输出工资前三名信息
&&&&&&&&&&& context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:" &&&&&&&& + firstEmpSalary));
&&&&&&&&&&& context.write(new Text( "Second employee name:" + secondEmpName), new &&&&&&&&&&&&& &&& Text("Salary:" + secondEmpSalary));
&&&&&&&&&&& context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:" &&&&&&&& + thirdEmpSalary));
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q8SalaryTop3Salary");
&&&&&&& job.setJobName("Q8SalaryTop3Salary");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q8SalaryTop3Salary.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& job.setMapOutputKeyClass(IntWritable.class);
&&&&&&& job.setMapOutputValueClass(Text.class);
&&&&&&& // 设置输入格式类
&&&&&&& job.setInputFormatClass(TextInputFormat.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setOutputKeyClass(Text.class);
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为员工数据路径和第2个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), &&&&&&&&&&&&& &&& args).getRemainingArgs();
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);
&&&&&&& System.exit(res);
2.2.9.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q8SalaryTop3Salary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q8
2.2.9.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q8目录,打开part-r-00000文件,可以看到运行结果:
First employee name:KING&&& Salary:5000
Second employee name:FORD&&& Salary:3000
Third employee name:JONESSalary:2975
2.2.10问题9:将全体员工按照总收入(工资+提成)从高到低排列
2.2.10.1问题分析
求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数据进行排序。
2.2.10.2处理流程图
2.2.10.3编写代码
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.io.WritableC
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q9EmpSalarySort extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, IntWritable, Text& {
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& String[] kv = value.toString().split(",");
&&&&&&&&&&& // 输出key为员工所有工资和value为员工姓名
&&&&&&&&&&& int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) : &&&&&&&&&&&&&&&&&&&&&&& &&& Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);
&&&&&&&&&&& context.write(new IntWritable(empAllSalary), new Text(kv[1]));
&&& &* 递减排序算法
&&& publicstaticclass DecreaseComparator extends IntWritable.Comparator {
&&&&&&& publicint compare(WritableComparable a, WritableComparable b) {
&&&&&&&&&&& return -super.compare(a, b);
&&&&&&& publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
&&&&&&&&&&& return -super.compare(b1, s1, l1, b2, s2, l2);
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q9EmpSalarySort");
&&&&&&& job.setJobName("Q9EmpSalarySort");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q9EmpSalarySort.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& // 设置输出格式类
&&&&&&& job.setMapOutputKeyClass(IntWritable.class);
&&&&&&& job.setMapOutputValueClass(Text.class);
&&&&&&& job.setSortComparatorClass(DecreaseComparator.class);
&&&&&&& // 第1个参数为员工数据路径和第2个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), &&&&&&&&&&&&& &&& args).getRemainingArgs();
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);
&&&&&&& System.exit(res);
2.2.10.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q9EmpSalarySort运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q9
2.2.10.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q9目录,打开part-r-00000文件,可以看到运行结果:
5000&&& KING
3000&&& FORD
2975&&& JONES
2850&&& BLAKE
2.2.11问题10:求任何两名员工信息传递所需要经过的中间节点数
2.2.11.1问题分析
该公司所有员工可以形成入下图的树形结构,求两个员工的沟通的中间节点数,可转换在员工树中求两个节点连通所经过的节点数,即从其中一节点到汇合节点经过节点数加上另一节点到汇合节点经过节点数。例如求M到Q所需节点数,可以先找出M到A经过的节点数,然后找出Q到A经过的节点数,两者相加得到M到Q所需节点数。
在作业中首先在Mapper阶段所有员工数据,其中经理数据key为0值、value为"员工编号,员工经理编号",然后在Reduce阶段把所有员工放到员工列表和员工对应经理链表Map中,最后在Reduce的Cleanup中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出。
2.2.11.2处理流程图
2.2.11.3编写代码
import java.io.IOE
import java.util.ArrayL
import java.util.HashM
import java.util.L
import java.util.M
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.NullW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.GenericOptionsP
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Q10MiddlePersonsCountForComm extends Configured implements Tool {
&&& publicstaticclass MapClass extends Mapper&LongWritable, Text, IntWritable, Text& {
&&&&&&& publicvoid map(LongWritable key, Text value, Context context) throws IOException, &&& InterruptedException {
&&&&&&&&&&& // 对员工文件字段进行拆分
&&&&&&&&&&& String[] kv = value.toString().split(",");
&&&&&&&&&&& // 输出key为0和value为员工编号+","+员工经理编号
&&&&&&&&&&& context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3])));
&&& publicstaticclass Reduce extends Reducer&IntWritable, Text, NullWritable, Text& {
&&&&&&& // 定义员工列表和员工对应经理Map
&&&&&&& List&String& employeeList = new ArrayList&String&();
&&&&&&& Map&String, String& employeeToManagerMap = new HashMap&String, String&();
&&&&&&& publicvoid reduce(IntWritable key, Iterable&Text& values, Context context) throws &&& IOException, InterruptedException {
&&&&&&&&&&& // 在reduce阶段把所有员工放到员工列表和员工对应经理Map中
&&&&&&&&&&& for (Text value : values) {
&&&&&&&&&&&&&&& employeeList.add(value.toString().split(",")[0].trim());
&&&&&&&&&&&&&&& employeeToManagerMap.put(value.toString().split(",")[0].trim(), &&&&&&&&&&&&&&&&&&&&&&& &&& value.toString().split(",")[1].trim());
&&&&&&&&&&& }
&&&&&&& @Override
&&&&&&& protectedvoid cleanup(Context context) throws IOException, InterruptedException {
&&&&&&&&&&& int totalEmployee = employeeList.size();
&&&&&&&&&&& int i,
&&&&&&&&&&& int
&&&&&&&&&&& System.out.println(employeeList);
&&&&&&&&&&& System.out.println(employeeToManagerMap);
&&&&&&&&&&& // 对任意两个员工计算出沟通的路径长度并输出
&&&&&&&&&&& for (i = 0; i & (totalEmployee - 1); i++) {
&&&&&&&&&&&&&&& for (j = (i + 1); j & totalE j++) {
&&&&&&&&&&&&&&&&&&& distance = calculateDistance(i, j);
&&&&&&&&&&&&&&&&&&& String value = employeeList.get(i) + " and " + employeeList.get(j) + " = &&&&&&&&&& &&&&&&& " +
&&&&&&&&&&&&&&&&&&& context.write(NullWritable.get(), new Text(value));
&&&&&&&&&&&&&&& }
&&&&&&&&&&& }
&&&&&&& /**
&&&&&&& &* 该公司可以由所有员工形成树形结构,求两个员工的沟通的中间节点数,可以转换在员工树中两员工之间的距离
&&&&&&& &* 由于在树中任意两点都会在某上级节点汇合,根据该情况设计了如下算法
&&&&&&& &*/
&&&&&&& privateint calculateDistance(int i, int j) {
&&&&&&&&&&& String employeeA = employeeList.get(i);
&&&&&&&&&&& String employeeB = employeeList.get(j);
&&&&&&&&&&& int distance = 0;
&&&&&&&&&&& // 如果A是B的经理,反之亦然
&&&&&&&&&&& if (employeeToManagerMap.get(employeeA).equals(employeeB) || &&&&&&&&&&&&&&&&&&&&&&&&&&&&&& &&& employeeToManagerMap.get(employeeB).equals(employeeA)) {
&&&&&&&&&&&&&&& distance = 0;
&&&&&&&&&&& }
&&&&&&&&&&& // A和B在同一经理下
&&&&&&&&&&& elseif &(employeeToManagerMap.get(employeeA).equals(
&&&&&&&&&&&&&&&&&&& employeeToManagerMap.get(employeeB))) {
&&&&&&&&&&&&&&& distance = 0;
&&&&&&&&&&& } else {
&&&&&&&&&&&&&&& // 定义A和B对应经理链表
&&&&&&&&&&&&&&& List&String& employeeA_ManagerList = new ArrayList&String&();
&&&&&&&&&&&&&&& List&String& employeeB_ManagerList = new ArrayList&String&();
&&&&&&&&&&&&&&& // 获取从A开始经理链表
&&&&&&&&&&&&&&& employeeA_ManagerList.add(employeeA);
&&&&&&&&&&&&&&& String current = employeeA;
&&&&&&&&&&&&&&& while (false == employeeToManagerMap.get(current).isEmpty()) {
&&&&&&&&&&&&&&&&&&& current = employeeToManagerMap.get(current);
&&&&&&&&&&&&&&&&&&& employeeA_ManagerList.add(current);
&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& // 获取从B开始经理链表
&&&&&&&&&&&&&&& employeeB_ManagerList.add(employeeB);
&&&&&&&&&&&&&&& current = employeeB;
&&&&&&&&&&&&&&& while (false == employeeToManagerMap.get(current).isEmpty()) {
&&&&&&&&&&&&&&&&&&& current = employeeToManagerMap.get(current);
&&&&&&&&&&&&&&&&&&& employeeB_ManagerList.add(current);
&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& int ii = 0, jj = 0;
&&&&&&&&&&&&&&& String currentA_manager, currentB_
&&&&&&&&&&&&&&& boolean found = false;
&&&&&&&&&&&&&&& // 遍历A与B开始经理链表,找出汇合点计算
&&&&&&&&&&&&&&& for (ii = 0; ii & employeeA_ManagerList.size(); ii++) {
&&&&&&&&&&&&&&&&&&& currentA_manager = employeeA_ManagerList.get(ii);
&&&&&&&&&&&&&&&&&&& for (jj = 0; jj & employeeB_ManagerList.size(); jj++) {
&&&&&&&&&&&&&&&&&&&&&&& currentB_manager = employeeB_ManagerList.get(jj);
&&&&&&&&&&&&&&&&&&&&&&& if (currentA_manager.equals(currentB_manager)) {
&&&&&&&&&&&&&&&&&&&&&&&&&&& found = true;
&&&&&&&&&&&&&&&&&&&&&&&&&&& break;
&&&&&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& if (found) {
&&&&&&&&&&&&&&&&&&&&&&& break;
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& // 最后获取两只之前的路径
&&&&&&&&&&&&&&& distance = ii + jj - 1;
&&&&&&&&&&& }
&&&&&&&&&&& return
&&& @Override
&&& publicint run(String[] args) throws Exception {
&&&&&&& // 实例化作业对象,设置作业名称
&&&&&&& Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");
&&&&&&& job.setJobName("Q10MiddlePersonsCountForComm");
&&&&&&& // 设置Mapper和Reduce类
&&&&&&& job.setJarByClass(Q10MiddlePersonsCountForComm.class);
&&&&&&& job.setMapperClass(MapClass.class);
&&&&&&& job.setReducerClass(Reduce.class);
&&&&&&& // 设置Mapper输出格式类
&&&&&&& job.setMapOutputKeyClass(IntWritable.class);
&&&&&&& job.setMapOutputValueClass(Text.class);
&&&&&&& // 设置Reduce输出键和值类型
&&&&&&& job.setOutputFormatClass(TextOutputFormat.class);
&&&&&&& job.setOutputKeyClass(NullWritable.class);
&&&&&&& job.setOutputValueClass(Text.class);
&&&&&&& // 第1个参数为员工数据路径和第2个参数为输出路径
&&&&&&& String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), &&&&&&&&&&&&& &&& args).getRemainingArgs();
&&&&&&& FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
&&&&&&& FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
&&&&&&& job.waitForCompletion(true);
&&&&&&& return job.isSuccessful() ? 0 : 1;
&&& &* 主方法,执行入口
&&& &* @param args 输入参数
&&& publicstaticvoid main(String[] args) throws Exception {
&&&&&&& int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);
&&&&&&& System.exit(res);
2.2.11.4配置运行参数
新建一个Java应用运行程序,需要在Arguments页签填写Q10MiddlePersonsCountForComm运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp
l输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q10
2.2.11.5运行并查看结果
设置运行参数完毕后,点击运行按钮:
运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q10目录,打开part-r-00000文件,可以看到运行结果:
7369 and 7499 = 4
7369 and 7521 = 4
7369 and 7566 = 1
7369 and 7654 = 4
7369 and 7698 = 3
3书面作业2:MapReduce实现推荐系统
3.1书面作业2内容
(本题可选)参考http://f.dataguru.cn/forum.php?mod=viewthread&tid=84
使用Map-Reduce实现该场景
3.2程序代码
3.2.1CountThread.java
import java.io.DataI
import java.io.DataO
import java.io.IOE
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.io.WritableC
publicclass CountThread implements WritableComparable {
&&& Text threadId;
&&& IntWritable cnt;
&&& CountThread(Text id, IntWritable t) {
&&&&&&& this.threadId =
&&&&&&& this.cnt =
&&& CountThread() {
&&&&&&& this.threadId = new Text();
&&&&&&& this.cnt = new IntWritable();
&&& @Override
&&& publicvoid readFields(DataInput in) throws IOException {
&&&&&&& threadId.readFields(in);
&&&&&&& cnt.readFields(in);
&&& @Override
&&& publicvoid write(DataOutput out) throws IOException {
&&&&&&& threadId.write(out);
&&&&&&& cnt.write(out);
&&& // 按cnt值倒序排列
&&& @Override
&&& publicint compareTo(Object o) {
&&&&&&& return ((CountThread) o).cnt.compareTo(cnt) == 0 ? threadId.compareTo(((CountThread) && &&& o).threadId) : ((CountThread) o).cnt.compareTo(cnt);
&&& publicboolean equals(Object o) {
&&&&&&& if (!(o instanceof CountThread))
&&&&&&&&&&& returnfalse;
&&&&&&& CountThread ct = (CountThread)
&&&&&&& returnthreadId.equals(ct.threadId) && cnt.equals(ct.cnt);
&&& // 按threadId值来生成hashCode,默认是按此值来分区的
&&& // 保证同一个threadId在一个partiotion中
&&& publicint hashCode() {
&&&&&&& returnthreadId.hashCode();
&&& // 默认输出格式
&&& public String toString() {
&&&&&&& StringBuffer buf = new StringBuffer("");
&&&&&&& buf.append(threadId.toString());
&&&&&&& buf.append("\t");
&&&&&&& buf.append(cnt.toString());
&&&&&&& return buf.toString();
&&& public Text getThreadId() {
&&&&&&& returnthreadId;
&&& publicvoid setThreadId(Text threadId) {
&&&&&&& this.threadId = threadId;
&&& public IntWritable getCnt() {
&&&&&&& returncnt;
&&& publicvoid setCnt(IntWritable cnt) {
&&&&&&& this.cnt =
3.2.2Recommendation.java
import java.io.IOE
import java.util.HashS
import java.util.S
import org.apache.hadoop.conf.C
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputF
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJ
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobC
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.mapreduce.lib.output.TextOutputF
import org.apache.hadoop.util.T
import org.apache.hadoop.util.ToolR
publicclass Recommendation extends Configured implements Tool {
&&& /* ********Job1******************* */
&&& publicstaticclass PostsMapper extends Mapper&Object, Text, IntWritable, Text& {
&&&&&&& publicvoid map(Object key, Text value, Context context) throws IOException, &&&&&&&&&& &&& InterruptedException {
&

我要回帖

更多关于 fgo彩色名字代码 的文章

 

随机推荐