RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
创始人
2024-05-28 13:31:56
0

代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。

先浏览一下 SelectMappedBufferResult

代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:

// 消息主题
private final String topic; 
// 消息队列
private final int queueId; 
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp; 
//消息在消费队列的offset
private final long consumeQueueOffset; 
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys; 
// 是否成功
private final boolean success; 
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey; 
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset; 
// 属性
private final Map propertiesMap; 

代码@3:转发DistpachRequest。

根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。

2.1 CommitLogDispatcherBuildConsumeQueue

核心处理方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries = 30;boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();      // @1for (int i = 0; i < maxRetries && canWrite; i++) {long tagsCode = request.getTagsCode();if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());    // @2if (result) {this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());     // @3return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

代码@1:判断 ConsumeQueue 是否可写。

代码@2:写入 consumequeue文件,真正的写入到 ConsumeQueue 逻辑如下。

Consumequeue#putMessagePositionInfoWrapper

Consumequeue#putMessagePositionInfoWrapper
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset <= this.maxPhysicOffset) {return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);    // 代码@1final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;   // @2MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {    // @3this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset;return mappedFile.appendMessage(this.byteBufferIndex.array());    // @4}return false;

首先说一下参数:

  • long offset
    commitlog偏移量,8字节。
  • int size
    消息体大小 4字节。
  • long tagsCode
    消息 tags 的 hashcode。
  • long cqOffset
    写入 consumequeue 的偏移量。

代码@1:首先将一条 ConsueQueue 条目总共20个字节,写入到 ByteBuffer 中。

代码@2:计算期望插入 ConsumeQueue 的 consumequeue 文件位置。

代码@3:如果文件是新建的,需要先填充空格。

代码@4:写入到 ConsumeQueue 文件中,整个过程都是基于 MappedFile 来操作的。

我们现在已经知道 ConsumeQueue 每一个条目都是 20个字节(8个字节commitlog偏移量+4字节消息长度+8字节tag的hashcode

那 consumqu e文件的路径,默认大小是多少呢?

默认路径为:rockemt_home/store/consume/ {topic} / {queryId},默认大小为,30W条记录,也就是30W * 20字节。

2.2 CommitLogDispatcherBuildIndex

其核心实现类 IndexService#buildIndex,存放 Index 文件的封装类为:IndexFile。

2.2.1 IndexFile 详解

2、2.1.1 核心属性

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 每个 hash  槽所占的字节数
private static int hashSlotSize = 4;
// 每条indexFile条目占用字节数
private static int indexSize = 20; 
// 用来验证是否是一个有效的索引。
private static int invalidIndex = 0;
// index 文件中 hash 槽的总个数
private final int hashSlotNum;
// indexFile中包含的条目数
private final int indexNum; 
// 对应的映射文件
private final MappedFile mappedFile;
// 对应的文件通道
private final FileChannel fileChannel;
// 对应 PageCache
private final MappedByteBuffer mappedByteBuffer;
// IndexHeader,每一个indexfile的头部信息

IndexHeader 详解:

index存储路径:/rocket_home/store/index/年月日时分秒。

目前了解到这来,目光继续投向IndexService。

2.2.2 IndexService

2、2.2.1 核心属性与构造方法

private final DefaultMessageStore defaultMessageStore;private final int hashSlotNum;private final int indexNum;private final String storePath;private final ArrayList indexFileList = new ArrayList();private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public IndexService(final DefaultMessageStore store) {this.defaultMessageStore = store;this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();this.storePath =StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
  • hashSlotNum
    hash槽数量,默认5百万个。
  • indexNum
    index条目个数,默认为 2千万个。
  • storePath
    index存储路径,默认为:/rocket_home/store/index。

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...