大家都用什么ios软件看NBAios电视直播软件

博客访问: 3485221
博文数量: 295
博客积分: 0
博客等级: 民兵
技术积分: 7302
注册时间:
认证徽章:
阿里巴巴是个快乐的青年
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: HADOOP
一、环境配置
&&&&& & 这里选择的环境是hadoop-0.20.2和hbase-0.90.4,Hadoop环境配置参看,HBase环境配置请看。
&&&&& & 需要注意的是,本文的需求是在Hadoop上跑MapReduce job来分析日志并将结果持久化到HBase,所以,在编译程序时,Hadoop需要用到HBase和Zookeeper包,因此,需要分别将hbase-0.90.4.jar和zookeeper-3.3.2.jar拷贝到Hadoop的lib目录下,具体操作如下:
&&&&& & #cp&/root/hbase-0.90.4/hbase-0.90.4.jar&/root/hadoop-0.20.2/lib
&&&&& & #cp&/root/hbase-0.90.4/lib/zookeeper-3.3.2.jar&/root/hadoop-0.20.2/lib
二、实例编写
&&&&& & 日志文件xxxlog.txt的内容如下:
&&&&& & version-------------time-----------------id-------rt----filter--------id----rt-----filter
&&&&&&&&1.0^A 00:00:01^Ad40^Bd
&&&&&&&&1.0^A 00:00:01^Ad41^Bd
&&&&& & 同样,需要将此文件放到hdfs目录下,比如:hadoop fs -put /tmp/input。
&&&&& & 为持久化在HBase中创建table和family,比如:./hbase shell,create 'xxxlog', 'dsp_filter'。
&&&&& & 为了清晰便于扩展,将Maper、Reducer、Driver分开,具体如下:
&&&&& & #vi xxxLogMaper.java
&&&&&&&&import java.io.IOE
&&&&&&&&import org.apache.hadoop.mapreduce.M
&&&&&&&&import org.apache.hadoop.io.IntW
&&&&&&&&import org.apache.hadoop.io.T
&&&&&&&&public class&xxxLogMaper
& & &&&&&&&&extends Mapper {
& & &&&&&&&&public final static String CONTROL_A & & & &= "^A";
& & &&&&&&&&public final static String CONTROL_B & & & &= "^B";
& & &&&&&&&&public final static String CONTROL_C & & & &= "^C";
& & &&&&&&&&public final static int PV_TIME & & & & & & = 1;
& & &&&&&&&&public final static int DSP_INFO_LIST & & & = 5;
& & &&&&&&&&public final static int DSP_ID & & & & & & &= 0;
& & &&&&&&&&public final static int DSP_FILTER & & & & &= 2;
& & &&&&&&&&public void map(Object key, Text value, Context context) {
& & & & &&&&&&&&try {
& & & & & & &&&&&&&&System.out.println("\n------------map come on-----------");
& & & & & & &&&&&&&&System.out.println("\nline=-----------"+value.toString());
& & & & & & &&&&&&&&String[] line = value.toString().split(CONTROL_A);
& & & & & & &&&&&&&&String pvtime = "";
& & & & & & &&&&&&&&System.out.println("\npvtime=-----------"+line[PV_TIME]);
& & & & & & &&&&&&&&String year = line[PV_TIME].substring(0, 4);
& & & & & & &&&&&&&&String month = line[PV_TIME].substring(5, 7);
& & & & & & &&&&&&&&String day = line[PV_TIME].substring(8, 10);
& & & & & & &&&&&&&&String hour = line[PV_TIME].substring(11, 13);
& & & & & & &&&&&&&&String minute = ""; &
& & & & & & &&&&&&&&int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16));
& & & & & & &&&&&&&&if (m_tmp >= 0 && m_tmp <= 30) {
& & & & & & & & &&&&&&&&minute = "00";
& & & & & & &&&&&&&&} else {
& & & & & & & & &&&&&&&&minute = "30";&
& & & & & & &&&&&&&&}
& & & & & & &&&&&&&&pvtime = year + month + day + hour +
& & & & & & &&&&&&&&String[] dspInfoList = line[DSP_INFO_LIST].split(CONTROL_B);
& & & & & & &&&&&&&&String dspid = "";&
& & & & & & &&&&&&&&String dspfilter = "";
& & & & & & &&&&&&&&Text k = new Text();
& & & & & & &&&&&&&&IntWritable v = new IntWritable(1);
& & & & & & &&&&&&&&for(int i=0; i<dspInfoList. i++) {
& & & & & & & & &&&&&&&&System.out.println("\n------------map-----------");
& & & & & & & & &&&&&&&&System.out.println("\ndspinfo="+dspInfoList[i]);
& & & & & & & & &&&&&&&&String[] dspInfo = dspInfoList[i].split(CONTROL_C);
& & & & & & & & &&&&&&&&dspid = dspInfo[DSP_ID];
& & & & & & & & &&&&&&&&dspfilter = dspInfo[DSP_FILTER];
& & & & & & & & &&&&&&&&//key=ddspid^Afilter^Apvtime, value=1
& & & & & & & & &&&&&&&&k.set(dspid+CONTROL_A+dspfilter+CONTROL_A+pvtime);
& & & & & & & & &&&&&&&&context.write(k, v);
& & & & & & & & &&&&&&&&System.out.println("\nkey="+k.toString());
& & & & & & & & &&&&&&&&System.out.println("\nvalue="+v.toString());
& & & & & & &&&&&&&&}
& & & & &&&&&&&&} catch (IOException e) {
& & & & & & &&&&&&&&e.printStackTrace();
& & & & &&&&&&&&} catch (InterruptedException e) {
& & & & & & &&&&&&&&e.printStackTrace();
& & & & &&&&&&&&}
& &&&&&&&&& }
2、Reducer
&&&&&&&&import java.io.IOE
&&&&&&&&import org.apache.hadoop.hbase.client.G
&&&&&&&&import org.apache.hadoop.hbase.client.P
&&&&&&&&import org.apache.hadoop.hbase.io.ImmutableBytesW
&&&&&&&&import org.apache.hadoop.hbase.mapreduce.TableR
&&&&&&&&import org.apache.hadoop.mapreduce.R
&&&&&&&&import org.apache.hadoop.io.T
&&&&&&&&import org.apache.hadoop.io.IntW
&&&&&&&&import org.apache.hadoop.hbase.HBaseC
&&&&&&&&import org.apache.hadoop.conf.C
&&&&&&&&import org.apache.hadoop.hbase.client.HT
&&&&&&&&import org.apache.hadoop.hbase.client.HTableP
&&&&&&&&import org.apache.hadoop.hbase.client.R
&&&&&&&&import org.apache.hadoop.hbase.util.B
&&&&&&&&import org.apache.hadoop.hbase.KeyV
&&&&&&&&public class BidLogReducer
& & &&&&&&&&extends TableReducer {
& & &&&&&&&&public final static String COL_FAMILY & & & = "dsp_filter";
& & &&&&&&&&public final static String COL_NAME & & & & = "sum";
& & &&&&&&&&private final static String ZK_HOST & & & & = "localhost";
& & &&&&&&&&private final static String TABLE_NAME & & &= "xxxlog";
& & &&&&&&&&public void reduce(Text key, Iterable values, Context context)
& & &&&&&&&&throws IOException, InterruptedException {
& & & & &&&&&&&&System.out.println("\n------------reduce come on-----------");
& & & & &&&&&&&&String k = key.toString();
& & & & &&&&&&&&IntWritable v = new IntWritable();
& & & & &&&&&&&&int sum = 0;
& & & & &&&&&&&&for (IntWritable val:values) {
& & & & & & &&&&&&&&sum += val.get();
& & & & &&&&&&&&}
& & & & &&&&&&&&//v.set(sum);
& & & & &&&&&&&&//context.write(key, v);
& & & & &&&&&&&&System.out.println("\n------------reduce-----------");
& & & & &&&&&&&&System.out.println("\ncur-key="+key.toString());
& & & & &&&&&&&&System.out.println("\ncur-value="+sum);
& & & &&&&&&&&& Configuration conf = HBaseConfiguration.create();
& & & & &&&&&&&&conf.set("hbase.zookeeper.quorum.", ZK_HOST);
& & & & &&&&&&&&HTablePool pool = new HTablePool(conf, 3);
& & & & &&&&&&&&HTable table = (HTable)pool.getTable(TABLE_NAME);
& & & & &&&&&&&&Get getrow = new Get(k.getBytes());
& & & & &&&&&&&&Result r = table.get(getrow);
& & & & &&&&&&&&int m_tmp = 0;
& & & & &&&&&&&&for(KeyValue kv:r.raw()) {
& & & & & & &&&&&&&&System.out.println("\nraw-KeyValugge---"+kv);
& & & & & & &&&&&&&&System.out.println("\nraw-row=>"+Bytes.toString(kv.getRow()));
& & & & & & &&&&&&&&System.out.println("\nraw-family=>"+Bytes.toString(kv.getFamily()));
& & & & & & &&&&&&&&System.out.println("\nraw-qualifier=>"+Bytes.toString(kv.getQualifier()));
& & & & & & &&&&&&&&System.out.println("\nraw-value=>"+Bytes.toString(kv.getValue()));
& & & & & & &&&&&&&&m_tmp += Integer.parseInt(Bytes.toString(kv.getValue()));
& & & & &&&&&&&&}
& & & & &&&&&&&&sum = sum + m_
& & & & &&&&&&&&v.set(sum);
& & & & &&&&&&&&System.out.println("\nreal-key="+key.toString());
& & & & &&&&&&&&System.out.println("\nreal-value="+v.toString());
& & & & &&&&&&&&Put putrow = new Put(k.getBytes());
& & & & &&&&&&&&putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes());
& & & & &&&&&&&&try {
& & & & & & &&&&&&&&context.write(new ImmutableBytesWritable(key.getBytes()), putrow);
& & & & &&&&&&&&} catch (IOException e) {
& & & & & & &&&&&&&&e.printStackTrace();
& & & & &&&&&&&&} catch (InterruptedException e) {
& & & & & & &&&&&&&&e.printStackTrace();
& & & &&&&&&&&& }
& & &&&&&&&&}
3、Driver&
&&&&& &&#vi xxxLogDriver.java
&&&&& & #vi&xxxLogReducer.java
&&&&&&&&import org.apache.hadoop.conf.C
&&&&&&&&import org.apache.hadoop.fs.P
&&&&&&&&import org.apache.hadoop.hbase.HBaseC
&&&&&&&&import org.apache.hadoop.hbase.mapreduce.TableMapReduceU
&&&&&&&&import org.apache.hadoop.io.T
&&&&&&&&import org.apache.hadoop.io.IntW
&&&&&&&&import org.apache.hadoop.mapreduce.J
&&&&&&&&import org.apache.hadoop.mapreduce.lib.input.FileInputF
&&&&&&&&import org.apache.hadoop.mapreduce.lib.output.FileOutputF
&&&&&&&&import org.apache.hadoop.mapreduce.lib.input.TextInputF
&&&&&&&&import org.apache.hadoop.util.GenericOptionsP
&&&&&&&&public class&xxxLogDriver {
& & &&&&&&&&public final static String ZK_HOST & & & & &= "localhost";
& & &&&&&&&&public final static String TABLE_NAME & & & = "xxxlog";
& & &&&&&&&&public static void main(String[] args) throws Exception {
& & & & &&&&&&&&//Hbase Configuration
& & & & &&&&&&&&Configuration conf = HBaseConfiguration.create();
& & & & &&&&&&&&conf.set("hbase.zookeeper.quorum.", ZK_HOST);
& & & & &&&&&&&&String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
& & & & &&&&&&&&if (otherArgs.length != 2) {
& & & & & & &&&&&&&&System.err.println("Usage: please input
& & & & & & &&&&&&&&System.exit(2);
& & & & &&&&&&&&}
& & & &&&&&&&&& Job job = new Job(conf,"xxxLog");
& & & &&&&&&&&& job.setJarByClass(xxxLogDriver.class);
& & & & &&&&&&&&FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
& & & & &&&&&&&&FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
& & & & &&&&&&&&System.out.println("\n------------driver come on-----------");
& & &&&&&&&&& & job.setMapperClass(xxxLogMaper.class);
& & & &&&&&&&&& job.setReducerClass(xxxLogReducer.class);
& & & & &&&&&&&&job.setMapOutputKeyClass(Text.class);
& & & & &&&&&&&&job.setMapOutputValueClass(IntWritable.class);
& & & & &&&&&&&&TableMapReduceUtil.initTableReducerJob(TABLE_NAME,&xxxLogReducer.class, job);
& & & & &&&&&&&&System.exit(job.waitForCompletion(true)? 0 : 1);
& & &&&&&&&&}
三、编译运行
&&&&& & 在当前目录下编译源码,具体如下:
&&&&& & #javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java&xxxLogReducer.java xxxLogDriver.java
&&&&& & 需要注意的是,必须三个一起编译否则出错:
&&&&& & xxxLogDriver.java:22: error: cannot find symbol
&&&&&&&&job.setMapperClass(xxxLogMaper.class);
&&&&& & 打包class文件,具体如下:
&&&&& & #jar cvf xxxLog.jar *class
&&&&& & #rm -rf *class
&&&&& & 运行任务,具体如下:
&&&&& & #hadoop jar xxxLog.jar &xxxLogDriver /tmp/input&/tmp/output
&&&&& & 查询结果,具体如下:
&&&&& & #./hbase shell
&&&&&&&&hbase(main):014:0>scan 'xxxlog'
&&&&& & 关于更多Hadoop和HBase的信息请参见,,,,,还有。
阅读(6956) | 评论(1) | 转发(2) |
相关热门文章
给主人留下些什么吧!~~
文明上网,理性发言...
请登录后评论。PHP开发框架
开发工具/编程工具
服务器环境
ThinkSAAS商业授权:
ThinkSAAS为用户提供有偿个性定制开发服务
ThinkSAAS将为商业授权用户提供二次开发指导和技术支持
让ThinkSAAS更好,把建议拿来。
开发客服微信采用MapReduce作业如何在HBase中加载大数据_老文章_赛迪网
采用MapReduce作业如何在HBase中加载大数据
采用MapReduce作业如何在HBase中加载大数据
发布时间: 14:51&&&&&&&&来源:&&&&&&&&作者:博客
HBase有很多种方法将数据加载到表中,最简单直接的方法就是通过MapReduce调用TableOutputFormat方法,或者在client上调用API写入数据。但是,这都不是最有效的方式。
这篇文档将向你描述如何在HBase中加载大数据。采用MapReduce作业,将数据以HBase内部的组织格式输出成文件,然后将数据文件加载到已运行的集群中。(注:就是生成HFile,然后加载到HBase中。)
二、大数据载入的步骤
大数据的加载包含了2个步骤:
1、通过MapReduce的作业进行数据准备过程
首先,通过MapReduce使用HFileOutputFormat来生成HBase的数据文件格式。这样格式的数据文件就是HBase内部的文件组织格式,并且在将数据写入到集群的过程中是相当容易的。
为了使该方法更有效,HFileOutputFormat必须通过配置,每个输出的HFile必须适应单个的region。为了实现此功能,MapReduce的Job采用了Hadoop的TotalOrderPartitioner类,通过进行分区操作用以对应表中各个region。
同时,HFileOutputFormat包含有一个非常方便的方法,configureIncrementalLoad(), 这个方法会基于表的当前区域边界自动设置一个TotalOrderPartitioner。
2、数据加载过程
通过HFileOutputFormat准备好数据之后,使用命令行工具将数据加载到集群中。这个命令行工具遍历准备好的数据文件,并确定每一个文件所属的region。然后,当连接到对应的Region Server,移动到HFile到存储目录为用户提供数据。
如果在数据准备或者数据载入的时候,region边界发生了变化,那么HBase将自动进行块分割,用以适应新的边界变化。这个过程效率是很低下的,特别是有其他的client在做数据录入操作。所以需要注意,尽量使用少的时间去创造数据文件以及录入该数据文件进入集群。
3、使用importtsv为大数据加载做准备
HBase自带了importtsv命令工具。通过hadoop jar /path/to/hbase-VERSION.jar importtsv 来使用这个命令。如果不带参数的执行会打印以下帮助信息:
Usage: importtsv -Dimporttsv.columns=a,b,c
Imports the given input directory of TSV data into the specified table.
The column names of the TSV data must be specified using the -Dimporttsv.columns option.
This option takes the form of comma-separated column names, where each column name is either a simple column family, or a columnfamily:qualifier.
The special column name HBASE_ROW_KEY is used to designate that this column should be usedas the row key for each imported record.
You must specify exactly one column to be the row key.
In order to prepare data for a bulk data load, pass the option:
-Dimporttsv.bulk.output=/path/for/output
Other options that may be specified with -D include:
-Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
4、使用completebulkload来载入数据
当使用importtsv导入数据之后,completebulkload 是用来导入数据到在运行的集群中。
completebulkload就是采用与importtsv 相同的输出路径和表的名称来执行。 例如:
$ hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable
这个命令会执行的非常快,完成之后在集群中就能看到新的数据。
5、高级用法
虽然importtsv 命令很有用,但是在许多情况下,用户可能需要通过编写代码或其他形式的导入数据。
如果要这样做,可以查看ImportTsv.java 源代码,并阅读HFileOutputFormat的Javadoc帮助文档。
通过代码编写载入大数据量可关注 LoadIncrementalHFiles类。
关键词阅读:
1(共条评论)
2(共条评论)
3(共条评论)
4(共条评论)
5(共条评论)
降低上网资费迫在眉睫 企业应关注民心所向
日前,锐捷网络政府和交通行业部总经理肖广...
联系我们:
广告发布:
方案、案例展示:
京ICP000080号 网站-3
&&&&&&&&京公网安备45号

我要回帖

更多关于 ios直播软件 的文章

 

随机推荐