RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
创始人
2024-05-28 13:31:56
0

代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。

先浏览一下 SelectMappedBufferResult

代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:

// 消息主题
private final String topic; 
// 消息队列
private final int queueId; 
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp; 
//消息在消费队列的offset
private final long consumeQueueOffset; 
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys; 
// 是否成功
private final boolean success; 
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey; 
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset; 
// 属性
private final Map propertiesMap; 

代码@3:转发DistpachRequest。

根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。

2.1 CommitLogDispatcherBuildConsumeQueue

核心处理方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries = 30;boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();      // @1for (int i = 0; i < maxRetries && canWrite; i++) {long tagsCode = request.getTagsCode();if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());    // @2if (result) {this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());     // @3return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

代码@1:判断 ConsumeQueue 是否可写。

代码@2:写入 consumequeue文件,真正的写入到 ConsumeQueue 逻辑如下。

Consumequeue#putMessagePositionInfoWrapper

Consumequeue#putMessagePositionInfoWrapper
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset <= this.maxPhysicOffset) {return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);    // 代码@1final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;   // @2MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {    // @3this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset;return mappedFile.appendMessage(this.byteBufferIndex.array());    // @4}return false;

首先说一下参数:

  • long offset
    commitlog偏移量,8字节。
  • int size
    消息体大小 4字节。
  • long tagsCode
    消息 tags 的 hashcode。
  • long cqOffset
    写入 consumequeue 的偏移量。

代码@1:首先将一条 ConsueQueue 条目总共20个字节,写入到 ByteBuffer 中。

代码@2:计算期望插入 ConsumeQueue 的 consumequeue 文件位置。

代码@3:如果文件是新建的,需要先填充空格。

代码@4:写入到 ConsumeQueue 文件中,整个过程都是基于 MappedFile 来操作的。

我们现在已经知道 ConsumeQueue 每一个条目都是 20个字节(8个字节commitlog偏移量+4字节消息长度+8字节tag的hashcode

那 consumqu e文件的路径,默认大小是多少呢?

默认路径为:rockemt_home/store/consume/ {topic} / {queryId},默认大小为,30W条记录,也就是30W * 20字节。

2.2 CommitLogDispatcherBuildIndex

其核心实现类 IndexService#buildIndex,存放 Index 文件的封装类为:IndexFile。

2.2.1 IndexFile 详解

2、2.1.1 核心属性

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 每个 hash  槽所占的字节数
private static int hashSlotSize = 4;
// 每条indexFile条目占用字节数
private static int indexSize = 20; 
// 用来验证是否是一个有效的索引。
private static int invalidIndex = 0;
// index 文件中 hash 槽的总个数
private final int hashSlotNum;
// indexFile中包含的条目数
private final int indexNum; 
// 对应的映射文件
private final MappedFile mappedFile;
// 对应的文件通道
private final FileChannel fileChannel;
// 对应 PageCache
private final MappedByteBuffer mappedByteBuffer;
// IndexHeader,每一个indexfile的头部信息

IndexHeader 详解:

index存储路径:/rocket_home/store/index/年月日时分秒。

目前了解到这来,目光继续投向IndexService。

2.2.2 IndexService

2、2.2.1 核心属性与构造方法

private final DefaultMessageStore defaultMessageStore;private final int hashSlotNum;private final int indexNum;private final String storePath;private final ArrayList indexFileList = new ArrayList();private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public IndexService(final DefaultMessageStore store) {this.defaultMessageStore = store;this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();this.storePath =StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
  • hashSlotNum
    hash槽数量,默认5百万个。
  • indexNum
    index条目个数,默认为 2千万个。
  • storePath
    index存储路径,默认为:/rocket_home/store/index。

相关内容

热门资讯

唯美的外国现代诗歌 唯美的外国现代诗歌  在日常学习、工作或生活中,大家都看到过许多经典的诗歌吧,诗歌是表现诗人思想感情...
诗经采薇教学 采问答访稿推荐度:诗经中的女孩灵动名字1500个推荐度:教学总结推荐度:教学反思推荐度:《翠鸟》教学...
天国的记忆诗歌 天国的记忆诗歌  在日常学习、工作和生活中,大家一定没少看到经典的诗歌吧,诗歌具有精炼含蓄的特点,起...
拉萨河之恋诗歌 拉萨河之恋诗歌  拉萨河之恋  记得,那是个正午  敌不过念乡之苦  茫然不知所措的我  像一片绿叶...
思念是一种莫名的痛诗歌外 思念是一种莫名的痛诗歌外一首  今夜我一如既往等你  夜已深等你良久  相思的苦痛爬满心底  你终究...
天兵怒气冲霄汉 “天兵怒气冲霄汉”出处 出自 现代 毛泽东 的《渔家傲·反第一次大“围剿”》“天兵怒气冲霄汉”平仄韵...
陶渊明比较著名的诗词 陶渊明比较著名的诗词(精选10首)   陶渊明是东晋大诗人,也是中国历史上成就最高的诗人之一。宋代的...
唐诗之李商隐:无题·其二 唐诗三百首之李商隐:无题·其二唐诗三百首全集《无题·其二》作者:李商隐飒飒东风细雨来,芙蓉塘外有轻雷...
鲁迅《淡淡的血痕中》原文及读... 鲁迅《淡淡的血痕中》原文及读后感  【鲁迅《淡淡的血痕中》原文】  —纪念几个死者和生者和未生者— ...
适合儿童的唐诗 适合儿童的唐诗  唐诗的'辉煌成就,引起后人学习的兴趣和研究的热望。下面就是小编给大家整理的幼儿唐诗...
嫦娥 李商隐 嫦娥 李商隐嫦娥 李商隐,此其诗诗题为“嫦娥”,实际上抒写的是处境孤寂的主人公对于环境的感受和心灵独...
咏秋古诗词 中秋将至,秋风猎猎,秋高气爽。作为我国的传统节日,古代诗词中有很多千古传颂的咏秋之作,以诗抒情怀,以...
谷雨的古诗句 谷雨的古诗句  谷雨的雨,现身在古代文人墨客笔下,是相当愁人的。小编今天为大家带来关于谷雨的.古诗句...
表达奉献精神的诗句 表达奉献精神的诗句  在日常学习、工作和生活中,大家总少不了接触一些耳熟能详的诗句吧,诗句能使人们自...
经典爱情现代诗   经典爱情现代诗  1、《炉中煤》  啊,我年青的女郎!  我不辜负你的殷勤,  你也不要辜负了我...
描写兰花的诗句分享 描写兰花的诗句分享  丛兰生幽谷,莓莓遍林薄。不纫亦何伤,已胜当门托。辇至逾关山,滋培珍几阁。  冬...
朱自清的代表作品有哪些 朱自清的代表作品有哪些  朱自清(1898.11.22—1948.8.12)原名自华,号秋实,后改名...
春思诗,春思诗谢朓,春思诗的... 春思诗,春思诗谢朓,春思诗的意思,春思诗赏析 -诗词大全  春思诗,春思诗谢朓,春思诗的意思,春思诗...
客醉倚河桥,清光愁玉箫 “客醉倚河桥,清光愁玉箫。”出处 出自 宋代 高观国 的《菩萨蛮·何须急管吹云暝》“客醉倚河桥,清光...
古代的诗词歌赋 古代的诗词歌赋五篇  导语:诗词歌赋是专门集我国历代名家精品如唐诗宋词元曲为一体的诗词歌赋作品的软件...