怎么通过rocketmq源码分析读取信息

rocketmq 官网_rocketmq_壮志凌云
以下内容已过滤百度推广
rocketmq-broker release 3.4.6 jul 15, 2016
rocketmq-client release 3.4.6 jul 15, 2016
rocketmq-common release 3.4.6 jul 15, 2016
rocketmq-ex...&&普通
rocketmq单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间,过期数据数据删除(rocketmq中的消息队列长度不是无限的,只是足够大的内存+数据定时删除) rocketmq...&&普通
日&-&rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取 2.1异步刷盘 在有raid ...&&普通
日&-&27 iii 项目开源主页:/alibaba/rocketmq 1 前言本文档旨在描述 rocketmq 的多个关键特性的实现原理,并对消息中间件遇到的各种问题...&&百度文库
===一、rocketmq简介==...&&普通
rocketmq是什么? rocketmq 是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息...&&普通
日&-&因此,系统的默认设置并不能使rocketmq发挥很好的性能,需要对系统的pagecache,内存分配,i/o调度,文件句柄限制做一些针对性的参数设置。 ...&&普通
3、开源社区产品名:rocketmq,无技术绑定风险4、应用灵活,无任何强制绑定其他产品 独立部署 1、支持专有云独立输出,支持物理机和虚拟机,最小部署仅几台机器2、专...&&普通
日&-&2.tar -xvf alibaba-rocketmq-3.0.7.tar.gz 解压到适当的目录如/opt/目录 3.启动rocketmq:进入rocketmq/bin 目录 执行 nohup sh mqnamesrv & 4.启动br...&&普通
日&-&备注:1.如果您此前未接触过rocketmq,请先阅读附录部分,以便了解rocketmq的整体架构和相关术语2.文中的mqserver与broker表示同一概念分布式消息系统作为实现分布式系统...&&普通
语义关联近似词猜&正规性45地理位置网址标题|网址|摘要F0略略分类信息&|&猜&非正规中略略略精确匹配1略略分类信息&|&猜&非正规中略略略精确匹配2略略分类信息&|&猜&非正规中略略略精确匹配3略略分类信息&|&猜&非正规中略略略精确匹配4略略分类信息&|&猜&非正规中略略略精确匹配5略略分类信息&|&猜&非正规中略略略精确匹配6略略分类信息&|&猜&非正规中略略略精确匹配7略略分类信息&|&猜&非正规中略略略精确匹配8略略分类信息&|&猜&非正规中略略略精确匹配9略略分类信息&|&猜&非正规中略略略精确匹配10
12时间限制猜&实时动态5相关检索词泛时效性8F1略略略略略略略略1略略略略略略略略2略略略略略略略略3百度文库略略略略略略略4略略略略略略略略5略略略略略略略略6略略略略略略略略7略略略略略略略略8略略略略略略略略9略略略略略略略略10
url2345摘要前标题后标题F2略略略略略正文略1略略略略略正文略2略略略略略正文略3略略略略略正文略4略略略略略略略5略略略略略略略6略略略略略正文略7略略略略略正文略8略略略略略正文略9略略略略略略略10
123原创猜&网址形式6相关词猜&相似度F3略略略略子页优先级较低略略精确匹配1略略略略子页优先级较低略略精确匹配2略略略略子页优先级较低略略精确匹配3略略略略子页优先级较低略略精确匹配4略略略略子页优先级较低略略精确匹配5略略略略子页优先级较低略略精确匹配6略略略略子页优先级较低略略精确匹配7略略略略主页次优先&|&子页内容充实略略精确匹配8略略略略子页优先级较低略略精确匹配9略略略略子页优先级较低略略精确匹配10RocketMQ入门(1) - 推酷
RocketMQ入门(1)
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
一.RocketMQ网络部署特点
& & (1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
& & (2)Broker部署相对复杂,Broker氛围Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
& & (3)Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署
& & (4)Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
二.RocketMQ储存特点
& &(1)零拷贝原理:Consumer消费消息过程,使用了零拷贝,零拷贝包括一下2中方式,RocketMQ使用第一种方式,因小块数据传输的要求效果比sendfile方式好
& & & & & &a )使用mmap+write方式
& &&& &&& & &优点:即使频繁调用,使用小文件块传输,效率也很高
& &&& &&& & &缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU资源,内存安全性控制复杂,需要避免JVM Crash问题
& & & & b)使用sendfile方式
& &&& &&& & &优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
& &&& &&& & &缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO
& & (2)数据存储结构
三.RocketMQ关键特性
1.单机支持1W以上的持久化队列
& & (1)所有数据单独储存到commit Log ,完全顺序写,随机读
& & (2)对最终用户展现的队列实际只储存消息在Commit Log 的位置信息,并且串行方式刷盘
& & &这样做的好处:
& & (1)队列轻量化,单个队列数据量非常少
& & (2)对磁盘的访问串行话,避免磁盘竞争,不会因为队列增加导致IOWait增高
& & &每个方案都有优缺点,他的缺点是:
& & (1)写虽然是顺序写,但是读却变成了随机读
& & (2)读一条消息,会先读Consume&&Queue,再读Commit Log,增加了开销
& & (3)要保证Commit Log 与&Consume&&Queue完全的一致,增加了编程的复杂度
& & &以上缺点如何客服:
& & (1)随机读,尽可能让读命中pagecache,减少IO操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问硬盘会不会由于随机读导致系统性能急剧下降,答案是否定的。
& &&& & a)访问pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,在下次读时就可能命中pagecache
& &&& & b)随机访问Commit Log 磁盘数据,系统IO调度算法设置为NOOP方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能高5倍
& &&(2)由于Consume Queue存储数量极少,而且顺序读,在pagecache的与读取情况下,Consume&Queue的读性能与内存几乎一直,即使堆积情况下。所以可以认为Consume&Queue完全不会阻碍读性能
& &&(3)Commit Log中存储了所有的元信息,包含消息体,类似于MySQl、Oracle的redolog,所以只要有Commit Log存在,&Consume& Queue即使丢失数据,仍可以恢复出来
2.刷盘策略
rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取
2.1异步刷盘
在有 RAID 卡, SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完& &&内存就向用户返回,由后台线程刷盘呢?
(1). &由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。
(2). &万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况,会不会导致系统内存溢出,答案是否定的,原因如下:
a) &写入消息到 PAGECACHE 时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略
是 LRU 方式。
b) &如果干净页不足,此时写入 PAGECACHE 会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32 个 PAGE,
来找出更多干净 PAGE。
综上,内存溢出的情况不会出现
2.2同步刷盘:
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PAGECACHE 直接返回,而同步刷盘需要等待刷盘完成才返回,同步刷盘流程如下:
& &&(1)写入 PAGECACHE 后,线程等待,通知刷盘线程刷盘。
& &&(2)刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
& &&(3)前端等待线程向用户返回成功。
3.消息查询
3.1按照MessageId查询消息
MsgId总共16个字节,包含消息储存主机地址,消息Commit Log Offset。从MsgId中解析出Broker的地址和Commit Log 偏移地址,然后按照存储格式所在位置消息buffer解析成一个完整消息
3.2按照Message Key查询消息
1.根据查询的key的hashcode%slotNum得到具体的槽位置 &(slotNum是一个索引文件里面包含的最大槽目数目,例如图中所示 slotNum=500W)
2.根据slotValue(slot对应位置的值)查找到索引项列表的最后一项(倒序排列,slotValue总是指向最新的一个索引项)
3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的32条记录)
4.Hash冲突,寻找key的slot位置时相当于执行了两次散列函数,一次key的hash,一次key的hash取值模,因此这里存在两次冲突的情况;第一种,key的hash值不同但模数相同,此时查询的时候会在比较第一次key的hash值(每个索引项保存了key的hash值),过滤掉hash值不想等的情况。第二种,hash值相等key不想等,出于性能的考虑冲突的检测放到客户端处理(key的原始值是存储在消息文件中的,避免对数据文件的解析),客户端比较一次消息体的key是否相同
5.存储,为了节省空间索引项中存储的时间是时间差值(存储时间——开始时间,开始时间存储在索引文件头中),整个索引文件是定长的,结构也是固定的
4.服务器消息过滤
RocketMQ的消息过滤方式有别于其他的消息中间件,是在订阅时,再做过滤,先来看下Consume Queue存储结构
1.在Broker端进行Message Tag比较,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode
2.Consumer收到过滤消息后,同样也要执行在broker端的操作,但是比对的是真实的Message Tag字符串,而不是hashcode
为什么过滤要这么做?
1.Message Tag存储hashcode,是为了在Consume Queue定长方式存储,节约空间
2.过滤过程中不会访问Commit Log 数据,可以保证堆积情况下也能高效过滤
3.即使存在hash冲突,也可以在Consumer端进行修正,保证万无一失
5.单个JVM进程也能利用机器超大内存
1.Producer发送消息,消息从socket进入java 堆
2.Producer发送消息,消息从java堆进入pagecache,物理内存
3.Producer发送消息,由异步线程刷盘,消息从pagecache刷入磁盘
4.Consumer拉消息(正常消费),消息直接从pagecache(数据在物理内存)转入socket,到达Consumer,不经过java堆。这种消费场景最多,线上96G物理内存,按照1K消息算,可以物理缓存1亿条消息
5.Consumer拉消息(异常消费),消息直接从pagecache转入socket
6.Consumer拉消息(异常消费),由于socket访问了虚拟内存,产生缺页中断,此时会产生磁盘IO,从磁盘Load消息到pagecache,然后直接从socket发出去
6.消息堆积问题解决办法
堆积性能指标
消息的堆积容量
依赖磁盘大小
发消息的吞吐量大小受影响程度
无Slave情况,会受一定影响
有Slave情况,不受影响
正常消费的Consumer是否会受影响
无Slave情况,会受一定影响
有Slave情况,不受影响
访问堆积在磁盘的消息时,吞吐量有多大
与访问的并发有关,最终会降到5000左右
在有Slave情况下,Master一旦发现Consumer访问堆积在磁盘的数据时,回想Consumer下达一个重定向指令,令Consumer从Slave拉取数据,这样正常的发消息与正常的消费不会因为堆积受影响,因为系统将堆积场景与非堆积场景分割在了两个不同的节点处理。这里会产生一个问题,Slave会不会写性能下降,答案是否定的。因为Slave的消息写入只追求吞吐量,不追求实时性,只要整体的吞吐量高就行了,而Slave每次都是从Master拉取一批数据,如1M,这种批量顺序写入方式使堆积情况,整体吞吐量影响相对较小,只是写入RT会变长
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致#探秘RocketMQ消息持久化
之前对RocketMq的上层接口进行过介绍,但是作为一个可持久化的MQ中间件,那么其核心必然是对消息的持久化这一块。这也是我一直想去了解一下,最近稍微闲下来一点,把RocketMQ这一块代码实现给过了一下,这里对看到得内容和想法进行一次总结。这里分三部分来介绍RocketMQ消息持久化,分别是来自消息提供端的写,来自消费端的读以及从本地磁盘中进行恢复。
在介绍之前先了解一下RocketMQ中对数IO的统一入口,分别是`MapedFileQueue`和`MapedFile`。`MapedFileQueue`是对某个目录下面文件IO的统一入口,而`MapedFile`是对`MapedFileQueue` 目录下某个文件的IO封装(具体是通过`MapedByteBuffer`来进行IO操作),对于`MapedFileQueue `和`MapedFile`来说,它们关注的只是往哪个位置些多少数据,以及从哪个位置读取数据,而不管里面存储的内容,写入的内容都将是`byte`数组。RocketMQ的消息文件以及其他文件的IO都是基于这两个类来做,而不是直接操作IO。介绍完这个之后,那么下面将对RocketMQ如何基于`MapedFileQueue`和`MapedFile`来做到消息的写和读。
在介绍之前,先看看RocketMQ的broker的整体架构图
![输入图片说明](https://static.oschina.net/uploads/img/44529_JVHd.jpg &在这里输入图片标题&)
##来自消息提供端的写
消息提供端发起一个`send`操作,会被`broker`中`SendMessageProcessor`所处理,至于`SendMessageProcessor`这个类中做了哪些事情,这里就不做解释,主要是对`SendMessageProcessor`如何将消息写入到磁盘进行介绍,`SendMessageProcessor `会把写磁盘的操作交给`DefaultMessageStore`类去处理,而`DefaultMessageStore `也不会做具体IO的事情,而是交给`CommitLog`,在`CommitLog`之下则是`MapedFileQueue`,在`MapedFileQueue`中会写入到最新的`MapedFile`中(此时的`MapedFile`默认最大1G,所有存储的配置都在`MessageStoreConfig`类中获取。),这里从`SendMessageProcessor `到`DefaultMessageStore `再到`CommitLog `,最后到`MapedFileQueue `,这个过程中是所有的topic都操作同一个`MapedFileQueue`,那就是说所有的Topic的消息都些在一个目录下面(因为一个`MapedFileQueue`对应一个目录,`CommitLog`的目录默认是在${user_home}/store/commitlog下),上面由消息提供端每次`send`都是一个完整的消息体,那就是一个完整的消息,这个消息体将会连续的写到`MapedFileQueue`的最新`MapedFile`中,在`MapedFileQueue`里面维护了commitlog的全局offset,那么只需要告诉`MapedFileQueue`一个全局offset和消息体的大小,那么就可以从`MapedFileQueue`中读取一个消息。但是在commitlog中只是负责将消息写入磁盘,而不管你怎么来读取,但是`CommitLog`通过`MapedFileQueue`写完之后,那么会得到当前写的位置,以及消息体大小,同时加上topic的元数据信息,通过异步队列的方式写到topic的索引文件,这个文件就是下面介绍消息读取的时候用到。
在`CommitLog`的`putMessage`方法调用`MapedFileQueue`写完消息之后,那么会调用`DefaultMessageStore`的`putDispatchRequest`方法进行将本次写操作广播出去,具体代码如下:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
..........
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
..........
DispatchRequest dispatchRequest = new DispatchRequest(//
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
* Transaction
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
..........
return putMessageR
如果继续跟进`putDispatchRequest`代码,就会发现是将`dispatchRequest`放到一个队列,然后由另一个线程去处理这个数据,这样可以提高消息提供端写入broker的效率,在这个线程中,会触发`DefaultMessageStore`的`putMessagePostionInfo`方法,该方法实现如下:
public void putMessagePostionInfo(String topic, int queueId, long offset, int size, long tagsCode,
long storeTimestamp, long logicOffset) {
ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
cq.putMessagePostionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
可以看到通过`topic`和`queueId`得到具体的`ConsumerQueue`,确定这个消息在哪个消费队列里面,同事触发`cq.putMessagePostionInfoWrapper `调用,从方法名就知道是记录消息位置的,最后会调用`ConsumerQueue `的`putMessagePostionInfo`方法,这个方法实现如下:
private boolean putMessagePostionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// 在数据恢复时会走到这个流程
if (offset &= this.maxPhysicOffset) {
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQStoreUnitSize);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQStoreUnitS
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
if (mapedFile != null) {
// 纠正MapedFile逻辑队列索引顺序
if (mapedFile.isFirstCreateInQueue() && cqOffset != 0 && mapedFile.getWrotePostion() == 0) {
this.minLogicOffset = expectLogicO
this.fillPreBlank(mapedFile, expectLogicOffset);
(&fill pre blank space & + mapedFile.getFileName() + & & + expectLogicOffset + & &
+ mapedFile.getWrotePostion());
if (cqOffset != 0) {
long currentLogicOffset = mapedFile.getWrotePostion() + mapedFile.getFileFromOffset();
if (expectLogicOffset != currentLogicOffset) {
// XXX: warn and notify me
&[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}&,//
expectLogicOffset, //
currentLogicOffset,//
this.topic,//
this.queueId,//
expectLogicOffset - currentLogicOffset//
// 记录物理队列最大offset
this.maxPhysicOffset =
return mapedFile.appendMessage(this.byteBufferIndex.array());
下面是要写入磁盘的内容
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQStoreUnitSize);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
可以看到是写入了commitlog得全局offset和消息体的大小,以及tags信息。
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
上面看到,`ConsumerQueue`(默认情况下`ConsumerQueue`是在${user_home}/store/consumerqueue/${queueId})也是通过`MapedFileQueue`来进行磁盘IO的,素以`MapedFileQueue`可以理解为RocketMQ的磁盘访问入口。
到这里基本上完成了一次消息提供端发起`send`操作所做的哪些事情,主要是通过CommitLog来进行消息内容的持久化,以及通过`ConsumerQueue`来确定消息被哪个队列消费,以及消息的索引持久化。
这里再介绍一下上面的queueId是怎么来的,因为消息提供端`send`某个topic的消息并不知道queueId,这个queueId是在`broker`端生成的,生成代码在`SendMessageProcessor`的方法`consumerSendMsgBack`中,代码段如下:
int queueIdInt =
Math.abs(this.random.nextInt() % ) % subscriptionGroupConfig.getRetryQueueNums();
##来自消息消费端的读
在`broker`端处理来自消费端的读请求,是交给`PullMessageProcessor`类来处理,在方法`processRequest`经过一系列处理之后,会交给`DefaultMessageStore`的`getMessage`方法,我这里贴出该方法主要代码段
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final SubscriptionData subscriptionData) {
GetMessageResult getResult = new GetMessageResult();
// 有个读写锁,所以只访问一次,避免锁开销影响性能
final long maxOffsetPy = mitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQuque();
maxOffset = consumeQueue.getMaxOffsetInQuque();
.........//逻辑校验
//这里的offset是只从第几个消息开始消费,该方法返回的时从offset之后的消息体索引的io
SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int MaxFilterMessageCount = 16000;
boolean diskFallRecorded =
//循环读出多个消息内容
for (; i & bufferConsumeQueue.getSize() && i & MaxFilterMessageC i +=
ConsumeQueue.CQStoreUnitSize) {
//得到了一个消息体索引信息
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();//commitlog的全局偏移量
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();//消息大小
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();//tag信息
maxPhyOffsetPulling = offsetPy;
//参数校验
// 消息过滤
if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
//从commitlog中读取消息
SelectMapedBufferResult selectResult =
mitLog.getMessage(offsetPy, sizePy);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getR
上面是读取消息的部分逻辑,可以看到是先从`ConsumerQueue`中获取消息索引,然后再从commitlog中读取消息内容。这些内容也是在存储消息的时候写入的。因为broker端并不是一直运行的,而里面的commitlog的offset是有状态的,不能说你的broker挂掉了,导致commitlog的offset丢失,可能导致消息被覆盖。所以下面再用上一小段来介绍RocketMQ如何做到commitlog的offset重启后不丢失。
##从本地磁盘恢复
在`MapedFileQueue`中有一个`load`方法,这个方法是将`MapedFileQueue`所管理目录中得文件加载到`MapedFile`中,如果你追踪这个`load`方法的被调用链路,会发现是在`BrokerController`的`initialize`触发了整个调用,那就是说在broker启动的时候,会触发`CommitLog`去将本地磁盘的数据关系加载到系统里面来,上面说了`CommitLog`有一个全局offset,这个offset在broker启动的时候怎么被查找的呢?如果你们熟悉的话,应该猜得到,是将`CommitLog`的`MapedFileQueue`中文件进行计算,得到当前`CommitLog`的全局offset,下面我贴出具体找得代码:
* recover时调用,不需要加锁
public void truncateDirtyFiles(long offset) {
List&MapedFile& willRemoveFiles = new ArrayList&MapedFile&();
for (MapedFile file : this.mapedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mapedFileS
if (fileTailOffset & offset) {
if (offset &= file.getFileFromOffset()) {
file.setWrotePostion((int) (offset % this.mapedFileSize));
file.setCommittedPosition((int) (offset % this.mapedFileSize));
// 将文件删除掉
file.destroy(1000);
willRemoveFiles.add(file);
this.deleteExpiredFile(willRemoveFiles);
上面代码是在`MapedFileQueue`中得,是被`CommitLog`的`recoverAbnormally`方法调用,而`recoverAbnormally`最上层触发也是在`BrokerController`的`initialize`方法中。上面说的是`CommitLog`的恢复过程,而`ConsumerQueue`的恢复恢复过程也是类似,感兴趣可以自己去看看。

我要回帖

更多关于 rocketmq github 的文章

 

随机推荐