RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
创始人
2024-05-28 13:31:56
0

代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。

先浏览一下 SelectMappedBufferResult

代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:

// 消息主题
private final String topic; 
// 消息队列
private final int queueId; 
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp; 
//消息在消费队列的offset
private final long consumeQueueOffset; 
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys; 
// 是否成功
private final boolean success; 
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey; 
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset; 
// 属性
private final Map propertiesMap; 

代码@3:转发DistpachRequest。

根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。

2.1 CommitLogDispatcherBuildConsumeQueue

核心处理方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries = 30;boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();      // @1for (int i = 0; i < maxRetries && canWrite; i++) {long tagsCode = request.getTagsCode();if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());    // @2if (result) {this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());     // @3return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

代码@1:判断 ConsumeQueue 是否可写。

代码@2:写入 consumequeue文件,真正的写入到 ConsumeQueue 逻辑如下。

Consumequeue#putMessagePositionInfoWrapper

Consumequeue#putMessagePositionInfoWrapper
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset <= this.maxPhysicOffset) {return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);    // 代码@1final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;   // @2MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {    // @3this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset;return mappedFile.appendMessage(this.byteBufferIndex.array());    // @4}return false;

首先说一下参数:

  • long offset
    commitlog偏移量,8字节。
  • int size
    消息体大小 4字节。
  • long tagsCode
    消息 tags 的 hashcode。
  • long cqOffset
    写入 consumequeue 的偏移量。

代码@1:首先将一条 ConsueQueue 条目总共20个字节,写入到 ByteBuffer 中。

代码@2:计算期望插入 ConsumeQueue 的 consumequeue 文件位置。

代码@3:如果文件是新建的,需要先填充空格。

代码@4:写入到 ConsumeQueue 文件中,整个过程都是基于 MappedFile 来操作的。

我们现在已经知道 ConsumeQueue 每一个条目都是 20个字节(8个字节commitlog偏移量+4字节消息长度+8字节tag的hashcode

那 consumqu e文件的路径,默认大小是多少呢?

默认路径为:rockemt_home/store/consume/ {topic} / {queryId},默认大小为,30W条记录,也就是30W * 20字节。

2.2 CommitLogDispatcherBuildIndex

其核心实现类 IndexService#buildIndex,存放 Index 文件的封装类为:IndexFile。

2.2.1 IndexFile 详解

2、2.1.1 核心属性

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 每个 hash  槽所占的字节数
private static int hashSlotSize = 4;
// 每条indexFile条目占用字节数
private static int indexSize = 20; 
// 用来验证是否是一个有效的索引。
private static int invalidIndex = 0;
// index 文件中 hash 槽的总个数
private final int hashSlotNum;
// indexFile中包含的条目数
private final int indexNum; 
// 对应的映射文件
private final MappedFile mappedFile;
// 对应的文件通道
private final FileChannel fileChannel;
// 对应 PageCache
private final MappedByteBuffer mappedByteBuffer;
// IndexHeader,每一个indexfile的头部信息

IndexHeader 详解:

index存储路径:/rocket_home/store/index/年月日时分秒。

目前了解到这来,目光继续投向IndexService。

2.2.2 IndexService

2、2.2.1 核心属性与构造方法

private final DefaultMessageStore defaultMessageStore;private final int hashSlotNum;private final int indexNum;private final String storePath;private final ArrayList indexFileList = new ArrayList();private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public IndexService(final DefaultMessageStore store) {this.defaultMessageStore = store;this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();this.storePath =StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
  • hashSlotNum
    hash槽数量,默认5百万个。
  • indexNum
    index条目个数,默认为 2千万个。
  • storePath
    index存储路径,默认为:/rocket_home/store/index。

相关内容

热门资讯

婚礼喝交杯酒主持词 婚礼喝交杯酒主持词范文(精选6篇)  主持词的写作要突出活动的主旨并贯穿始终。随着中国在不断地进步,...
婚宴的致辞 婚宴的致辞合集15篇  在平日的学习、工作和生活里,大家都尝试过写致辞吧,致辞具有有张有弛、错落有致...
运动会方阵解说词 运动会方阵解说词运动会方阵解说词运动会方阵解说词范文一:1、国旗解说词。女:四位旗手手执鲜艳的五星红...
功夫熊猫中的经典台词 功夫熊猫中的经典台词  1、一切都不是偶然。  2、何必躲呢,躲不过的。  3、着急的时候脑子也乱了...
《十全九美》的台词 《十全九美》的台词  1、艺人啊 要是不红那就是死 要是红了… 那是生不如死 !  2、南宫小姐被太...
中秋节晚会上的主持词 关于中秋节晚会上的主持词(精选5篇)  主持词需要富有情感,充满热情,才能有效地吸引到观众。在当下的...
民主生活会主持词 民主生活会主持词  一、主持词简介  由主持人于节目进行过程中串联节目的串联词。如今的各种演出活动和...
2022年央视春节联欢晚会主... 2022年央视春节联欢晚会主持词  借鉴诗词和散文诗是主持词的一种写作手法。在现今人们越来越重视活动...
少先队建队日主持词 少先队建队日主持词  什么是主持词  由主持人于节目进行过程中串联节目的串联词。如今的各种演出活动和...
晚会节目串词主持稿 晚会节目串词主持稿  在现在社会,很多地方都会使用到主持稿,通过主持稿的写作将主题贯穿于所有的节目之...
幼儿园开园揭牌剪彩仪式主持词 幼儿园开园揭牌剪彩仪式主持词  主持词要把握好吸引观众、导入主题、创设情境等环节以吸引观众。在一步步...
公司辞旧迎新晚会主持词串词   男:尊敬的各位领导、各位来宾,  女:亲爱的同事们  合:大家下午好!  男:光阴似箭,岁月如梭...
纯中式婚礼主持词 纯中式婚礼主持词(通用5篇)  主持词是主持人在台上表演的灵魂之所在。在现在的社会生活中,越来越多的...
悟空传的经典台词 悟空传的经典台词  1、我曾深爱过,我不在乎结局。  2、我知道天会愤怒,那,你知不知道,天也会颤抖...
最有创意的广告词(经典 最有创意的广告词(经典  01 钱不是问题,问题是没钱。  02 钻石恆久远,一颗就破產。  03 ...
毕业感谢致辞 关于毕业感谢致辞(精选15篇)  无论是在学校还是在社会中,大家都写过致辞吧,致辞的措词造句要考虑与...
年会嘉宾简短致辞 年会嘉宾简短致辞  在日复一日的学习、工作或生活中,大家总少不了要接触或使用致辞吧,致辞具有很强的实...
成长礼主持稿 成长礼主持稿(通用8篇)  在日常生活和工作中,需要使用主持稿的情况越来越多,主持稿是在晚会、联欢会...
电视剧《放羊的星星》经典台词 电视剧《放羊的星星》经典台词  在现实社会中,用到台词的地方越来越多,台词是一种特殊的,也是很难掌握...
抓周仪式主持词 抓周仪式主持词范文  主持词是主持人在台上表演的灵魂之所在。在如今这个中国,主持词是活动、集会等的必...