请教一个关于spark bulkloadd入库的问题,求解答交流

批量(bulkload)载入数据到hbase - CSDN博客
批量(bulkload)载入数据到hbase
HBase提供了操作表的java api,但是这种方式是单条数据插入,对于大量数据载入来说效率太低。
对于批量数据导入,直接生成HBase的内部存储结构:HFile,并将其导入到Hbase中的效率无疑是最高。
步骤如下:
通过mapreduce将源数据导出为HFile文件
HBaseBulkLoadDriver.java
import com.sq.platform.hbaseLoadService.constant.MapperType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.Tool
import org.apache.hadoop.util.ToolRunner
public class HBaseBulkLoadDriver extends Configured implements Tool {
private static final String DATA_SEPARATOR = "\t"
public static final String COLUMN_FAMILY = "data"
public int run(String[] args) throws Exception {
if (args.length != 2)
int result = 0
String inputPath = args[0]
String outputPath = args[1]
Configuration configuration = getConf()
configuration.set("DATA_SEPARATOR", DATA_SEPARATOR)
configuration.set("TABLE_NAME", tableName)
configuration.set("COLUMN_FAMILY", COLUMN_FAMILY)
Job job = Job.getInstance(configuration)
job.setJarByClass(HBaseBulkLoadDriver.class)
job.setJobName("Bulk Loading HBase Table::" + tableName)
job.setMapOutputKeyClass(ImmutableBytesWritable.class)
// 设置Mapper
job.setMapperClass(CanMapper.class)
FileInputFormat.addInputPaths(job, inputPath)
FileSystem.getLocal(getConf()).delete(new Path(outputPath), true)
FileOutputFormat.setOutputPath(job, new Path(outputPath))
job.setMapOutputValueClass(Put.class)
HFileOutputFormat2.configureIncrementalLoad(job, new HTable(configuration, tableName))
job.waitForCompletion(true)
if (job.isSuccessful()) {
// mapreduce执行完成后进行批量hbase文件载入
HBaseBulkLoad.doBulkLoad(outputPath, tableName)
result = -1
return result
public static void main(String[] args) {
int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args)
if (response == 0) {
System.out.println("Job is successfully completed...")
System.out.println("Job failed...")
} catch (Exception exception) {
exception.printStackTrace()
CanMapper.java
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.util.B
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import java.io.IOE
public class CanMapper extends Mapper&LongWritable, Text, ImmutableBytesWritable, Put& {
* 数据字段分隔符
protected String dataS
protected String columnF
protected ImmutableBytesWritable hbaseTableN
public void setup(Mapper.Context context) {
Configuration configuration = context.getConfiguration();
dataSeperator = configuration.get("DATA_SEPARATOR");
columnFamily = configuration.get("COLUMN_FAMILY");
hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(configuration.get("TABLE_NAME")));
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(dataSeperator);
String rowKey = values[6];
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("type"), Bytes.toBytes(values[0]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("value"), Bytes.toBytes(values[1]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("typeTag"), Bytes.toBytes(values[2]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("result"), Bytes.toBytes(values[3]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Info"), Bytes.toBytes(values[4]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Code"), Bytes.toBytes(values[5]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("time_t"), Bytes.toBytes(values[7]));
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("stamp"), Bytes.toBytes(values[8]));
context.write(hbaseTableName, put);
} catch (Exception exception) {
exception.printStackTrace();
. 导入HFile文件到HBase
HBaseBulkLoad.java
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.hbase.HBaseC
import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHF
* hbase批量载入
public class HBaseBulkLoad {
public static void doBulkLoad(String outputPath, String tableName) {
Configuration configuration = new Configuration();
configuration.set("mapreduce.child.java.opts", "-Xmx1g");
HBaseConfiguration.addHbaseResources(configuration);
LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loadFfiles.doBulkLoad(new Path(outputPath), hTable);
} catch (Exception exception) {
exception.printStackTrace();
maven依赖:
&org.apache.hbase&
&hbase-common&
&org.apache.hbase&
&hbase-client&
&org.apache.hbase&
&hbase-server&
&org.codehaus.jackson&
&jackson-mapper-asl&
&org.codehaus.jackson&
&jackson-core-asl&
&org.codehaus.jackson&
&jackson-jaxrs&
&org.codehaus.jackson&
&jackson-xc&
&org.apache.hbase&
&hbase-protocol&
本文已收录于以下专栏:
相关文章推荐
以下主要介绍BulkLoad导入数据到hbase 中
HBase有多种导入数据的方法,最直接的方法就是在MapReduce作业中使用TableOutputFormat作为输出,或者使用标准的...
加载数据到HBase的方式有多种,通过HBase API导入或命令行导入或使用第三方(如sqoop)来导入或使用MR来批量导入(耗费磁盘I/O,容易在导入的过程使节点宕机),但是这些方式不...
Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapRed...
导入数据最快的方式,可以略过WAL直接生产底层HFile文件
(环境:centos6.5、Hadoop2.6.0、HBase0.98.9)
1.SHELL方式
1.1 ImportTsv直接导入...
HBase数据快速导入之ImportTsv&Bulkload
导入数据最快的方式,可以略过WAL直接生产底层HFile文件...
HBase快速导入数据--BulkLoad
Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到H...
1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题?2、bulkload 流程与实践3、说明与注意事项:4、Refer:
HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。大量的数据导入到Hbase中时速度回很慢,不过我们可以使用bulkload来导入。
BulkLoad的过程主要有以...
使用org.apache.hadoop.hbase.client.Put来写数据使用 org.apache.hadoop.hbase.client.Put 将数据一条一条写入Hbase中,但是和Bul...
他的最新文章
讲师:宋宝华
讲师:何宇健
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)HBase 写优化之 BulkLoad 实现数据快速入库 - CSDN博客
HBase 写优化之 BulkLoad 实现数据快速入库
转自&http://my.oschina.net/leejun2005/blog/187309
1、为何要&BulkLoad 导入?传统的&HTableOutputFormat 写 HBase 有什么问题?
2、bulkload&流程与实践
3、说明与注意事项:
4、Refer:
1、为何要&BulkLoad 导入?传统的&HTableOutputFormat
写 HBase 有什么问题?
我们先看下 HBase 的写流程:
通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应),而HBase支持
bulk load 的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到Hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
目前此种方式仅仅适用于只有一个列族的情况,在新版 HBase 中,单列族的限制会消除。
2、bulkload&流程与实践
bulkload 方式需要两个Job配合完成:&
(1)第一个Job还是运行原来业务处理逻辑,处理的结果不直接调用HTableOutputFormat写入到HBase,而是先写入到HDFS上的一个中间目录下(如 middata)&
(2)第二个Job以第一个Job的输出(middata)做为输入,然后将其格式化HBase的底层存储文件HFile&
(3)调用BulkLoad将第二个Job生成的HFile导入到对应的HBase表中
下面给出相应的范例代码:
import&java.io.IOE
import&org.apache.hadoop.conf.C
import&org.apache.hadoop.fs.P
import&org.apache.hadoop.hbase.HBaseC
import&org.apache.hadoop.hbase.KeyV
import&org.apache.hadoop.hbase.client.HT
import&org.apache.hadoop.hbase.client.P
import&org.apache.hadoop.hbase.io.ImmutableBytesW
import&org.apache.hadoop.hbase.mapreduce.HFileOutputF
import&org.apache.hadoop.hbase.mapreduce.KeyValueSortR
import&org.apache.hadoop.hbase.mapreduce.LoadIncrementalHF
import&org.apache.hadoop.hbase.util.B
import&org.apache.hadoop.io.IntW
import&org.apache.hadoop.io.LongW
import&org.apache.hadoop.io.T
import&org.apache.hadoop.mapreduce.J
import&org.apache.hadoop.mapreduce.M
import&org.apache.hadoop.mapreduce.R
import&org.apache.hadoop.mapreduce.lib.input.FileInputF
import&org.apache.hadoop.mapreduce.lib.input.TextInputF
import&org.apache.hadoop.mapreduce.lib.output.FileOutputF
import&org.apache.hadoop.mapreduce.lib.output.TextOutputF
import&org.apache.hadoop.util.GenericOptionsP
public&class&GeneratePutHFileAndBulkLoadToHBase
&&&&public&static&class&WordCountMapper&extends&Mapper&LongWritable,
Text, Text, IntWritable&
&&&&&&&&private&Text
wordText=new&Text();
&&&&&&&&private&IntWritable
one=new&IntWritable(1);
&&&&&&&&@Override
&&&&&&&&protected&void&map(LongWritable
key, Text value, Context context)
&&&&&&&&&&&&&&&&throws&IOException,
InterruptedException {
&&&&&&&&&&&&//
TODO Auto-generated method stub
&&&&&&&&&&&&String
line=value.toString();
&&&&&&&&&&&&String[]
wordArray=line.split(&
&&&&&&&&&&&&for(String
word:wordArray)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&wordText.set(word);
&&&&&&&&&&&&&&&&context.write(wordText,
&&&&&&&&&&&&}
&&&&&&&&&&&&&
&&&&public&static&class&WordCountReducer&extends&Reducer&Text,
IntWritable, Text, IntWritable&
&&&&&&&&private&IntWritable
result=new&IntWritable();
&&&&&&&&protected&void&reduce(Text
key, Iterable&IntWritable& valueList,
&&&&&&&&&&&&&&&&Context
&&&&&&&&&&&&&&&&throws&IOException,
InterruptedException {
&&&&&&&&&&&&//
TODO Auto-generated method stub
&&&&&&&&&&&&int&sum=0;
&&&&&&&&&&&&for(IntWritable
value:valueList)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&sum+=value.get();
&&&&&&&&&&&&}
&&&&&&&&&&&&result.set(sum);
&&&&&&&&&&&&context.write(key,
&&&&public&static&class&ConvertWordCountOutToHFileMapper&extends&Mapper&LongWritable,
Text, ImmutableBytesWritable, Put&
&&&&&&&&@Override
&&&&&&&&protected&void&map(LongWritable
key, Text value, Context context)
&&&&&&&&&&&&&&&&throws&IOException,
InterruptedException {
&&&&&&&&&&&&//
TODO Auto-generated method stub
&&&&&&&&&&&&String
wordCountStr=value.toString();
&&&&&&&&&&&&String[]
wordCountArray=wordCountStr.split(&\t&);
&&&&&&&&&&&&String
word=wordCountArray[0];
&&&&&&&&&&&&int&count=Integer.valueOf(wordCountArray[1]);
&&&&&&&&&&&&&
&&&&&&&&&&&&//创建HBase中的RowKey
&&&&&&&&&&&&byte[]
rowKey=Bytes.toBytes(word);
&&&&&&&&&&&&ImmutableBytesWritable
rowKeyWritable=new&ImmutableBytesWritable(rowKey);
&&&&&&&&&&&&byte[]
family=Bytes.toBytes(&cf&);
&&&&&&&&&&&&byte[]
qualifier=Bytes.toBytes(&count&);
&&&&&&&&&&&&byte[]
hbaseValue=Bytes.toBytes(count);
&&&&&&&&&&&&//
Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式
&&&&&&&&&&&&//
KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
&&&&&&&&&&&&Put
put=new&Put(rowKey);
&&&&&&&&&&&&put.add(family,
qualifier, hbaseValue);
&&&&&&&&&&&&context.write(rowKeyWritable,
&&&&&&&&&&&&&
&&&&public&static&void&main(String[]
args)&throws&Exception
&&&&&&&&//
TODO Auto-generated method stub
&&&&&&&&Configuration
hadoopConfiguration=new&Configuration();
&&&&&&&&String[]
dfsArgs =&new&GenericOptionsParser(hadoopConfiguration,
args).getRemainingArgs();
&&&&&&&&//第一个Job就是普通MR,输出到指定的目录
&&&&&&&&Job
job=new&Job(hadoopConfiguration,&&wordCountJob&);
&&&&&&&&job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
&&&&&&&&job.setMapperClass(WordCountMapper.class);
&&&&&&&&job.setReducerClass(WordCountReducer.class);
&&&&&&&&job.setOutputKeyClass(Text.class);
&&&&&&&&job.setOutputValueClass(IntWritable.class);
&&&&&&&&FileInputFormat.setInputPaths(job,&new&Path(dfsArgs[0]));
&&&&&&&&FileOutputFormat.setOutputPath(job,&new&Path(dfsArgs[1]));
&&&&&&&&//提交第一个Job
&&&&&&&&int&wordCountJobResult=job.waitForCompletion(true)?0:1;
&&&&&&&&//第二个Job以第一个Job的输出做为输入,只需要编写Mapper类,在Mapper类中对一个job的输出进行分析,并转换为HBase需要的KeyValue的方式。
&&&&&&&&Job
convertWordCountJobOutputToHFileJob=new&Job(hadoopConfiguration,&&wordCount_bulkload&);
&&&&&&&&convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
&&&&&&&&convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
&&&&&&&&//ReducerClass
无需指定,框架会自行根据 MapOutputValueClass 来决定是使用 KeyValueSortReducer 还是 PutSortReducer
&&&&&&&&//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
&&&&&&&&convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
&&&&&&&&convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);
&&&&&&&&//以第一个Job的输出做为第二个Job的输入
&&&&&&&&FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob,&new&Path(dfsArgs[1]));
&&&&&&&&FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob,&new&Path(dfsArgs[2]));
&&&&&&&&//创建HBase的配置对象
&&&&&&&&Configuration
hbaseConfiguration=HBaseConfiguration.create();
&&&&&&&&//创建目标表对象
&&&&&&&&HTable
wordCountTable =new&HTable(hbaseConfiguration,&&word_count&);
&&&&&&&&HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);
&&&&&&&&//提交第二个job
&&&&&&&&int&convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(true)?0:1;
&&&&&&&&//当第二个job结束之后,调用BulkLoad方式来将MR结果批量入库
&&&&&&&&LoadIncrementalHFiles
loader =&new&LoadIncrementalHFiles(hbaseConfiguration);
&&&&&&&&//第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表
&&&&&&&&loader.doBulkLoad(new&Path(dfsArgs[2]),
wordCountTable);
&&&&&&&&//最后调用System.exit进行退出
&&&&&&&&System.exit(convertWordCountJobOutputToHFileJobResult);
比如原始的输入数据的目录为:/rawdata/test/wordcount/&
中间结果数据保存的目录为:/middata/test/wordcount/&
最终生成的HFile保存的目录为:/resultdata/test/wordcount/&
运行上面的Job的方式如下:&
hadoop jar test.jar /rawdata/test/wordcount/ /middata/test/wordcount/ /resultdata/test/wordcount/&
3、说明与注意事项:
(1)HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作。
(2)最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: & ImmutableBytesWritable, KeyValue&或者& ImmutableBytesWritable, Put&。
否则报这样的错误:
java.lang.IllegalArgumentException:
Can't read partitions file
by: java.io.IOException: wrong key&class:
org.apache.hadoop.io.*** is not&class&org.apache.hadoop.hbase.io.ImmutableBytesWritable
(3)最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer,这个&SorterReducer 可以不指定,因为源码中已经做了判断:
if&(KeyValue.class.equals(job.getMapOutputValueClass()))
&&&&job.setReducerClass(KeyValueSortReducer.class);
}&else&if&(Put.class.equals(job.getMapOutputValueClass()))
&&&&job.setReducerClass(PutSortReducer.class);
&&&&LOG.warn(&Unknown
map output value type:&&+
job.getMapOutputValueClass());
(4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件,多列簇需要起多个 job,不过新版本的 Hbase 已经解决了这个限制。&
(5) MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。
(6)最后一个 Reduce 没有&setNumReduceTasks 是因为,该设置由框架根据region个数自动配置的。
(7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道configureIncrementalLoad方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。
public&class&HFileOutput
&&&&&&&&//job
&&&&public&static&Job
configureJob(Configuration conf)&throws&IOException
&&&&&&&&Job
job =&new&Job(configuration,&&countUnite1&);
&&&&&&&&job.setJarByClass(HFileOutput.class);
&&&&&&&&&&&&&&&&//job.setNumReduceTasks(2);&
&&&&&&&&//job.setOutputKeyClass(ImmutableBytesWritable.class);
&&&&&&&&//job.setOutputValueClass(KeyValue.class);
&&&&&&&&//job.setOutputFormatClass(HFileOutputFormat.class);
&&&&&&&&Scan
scan =&new&Scan();
&&&&&&&&scan.setCaching(10);
&&&&&&&&scan.addFamily(INPUT_FAMILY);
&&&&&&&&TableMapReduceUtil.initTableMapperJob(inputTable,
&&&&&&&&&&&&&&&&HFileOutputMapper.class,
ImmutableBytesWritable.class,
LongWritable.class,
&&&&&&&&//这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class
和PutSortReducer.class
&&&&&&&&&&&&&&&&job.setReducerClass(HFileOutputRedcuer.class);
&&&&&&&&//job.setOutputFormatClass(HFileOutputFormat.class);
&&&&&&&&HFileOutputFormat.configureIncrementalLoad(job,&new&HTable(
&&&&&&&&&&&&&&&&configuration,
outputTable));
&&&&&&&&HFileOutputFormat.setOutputPath(job,&new&Path());
&&&&&&&&&&&&&&&&//FileOutputFormat.setOutputPath(job,
new Path()); //等同上句
&&&&&&&&return&
&&&&public&static&class&HFileOutputMapper&extends
&&&&&&&&&&&&TableMapper&ImmutableBytesWritable,
LongWritable& {
&&&&&&&&public&void&map(ImmutableBytesWritable
key, Result values,
&&&&&&&&&&&&&&&&Context
context)&throws&IOException,
InterruptedException {
&&&&&&&&&&&&//mapper逻辑部分
&&&&&&&&&&&&context.write(new&ImmutableBytesWritable(Bytes()),
LongWritable());
&&&&public&static&class&HFileOutputRedcuer&extends
&&&&&&&&&&&&Reducer&ImmutableBytesWritable,
LongWritable, ImmutableBytesWritable, KeyValue& {
&&&&&&&&public&void&reduce(ImmutableBytesWritable
key, Iterable&LongWritable& values,
&&&&&&&&&&&&&&&&Context
context)&throws&IOException,
InterruptedException {
&&&&&&&&&&&&&&&&&&&&&&&&//reducer逻辑部分
&&&&&&&&&&&&KeyValue
kv =&new&KeyValue(row,
OUTPUT_FAMILY, tmp[1].getBytes(),
&&&&&&&&&&&&&&&&&&&&Bytes.toBytes(count));
&&&&&&&&&&&&context.write(key,
4、Refer:
1、Hbase几种数据入库(load)方式比较
http://blog.csdn.net/kirayuan/article/details/6371635
2、MapReduce生成HFile入库到HBase及源码分析
http://blog.pureisle.net/archives/1950.html
3、MapReduce生成HFile入库到HBase
/2013/02/hbase-hfile-bulk-load/
本文已收录于以下专栏:
相关文章推荐
声明: 若要转载, 请标明出处.
前提: 在对于大量的数据导入到HBase中, 如果一条一条进行插入, 则太耗时了, 所以可以先采用MapReduce生成HFile文件, 然后使用BulkLoad导...
HBase本身提供了很多种数据导入的方式,通常有两种常用方式:
1、使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase
2、另一种方式就是...
一、MR生成HFile文件
package insert.tools.
import java.io.IOE
import org.apache.hadoop....
kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说kafka并不能完全保证数据不丢失。
尽管kaf...
1. 预先生成HFile入库
这个地址有详细的说明http://blog.csdn.net/dajuezhao/archive//6365053.aspx
2. 通过MapRe...
Phoenix 提供了一个导入海量数据的MapReduce工具 CsvBulkLoadTool,根据官方的说明,使用这个工具可以高效地往hbase导入csv文本数据,内部会使用phoenix api去...
本篇主要说明在实际应用场景中可以会有一些格式比较规整的数据文件需要导入到HBase,Phoenix提供了两种方法来加载CSV格式的文件phoenix的数据表。一种是使用单线程的psql工具进行小批量数...
Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapRed...
我的需求是从kafka里取数据数据,然后对数据进行加工,最后保存到HBase里。
这里我的spout用的是storm-kafka-0.93.jar里的KafkaSpout类来作为...
他的最新文章
讲师:宋宝华
讲师:何宇健
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)

我要回帖

更多关于 phoenix bulkload 的文章

 

随机推荐