hbase中hbase 关闭compactt用途是什么,什么时候触发

3314人阅读
HBase1.0.2源码分析(29)
& & & & 一般说来,任何一个比较复杂的分布式系统,针对能够使得其性能得到大幅提升的某一内部处理流程,必然有一个定期检查机制,使得该流程在满足一定条件的情况下,能够自发的进行,这样才能够很好的体现出复杂系统的自我适应与自我调节能力。我们知道,HBase内部的compact处理流程是为了解决MemStore Flush之后,文件数目太多,导致读数据性能大大下降的一种自我调节手段,它会将文件按照某种策略进行合并,大大提升HBase的数据读性能。那么,基于我刚才的陈述,compact流程是否有一个定期检查机制呢?在满足什么条件的情况下,会触发compact请求呢?
& & & & 针对第一个问题,回答当然是肯定的。在HRegionServer内部,有一个成员变量,定义如下:
* Check for compactions requests.
* 检查合并请求
Chore compactionC& & & & &单从注释,我们就可以看出,这个compactionChecker成员变量就是一个检查合并请求的Chore,那么什么是Chore呢?先来看下它的定义、成员变量以及构造函数。先来看下类的定义,代码如下:
* Chore is a task performed on a period in hbase.
The chore is run in its own
* thread. This base abstract class provides while loop and sleeping facility.
* If an unhandled exception, the threads exit is logged.
* Implementers just need to add checking if there is work to be done and if
* so, do it.
Its the base of most of the chore threads in hbase.
* &p&Don't subclass Chore if the task relies on being woken up for something to
* do, such as an entry being added to a queue, etc.
* Chore是定期在HBase中执行的一个任务。Chore在它所在的线程内执行。这个基础抽象类提供了loop循环和sleep机制。
@InterfaceAudience.Private
public abstract class Chore extends HasThread {
}& & & & 首先,从类的定义我们可以看到,Chore继承自HasThread类,而HasThread类是一个实现了Runnable接口的抽象类,并且定义了一个抽象的run()方法。自然,Chore就是一个线程了。而通过注释,我们可以很清晰的知道以下三点:1、Chore是定期在HBase中执行的一个任务;2、Chore在它所在的线程内执行;3、这个基础抽象类提供了loop循环和sleep机制。
& & & & 再来看下它的成员变量,主要包含以下几个:
private final S// 睡眠器
protected final S& & & & 上面提到,Chore提供了sleep机制,那么这个机制就是依靠Sleeper类型的sleeper这个成员变量来实现的,而stopper则是实现了Stoppable接口的任何实例,实际上是工作线程所依附的可停止运行的载体,比如HRegionServer,载体停止运行后,工作线程。等到分析其run()方法时,我们再具体分析这两个变量。
& & & & 然后,我们再看下Chore的构造方法,代码如下:
* @param p Period at which we should run.
Will be adjusted appropriately
* should we find work and it takes time to complete.
* @param stopper When {@link Stoppable#isStopped()} is true, this thread will
* cleanup and exit cleanly.
* 构造方法,需要name、p和stopper三个参数
* p为run方法循环的周期
public Chore(String name, final int p, final Stoppable stopper) {
super(name);
if (stopper == null){
throw new NullPointerException(&stopper cannot be null&);
this.sleeper = new Sleeper(p, stopper);
this.stopper =
}& & & & 它需要name、p和stopper三个参数,name很简单,String类型的线程名字而已,关键在于这个int类型的p和Stoppable类型的stopper,构造函数利用p和stopper生成了一个睡眠期sleeper,并将stopper赋值给其同名成员变量。
& & & & 下面,我们来看下这个sleeper的实现吧!Sleeper类中定义了4个关键变量和两个关键方法,实现了一个简单的睡眠器,其4个关键成员变量如下:
private final S
private final Object sleepLock = new Object();
private boolean triggerWake =& & & & 其中,period代表了睡眠周期,它是由上诉参数p赋值的,而stopper的含义与Chore中同名变量一样。sleepLock仅仅是一个Object对象,依靠它的wait()方法,我们可以实现对象等待一段时间;triggerWake是一个标志位,依靠它被设置为true,我们可以跳出睡眠,重新复苏。
& & & & 再来看下它的最终要的两个方法,第一个便是睡眠器最主要的功能性方法--睡眠sleep(),代码如下:
* Sleep for period adjusted by passed &code&startTime&code&
* @param startTime Time some task started previous to now.
Time to sleep
* will be docked current time minus passed &code&startTime&code&.
public void sleep(final long startTime) {
// 如果stopper已停止,直接返回
if (this.stopper.isStopped()) {
// 当前时间now
long now = System.currentTimeMillis();
// 计算最新的需要等待的时间,循环周期减去已过去的时间
long waitTime = this.period - (now - startTime);
// 如果等待时间waitTime已超过周期period,那么直接将period赋值给waitTime,并记录警告信息
if (waitTime & this.period) {
LOG.warn(&Calculated wait time & & + this.period +
&; setting to this.period: & + System.currentTimeMillis() + &, & +
startTime);
waitTime = this.
// 当等待时间waitTime大于0时,一直循环
while (waitTime & 0) {
long woke = -1;
// 判断标志位triggerWake,如果为true,
// 即如果其他线程已唤醒该睡眠期,跳出循环,复位triggerWake为fale,直接返回,不再睡眠
synchronized (sleepLock) {
if (triggerWake)
// 否则,依靠sleepLock等待waitTime时间
sleepLock.wait(waitTime);
// 计算已睡眠时间slept
woke = System.currentTimeMillis();
long slept = woke -
// 如果slept时间已超出周期10s,记录警告信息
if (slept - this.period & MINIMAL_DELTA_FOR_LOGGING) {
LOG.warn(&We slept & + slept + &ms instead of & + this.period +
&ms, this is likely due to a long & +
&garbage collecting pause and it's usually bad, see & +
&http://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired&);
} catch(InterruptedException iex) {
// We we interrupted because we're meant to stop?
If not, just
// continue ignoring the interruption
if (this.stopper.isStopped()) {
// 重新计算等待时间:等待周期减去已睡眠时间
// Recalculate waitTime.
woke = (woke == -1)? System.currentTimeMillis():
waitTime = this.period - (woke - startTime);
// 标志位triggerWake复位为false,需要在sleepLock上用synchronized关键字进行同步
synchronized(sleepLock) {
triggerWake =
}& & & & 这个方法会根据传入的参数睡眠的起始时间startTime,结合睡眠器构造时设定好的睡眠周期period,以及当前时间now,计算出等待时间waitTime。而后,在一个等待时间waitTime大于0的while循环内,首先判断标志位triggerWake,如果其为true,则break,复位triggerWake并停止休眠,否则,利用sleepLock的wait()方法休眠指定时间waitTime,直到时间结束或者有其他线程设置triggerWake标志位为true并通过sleepLock的notifyAll()方法唤醒sleepLock对象,让其wait()方法抛出InterruptedException异常,继而重新计算等待时间,并进入下一个循环。此时,标志位triggerWake已设置为true,则直接跳出循环,结束休眠。而在休眠时间未到的情况下结束休眠的一种手段,就是通过调用另外一个很关键的方法skipSleepCycle()来实现的,代码很简单,不做解释:
* If currently asleep, if not asleep, will skip the next
* sleep cycle.
public void skipSleepCycle() {
synchronized (sleepLock) {
// 标志位triggerWake设置为true
triggerWake =
// 唤醒等待在sleepLock上的其它线程
sleepLock.notifyAll();
& & & & 接下来,再看下Chore中最重要的run()方法,定义如下:
* @see java.lang.Thread#run()
public void run() {
boolean initialChoreComplete =
// 只要stopper不停止,while循环就继续啊
while (!this.stopper.isStopped()) {
// 开始时间
long startTime = System.currentTimeMillis();
// 如果是第一次循环,完成初始化工作
if (!initialChoreComplete) {
initialChoreComplete = initialChore();
// 第一次后的每次循环,则周期性的调用chore()方法
} catch (Exception e) {
LOG.error(&Caught exception&, e);
if (this.stopper.isStopped()) {
// 睡眠期睡眠一定的时间,然后再去调用chore()方法
this.sleeper.sleep(startTime);
} catch (Throwable t) {
LOG.fatal(getName() + &error&, t);
} finally {
(getName() + & exiting&);
cleanup();
}& & & & 这个run()方法的执行逻辑非常简单,只要stopper不停止,while循环就持续进行,首先,第一次进入run()方法时,标志位initialChoreComplete初始化为false,标志着Chore尚未初始化完毕,此时调用initialChore()做初始化工作,并返回初始化结果赋值给标志位initialChoreComplete,这个initialChore()目前是一个空方法,只返回true,而&第一次后的每次循环,则周期性的调用chore()方法,每次调用完chore()方法后,都通过睡眠器sleeper的sleep()方法,从每次进入while循环时获取的时刻startTime开始,休眠Chore构造函数传入的p时间,休眠过后再次执行chore()方法。如果stopper已停止,或者发生Throwable异常,则Chore调用cleanup()完成清理工作。
& & & & 好了,Chore的运行机制到这里,已经给大家讲解清楚了。那么,再回到文章的初始,HRegionServer中名为compactionChecker的这个Chore,到底是如何初始化,并且都做了哪些事情呢?让我们继续往下看。
& & & & 在前面讲解compact合并线程CompactSplitThread的文章中,我们了解过HRegionServer的initializeThreads()方法,它负责初始化工作在HRegionServer上的各种线程,包括CompactSplitThread,当然也就包括CompactionChecker。代码如下:
pactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);& & & & 它是通过构造一个CompactionChecker对象来完成初始化的。其构造方法如下:
// 构造函数
CompactionChecker(final HRegionServer h, final int sleepTime,
final Stoppable stopper) {
// 调用父类Chore的构造方法
super(&CompactionChecker&, sleepTime, h);
// 将载体HRegionServer赋值给instance变量
this.instance =
(this.getName() + & runs every & + StringUtils.formatTime(sleepTime));
/* MajorCompactPriority is configurable.
* If not set, the compaction will use default priority.
// 设置major合并优先级,取参数pactionChecker.majorCompactPriority,默认为Integer.MAX_VALUE
this.majorCompactPriority = this.instance.conf.
getInt(&pactionChecker.majorCompactPriority&,
DEFAULT_PRIORITY);
}& & & & 很简单,调用父类Chore的构造方法,设置上面提到的线程工作周期period和stopper,而这个工作周期period就是HRegionServer的threadWakeFrequency变量,它取自参数hbase.server.thread.wakefrequency,默认为10s,它是HBase上众多后台工作线程通用的工作频率,比如周期性MemStore刷新线程等。然后,构造方法还会将载体HRegionServer赋值给instance变量,并设置major合并优先级,取参数pactionChecker.majorCompactPriority,默认为Integer.MAX_VALUE。
& & & & 不止如此,在HRegionServer上的startServiceThreads()方法中,会将该线程设置为一个后台线程,目的就是为了方便虚拟机管理,当所有用户线程退出后,该后台线程也会自动退出,代码如下:
Threads.pactionChecker.getThread(), getName() +
&.compactionChecker&, uncaughtExceptionHandler);& & & & 至此,compactionChecker的初始化已完成。那么它是如何工作的呢?换句话,为了确保回答问题的全面性,也就是上面我们提到的第二个还没回答的问题:在满足什么条件的情况下,会触发compact请求呢?既然是个Chore,我们看下CompactionChecker的chore()方法,代码如下:
// 线程的run方法会一直调用的函数chore()
protected void chore() {
// 循环检测HRegionServer的onlineRegions中的每个HRegion
for (HRegion r : this.instance.onlineRegions.values()) {
// 对应HRegion为null的话,进入下一个HRegion的循环
if (r == null)
// 取出每个Region中的Store
for (Store s : r.getStores().values()) {
// 调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier
long multiplier = s.getCompactionCheckMultiplier();
// 合并检查倍增器multiplier必须确保大于0
assert multiplier & 0;
// 未到整数倍,跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查
if (iteration % multiplier != 0)
if (s.needsCompaction()) {// 需要合并的话,发起SystemCompaction请求
// Queue a compaction. Will recognize if major is needed.
pactSplitThread.requestSystemCompaction(r, s, getName()
+ & requests compaction&);
} else if (s.isMajorCompaction()) {// 如果是Major合并的话,根据配置的major合并优先级majorCompactPriority确定发起合并请求
// 如果工作线程中设置的合并优先级为Integer.MAX_VALUE,即默认,或者HRegion的合并优先级小于设置值的话
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority & r.getCompactPriority()) {
// 使用默认优先级发起合并请求
pactSplitThread.requestCompaction(r, s, getName()
+ & reque use default priority&, null);
// 使用设置的优先级发起合并请求
pactSplitThread.requestCompaction(r, s, getName()
+ & reque use configured priority&,
this.majorCompactPriority, null);
} catch (IOException e) {
LOG.warn(&Failed major compaction check on & + r, e);
// 迭代计数器设置,累加1
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
& & & & 整个工作流程很简单,chore()方法周期性的检测HRegionServer中所有在线Region的每个HStore,调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier,当迭代因子iteration为合并检查倍增器multiplier的整数倍时,发起针对该HStore是否需要compact的检查,如果需要合并,则根据合并的种类,确定发起何种合并请求,并且如果是Major合并的话,则需要确定优先级。毕竟Major是最耗费资源的compact,为了合理有效的利用资源,也为了防止系统性能瓶颈,增加优先级就显得十分有必要了。整个流程比较清晰,而且上述代码注释也很详细,读者可自行补脑。
& & & & 下面,我们针对几个要点进行简要说明:
& & & & 1、onlineRegions是HRegionServer上存储的所有能够提供有效服务的在线Region集合;
& & & & 2、整个检查过程是先轮询HRegion,然后针对HRegion上每个HStore进行的。并且,非常重要的是,它并不是对每个HRegion上所有HStore挨个检查,而是利用取余算法,对Region上的HStore进行检查。而这个过程的关键,就是上述代码中的合并检查倍增器multiplier,该值如果配置为1的话,则是挨个检查,如果配置成2的话,则是隔一个检查一个,依次类推。这个multiplier的获取,是通过HStore的getCompactionCheckMultiplier()方法获取的,它实际上是获取的HStore的compactionCheckMultiplier变量,而其初始化,则是取参数pactchecker.interval.multiplier,默认为1000。代码如下:
public long getCompactionCheckMultiplier() {
pactionCheckM
// 取参数pactchecker.interval.multiplier,默认为1000
pactionCheckMultiplier = conf.getInt(
COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
if (pactionCheckMultiplier &= 0) {
LOG.error(&Compaction check period multiplier must be positive, setting default: &
+ DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
pactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
}& & & & 3、对于是否需要合并,则是通过HStore的needsCompaction()方法判断的,代码如下:
public boolean needsCompaction() {
return this.storeEngine.needsCompaction(this.filesCompacting);
}& & & & 而通过StoreEngine的一种实现DefaultStoreEngine,还有CompactionPolicy的一种实现RatioBasedCompactionPolicy等一系列调用,最终实现为如下代码:
public boolean needsCompaction(final Collection&StoreFile& storeFiles,
final List&StoreFile& filesCompacting) {
// storeFile的总数减去正在合并的文件的数目
int numCandidates = storeFiles.size() - filesCompacting.size();
// 如果这个数目超过配置中合并文件的最小值
return numCandidates &= comConf.getMinFilesToCompact();
}& & & & 很简单,storeFile的总数减去正在合并的文件的数目,如果这个数目超过配置中合并文件的最小值,则视为需要发起合并请求。这个配置中合并文件的最小值,就是通过如下代码设置的:
// 先取新参数paction.min,未配置的话,再去旧参数pactionThreshold,
// 再未配置的话则默认为3,但是最终不能小于2
minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,
/*old name*/ conf.getInt(&pactionThreshold&, 3)));& & & & 4、需要合并的话,则调用CompactSplitThread的requestSystemCompaction()方法发起SystemCompaction请求,而如果是Major合并的话,则需要根据配置的major合并优先级majorCompactPriority确定发起合并请求,继而调用CompactSplitThread的requestCompaction()方法发起合并请求。
& & & & 那么,如何认定一个合并为Major合并呢?它的判断需要以下几个条件:
& & & & & & & 4.1、HStore下全部存储文件的Reader必须不为null,也就是全部文件必须处于打开状态,否则直接返回false;
& & & & & & & 4.2、根据合并策略来确定,以RatioBasedCompactionPolicy为例:
& & & & & & & & & & & &4.2.1、获取下一次需要Major合并的时间mcTime;
& & & & & & & & & & & &4.2.2、如果待合并的全部文件为空,或者下一次需要Major合并的时间为0,直接返回false;
& & & & & & & & & & & &4.2.3、获取待合并文件中最小的时间戳lowTimestamp,并获取当前时间now;
& & & & & & & & &4.2.4、如果最小时间戳lowTimestamp大于0,且小于当前时间now-减去下一次需要Major合并的时间:
& & & & & & & & & & & & & & & & & &4.2.4.1、获取列簇的TTL,即cfTtl;
& & & & & & & & & & & & & & & & & &4.2.4.2、如果存在多个待合并文件:直接返回true;
& & & & & & & & & & & & & & & & & &4.2.4.3、如果只存在一个待合并文件:则首先获取文件的最小时间戳minTimestamp,然后计算文件存留时间oldest,如果该文件不是元数据相关文件,且如果列簇的TTL为FOREVER,且文件保留时间仍在TTL内,那么我们需要根据数据块的位置索引与参数hbase.hstore.min.locality.to.pact大小来判断是否只针对一个文件做compact,此时的这个compact理解为压缩比合并更好点,这部分后面再讲合并策略时再着重描述。
& & & & 至此,我们把HRegionServer内部一个合并检查线程的初始化、工作方式及compact检查机制等统统讲完了。那么是否只要有这个定期检查工作线程就可以保证compact及时、正常运行,就能保证HBase的高性能了呢?
& & & & No,No,No,等着HBase源码分析之compact请求发起时机、判断条件等详情(二)吧!O(∩_∩)O哈哈~
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:447081次
积分:6818
积分:6818
排名:第3576名
原创:175篇
评论:115条
(15)(6)(3)(3)(1)(9)(54)(68)(16)(3)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'博客分类:
博文说明:1、研究版本hbase0.94.12;2、贴出的源代码可能会有删减,只保留关键的代码
从client和server两个方面探讨hbase的写数据过程。
一、client端
1、写数据API
写数据主要是HTable的单条写和批量写两个API,源码如下:
//单条写API
public void put(final Put put) throws IOException {
doPut(put);
if (autoFlush) {
flushCommits();
//批量写API
public void put(final List&Put& puts) throws IOException {
for (Put put : puts) {
doPut(put);
if (autoFlush) {
flushCommits();
//具体的put实现
private void doPut(Put put) throws IOException{
validatePut(put);
writeBuffer.add(put);
currentWriteBufferSize += put.heapSize();
if (currentWriteBufferSize & writeBufferSize) {
flushCommits();
public void close() throws IOException {
if (this.closed) {
flushCommits();
通过两个put API可以看出如果autoFlush为false,则无论是否是批量写效果均是相同,均是等待写入的数据超过配置的writeBufferSize(通过hbase.client.write.buffer配置,默认为2M)时才提交写数据请求,如果最后的写入数据没有超过2M,则在调用close方法时会进行最后的提交,当然,如果使用批量的put方法时,自己控制flushCommits则效果不同,比如每隔1000条进行一次提交,如果1000条数据的总大小超过了2M,则实际上会发生多次提交,导致最终的提交次数多过只由writeBufferSize控制的提交次数,因此在实际的项目中,如果对写性能的要求比对数据的实时可查询和不可丢失的要求更高则可以设置autoFlush为false并采用单条写的put(final Put put)API,这样即可以简化写操作数据的程序代码,写入效率也更优,需要注意的是如果对数据的实时可查询和不可丢失有较高的要求则应该设置autoFlush为true并采用单条写的API,这样可以确保写一条即提交一条。
2、关于多线程写
通过HConnection的getTable方法获取的HTable对象进行put操作时默认就是多线程的操作,线程数与put涉及的region数有关,虽然是hbase内部是多线程,但是在进行写操作时还是需要自己写多线程进行处理,这样可以大大的提高写速度,相关源码如下,相关源码如下:
public void flushCommits() throws IOException {
Object[] results = new Object[writeBuffer.size()];
this.connection.processBatch(writeBuffer, tableName, pool, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
} finally {
public void processBatch(List&? extends Row& list, final byte[] tableName, ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
processBatchCallback(list, tableName, pool, results, null);
public &R& void processBatchCallback(List&? extends Row& list, byte[] tableName, ExecutorService pool,
Object[] results, Batch.Callback&R& callback) throws IOException, InterruptedException {
HRegionLocation [] lastServers = new HRegionLocation[results.length];
for (int tries = 0; tries & numRetries && ++tries) {
// step 1: break up into regionserver-sized chunks and build the data structs
Map&HRegionLocation, MultiAction&R&& actionsByServer =
new HashMap&HRegionLocation, MultiAction&R&&();
for (int i = 0; i & workingList.size(); i++) {
Row row = workingList.get(i);
if (row != null) {
HRegionLocation loc = locateRegion(tableName, row.getRow());
byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction&R& actions = actionsByServer.get(loc);
if (actions == null) {
actions = new MultiAction&R&();
actionsByServer.put(loc, actions);
//每一个region对应一个MultiAction对象,每个MultiAction对象持有该region所有的put Action
Action&R& action = new Action&R&(row, i);
lastServers[i] =
actions.add(regionName, action);
// step 2: make the requests,每个region开启一个线程
Map&HRegionLocation, Future&MultiResponse&& futures =
new HashMap&HRegionLocation, Future&MultiResponse&&(actionsByServer.size());
for (Entry&HRegionLocation, MultiAction&R&& e: actionsByServer.entrySet()) {
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
// step 3: collect the failures and successes and prepare for retry
// step 4: identify failures and prep for a retry (if applicable).
3、在写入数据前,需要先定位具体的数据应该写入的region,核心方法:
//从缓存中定位region,通过NavigableMap实现,如果没有缓存则需查询.META.表
HRegionLocation getCachedLocation(final byte [] tableName,
final byte [] row) {
SoftValueSortedMap&byte [], HRegionLocation& tableLocations =
getTableLocations(tableName);
//找到小于rowKey并且最接近rowKey的startKey对应的region,通过NavigableMap实现
possibleRegion = tableLocations.lowerValueByKey(row);
if (possibleRegion == null) {
return null;
//表的最末一个region的endKey是空字符串,如果不是最末一个region,则只有当rowKey小于endKey才返回region。
byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
KeyValue.getRowComparator(tableName).compareRows(
endKey, 0, endKey.length, row, 0, row.length) & 0) {
return possibleR
return null;
二、服务端
服务端写数据的主要过程是:写WAL日志(如果没有关闭写WAL日志)-》写memstore-》触发flush memstore(如果memstore大小超过hbase.hregion.memstore.flush.size的设置值),在flush memstore过程中可能会触发compact和split操作,在以下内容会对写put方法、flush memstore、compact和split进行讲解。
1、HTableInterface接口操作hbase数据的API对应的服务端是由HRegionServer类实现,源代码如下:
public void put(final byte[] regionName, final Put put) throws IOException {
HRegion region = getRegion(regionName);
if (!region.getRegionInfo().isMetaTable()) {
//检查HRegionServer的memstore总内存占用量是否已经超过了hbase.regionserver.global.memstore.upperLimit(默认值是0.4)或者hbase.regionserver.global.memstore.lowerLimit(默认值是0.35)的限制,如果超过了则会在flush队列中添加一个任务,其中如果是超过了upper的限制则会阻塞所有的写memstore的操作,直到内存降至upper限制以下。
this.cacheFlusher.reclaimMemStoreMemory();
boolean writeToWAL = put.getWriteToWAL();
//region会调用Store的add()方法把数据保存到相关Store的memstore中
//region在保存完数据后,会检查是否需要flush memstore,如果需要则发出flush请求,由HRegionServer的flush守护线程异步执行。
region.put(put, getLockFromId(put.getLockId()), writeToWAL);
public int put(final byte[] regionName, final List&Put& puts) throws IOException {
region = getRegion(regionName);
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
OperationStatus codes[] = region.batchMutate(putsWithLocks);
for (i = 0; i & codes. i++) {
if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
return -1;
2、Flush Memstore
memstore的flush过程由类MemStoreFlusher控制,该类是Runnable的实现类,在HRegionServer启动时会启动一个MemStoreFlusher的守护线程,每隔10s从flushQueue中获取flush任务进行刷新,如果需要flush memstore时,只需调用MemStoreFlusher的requestFlush或者requestDelayedFlush方法把flush请求加入到flush队列中即可,具体的flush是异步执行的。
memstore的大小有两个控制级别:
1)Region级
a、hbase.hregion.memstore.flush.size:默认值128M,超过将被flush到磁盘
b、hbase.hregion.memstore.block.multiplier:默认值2,如果memstore的内存大小已经超过了hbase.hregion.memstore.flush.size的2倍,则会阻塞该region的写操作,直到内存大小降至该值以下
2)RegionServer级
a、hbase.regionserver.global.memstore.lowerLimit:默认值0.35,HRegionServer的所有memstore占用内存在HRegionServer总内存中占的lower比例,当达到该值,则会从整个RS中找出最需要flush的region进行flush
b、hbase.regionserver.global.memstore.upperLimit:默认值0.4,HRegionServer的所有memstore占用内存在总内存中的upper比例,当达到该值,则会从整个RS中找出最需要flush的region进行flush,直到总内存比例降至该数限制以下,并且在降至限制比例以下前将阻塞所有的写memstore的操作
在对整个HRegionServer进行flush操作时,并不会刷新所有的region,而是每次均会根据region的memstore大小、storeFile数量等因素找出最需要flush的region进行flush,flush完成后再进行内存总比例的判断,如果还未降至lower限制以下则会再寻找新的region进行flush。
在flush region时会flush该region下所有的store,虽然可能某些store的memstore内容很少。
在flush memstore时会产生updatesLock(HRegion类的一个属性,采用jdk的ReentrantReadWriteLock实现)的排它锁write lock,当获取完memstore的快照后释放updatesLock的write lock,在释放之前,所有的需要获取updatesLock的write、read lock的操作均会被阻塞,该影响是整个HRegion范围,因此如果表的HRegion数量过少,或者数据写入时热点在一个region时会导致该region不断flush memstore,由于该过程会产生write排他锁(虽然进行memstore快照的时间会很快),因此会影响region 的整体写能力。
3、Compact操作
hbase有两种compact:minor和major,minor通常会把若干个小的storeFile合并成一个大的storeFile,minor不会删除标示为删除的数据和过期的数据,major则会删除这些数据,major合并之后,一个store只有一个storeFile文件,这个过程对store的所有数据进行重写,有较大的资源开销,major 合并默认1天执行一次,可以通过hbase.hregion.majorcompaction配置执行周期,通常是把该值设置为0进行关闭,采用手工执行,这样可以避免当集群繁忙时执行整个集群的major合并,major合并是必须执行的操作,因为删除标示为删除和过期的数据操作是在该合并过程中进行的。
compact合并的级别
1)、整个hbase集群
在HRegionServer启动时会开启一个守护线程定时扫描集群下的所有在线的region下的storeFile文件,对所有符合Store.needsCompaction()或Store.isMajorCompaction()的store进行合并操作,默认扫描周期是10000秒(大概2.7小时),即大概每隔10000秒进行一次全局的compact,应该尽量减少storefile的文件数,避免每次全局compact时真实发生compact的数量,减少整个集群的负载,可以关闭任何的compact,半夜通过脚本触发:
//threadWakeFrequency默认值是10*1000,multiplier默认值是1000,单位:毫秒
pactionChecker = new CompactionChecker(this, this.threadWakeFrequency * multiplier, this);
//chore是CompactionChecker定时执行的方法,定时进行minor和major的compcat合并,如果hbase.hregion.majorcompaction配置为0则不执行major合并,minor升级为major除外。
protected void chore() {
for (HRegion r : this.instance.onlineRegions.values()) {
if (r == null)
for (Store s : r.getStores().values()) {
if (s.needsCompaction()) {
//如果整个store下的storeFile文件均需要合并,则会自动升级到major合并
pactSplitThread.requestCompaction(r, s, getName()
+ " requests compaction", null);
} else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority & r.getCompactPriority()) {
pactSplitThread.requestCompaction(r, s, getName()
+ " reque use default priority", null);
pactSplitThread.requestCompaction(r, s, getName()
+ " reque use configured priority",
this.majorCompactPriority, null);
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
//store内除去正在执行compact的storeFile后剩余的storeFile数如果大于配置的最小可合并数,则可以进行compact合并,最小的可合并数通过pactionThreshold配置,默认是3,最小值为2。
public boolean needsCompaction() {
return (storefiles.size() - filesCompacting.size()) & minFilesToC
//是否是major合并
private boolean isMajorCompaction(final List&StoreFile& filesToCompact) throws IOException {
boolean result =
//根据hbase.hregion.majorcompaction配置的major合并周期计算下次进行major合并的时间,如果设置为0则不进行major合并
long mcTime = getNextMajorCompactTime();
if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
// TODO: Use better method for determining stamp of last major (HBASE-2990)
//store中最久没有被修改过的storeFile文件的时间,作为上次major合并的时间进行判断下次应该进行major合并的时间,这种做法并不合理,可能会导致延后执行major合并,极端情况下会导致永远不进行major合并。
long lowTimestamp = getLowestTimestamp(filesToCompact);
long now = System.currentTimeMillis();
//只有当达到了major合并时间才可能进行major合并
if (lowTimestamp & 0l && lowTimestamp & (now - mcTime)) {
// Major compaction time has elapsed.
if (filesToCompact.size() == 1) {
StoreFile sf = filesToCompact.get(0);
//store中最久的时间与当前时间的时间差
long oldest = (sf.getReader().timeRangeTracker == null) ?
Long.MIN_VALUE :
now - sf.getReader().timeRangeTracker.minimumT
if (sf.isMajorCompaction() && (this.ttl == HConstants.FOREVER || oldest & this.ttl)) {
//如果列簇没有设置过期时间(通过HColumnDescriptor.setTimeToLive()设置),因此无需通过major合并删除过期数据。
} else if (this.ttl != HConstants.FOREVER && oldest & this.ttl) {
result = true;
result = true;
2) 、表级
通过HBaseAdmin或者CompactionTool可以触发表下的所有region和列簇进行compact合并(minor或者major)。HBaseAdmin还可以触发表下的指定列簇的compact操作。
3)、region级
通过HBaseAdmin或者CompactionTool可触发对指定region下的所有列簇进行compact操作(minor或者major)。HBaseAdmin还可以触发region下的指定列簇的compact操作。
通过Merge工具可以把给定表下的任意两个region合并成一个region,在合并region前会触发region的major compact操作。
在flush memstore过程中会触发当前region的compact,写数据或者split region等会触发flush memstore。
4)、列簇级(Store级)
有很多情况均会触发Store的compact,比如:执行CompactionTool工具的compact方式、flush memstore等。
注:以上4条只是指触发compact操作,但是不一定真正发生compact,还需满足needsCompaction()或者isMajorCompaction()的条件。
compact总结:
1)、从compact的程度可以分为:minor和major合并;
2)、从发生的范围可以分:整个集群、表、region、列簇4个级别;
3)、从触发的方式上可以分:
a、hbase内部自动触发(HRegionServer的定时器、flush memstore、split等)
b、客户端等外部触发(hbase管理工具、HBaseAdmin(client端管理类)、CompactionTool等)
4)、从执行的实时性:异步执行,立即执行;
Compact的执行逻辑如下:
//CompactSplitThread类,只由HRegionServer类持有,在以下几个地方被调用:
//1、HRegionServer的compact守护线程
//2、MemStoreFlusher的flushRegion
//3、CompactingRequest的run方法
public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request) throws IOException {
CompactionRequest cr = s.requestCompaction(priority, request);
cr.setServer(server);
//是否是large合并,只与参与合并的文件的总大小有关,超过一定值后就会通过large合并的线程池,
//注意与major合并的区别,large线程池执行的任务可能是一个minor合并也可能是major合并。
//默认的large和small线程数是1,可以通过hbase.paction.large和hbase.paction.small配置
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())? largeCompactions : smallC
pool.execute(cr);
public CompactionRequest requestCompaction(int priority, CompactionRequest request)
throws IOException {
this.lock.readLock().lock();
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List&StoreFile& candidates = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidates.subList(0, idx + 1).clear();
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
CompactSelection filesToC
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
filesToCompact = compactSelection(candidates, priority);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, parators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
//not a special compaction request, so we need to make one
if(request == null){
request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
// update the request with what the system thinks the request should be
// its up to the request if it wants to listen
request.setSelection(filesToCompact);
request.setIsMajor(isMajor);
request.setPriority(pri);
} finally {
this.lock.readLock().unlock();
if (request != null) {
CompactionRequest.preRequest(request);
//如果合并的总文件大小超过2 * this.minFilesToCompact * this.region.memstoreFlushSize则会通过大合并的线程池进行合并,总共有两个合并的线程池
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())? largeCompactions : smallC
// minFilesToCompact默认值为3, memstoreFlushSize默认值128M
boolean throttleCompaction(long compactionSize) {
long throttlePoint = conf.getLong(
"hbase.paction.throttle",
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
return compactionSize & throttleP
HBase的默认split策略类是:IncreasingToUpperBoundRegionSplitPolicy,可以通过hbase.regionserver.region.split.policy配置,或者通过HTableDescriptor在建表时指,HTableDescriptor指定的split策略优先级最高,以下是对该类中计算split临界大小的源代码讲解:
//IncreasingToUpperBoundRegionSplitPolicy类
//返回需要split的storeFile大小,如果超过该值,则可能触发split操作
//取region数量和memstore大小的计算值与desiredMaxFileSize比较的最小值,因此在进行写数据时,我们会发现虽然配置的最大region大小为10G,但是hbase并不会真正等region大小达到10G才split,而是有各种split的触发大小,当只有一个region时,达到memstore大小就会split,如此设计可以确保写数据时可以快速分裂出多个region,充分利用集群资源,并且在早期split会比中后期进行split消耗的服务器资源更少,因为早期数据量小。
long getSizeToCheck(final int tableRegionsCount) {
return tableRegionsCount == 0? getDesiredMaxFileSize():
Math.min(getDesiredMaxFileSize(),
this.flushSize * (tableRegionsCount * tableRegionsCount));
// getDesiredMaxFileSize的逻辑如下:
//如果建表时指定了region大小,则采用建表时指定的值,否则采用hbase.hregion.max.filesize配置的值
HTableDescriptor desc = region.getTableDesc();
if (desc != null) {
this.desiredMaxFileSize = desc.getMaxFileSize();
if (this.desiredMaxFileSize &= 0) {
this.desiredMaxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
浏览: 62146 次
来自: 北京
看楼主的意思每次,执行sql时,会连接redis,写数据,对吧 ...
我联系不上你了,能加我的QQ号码么?我是Linda,需要帮助! ...
感谢楼主分享,学习了
总结的不错
写完已经凌晨1:40了,还好明天是周日不上班,自己顶自己
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 hbase 关闭compact 的文章

 

随机推荐