RocketMQ-整体信息
创始人
2024-06-03 10:06:07
0

源码版本号:版本号:4.9.4

部署架构

NameServer

它负责存储RocketMQ集群的元数据信息,例如某个Topic有多少个队列(MessageQueue),又分别在哪些Broker

定时任务每隔10秒去尝试清理一次已经120秒没有发送过心跳的Broker

Broker

存储数据的地方

定时任务每隔30秒向NameServer发送心跳,携带topic信息和更新心跳时间

Producer

生产者,每隔30秒向所有的Broker发送心跳信息

Consumer

消费者,每隔30秒向所有的Broker发送心跳信息,主要包括当前消费者订阅了哪些topic信息

每个消费者就可以从Broker拿到消费某个Topic的所有实例,每个消费者就可以通过具体的算法计算出自己应该消费哪些队列

某个Topic下的某个队列,在同一个消费者组内,只允许一个消费者对其进行消费,可以保证同一个队列的消息能够被顺序消费

MQClientInstance

每个生产者和消费者都有一个 MQClientInstance 实例

生产者和消费者启动时通过调用 MQClientManager.getInstance().getOrCreateMQClientInstance获取

重要字段分析

public class MQClientInstance {/*** 生产者启动时注册进来, key 为 groupName* 如果生产者groupName相同, 则会报错, 生产者实例里面会保存一份发送过消息的topic的队列信息*/private final ConcurrentMap producerTable = new ConcurrentHashMap();/*** 消费者启动时注册进来, key 为 groupName* 保存消费者信息, 消费者实例里面能知道需要订阅哪些topic信息*/private final ConcurrentMap consumerTable = new ConcurrentHashMap();/*** 保存所有的broker信息, 给Broker发送心跳就是通过这个拿到Broker地址* 有定时任务定时去查询Broker地址是否在topicRouteTable中, 如果不在则需要去除* 生产者发送消息时, 通过brokerName和brokerId=0来查找对应的brokerAddress*/private final ConcurrentMap> brokerAddrTable =new ConcurrentHashMap>();/*** 所有的topic元数据信息, TopicRouteData就是从NameServer拉回来的数据* TopicRouteData里面会有Topic的队列信息[brokerName、readQueueNums、writeQueueNums]* 和Broker信息[cluster、brokerName、brokerAddrs(key=brokerId, value=broker address)]*/private final ConcurrentMap topicRouteTable = new ConcurrentHashMap();
}

启动

生产者和消费者启动的时候都会调用 MQClientInstance 的启动方法

代码入口: MQClientInstance#start

public class MQClientInstance {// 实例状态private ServiceState serviceState = ServiceState.CREATE_JUST;/*** 找到225行*/public void start() throws MQClientException {synchronized (this) {// 启动前 serviceState = ServiceState.CREATE_JUSTswitch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// 里面会启动一个Netty服务, 用来进行通信服务this.mQClientAPIImpl.start();/*** 开启各种定时任务*/this.startScheduledTask();/*** 开启消费者的消息拉取任务* 拉取消息的任务被放在一个队列里面, 消息队列负载的时候往里面放任务* 查看PullMessageService#run方法, 从队列里面拿拉取消息的任务* 拿到任务后会交给具体的消费者去broker拉取消息* 最终负责拉取消息的代码在DefaultMQPushConsumerImpl#pullMessage方法中*/this.pullMessageService.start();/*** 开启消息队列负载* 查看RebalanceService#run方法* 可以发现默认每隔20s就会执行MQClientInstance#doRebalance方法* 然后调用每个消费者的DefaultMQPushConsumerImpl#doRebalance方法*/this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
}

定时任务分析

MQClientInstance会启动多个定时任务

具体代码在这个方法里
MQClientInstance.startScheduledTask

public class MQClientInstance {/*** 找到256行*/private void startScheduledTask() {// 省略部分代码, 只贴上主要代码/*** 每隔30s执行一次* 从NameServer中获取最新的topic信息, 将最新的topic信息更新到每个生产者和消费者中* 1.遍历所有的生产者和消费者, 拿到所有的topic* 2.一个一个地去NameServer查询, 然后更新生产者和消费者里面的topic信息(topic对应的队列列表)* 消费者: 能够知道是否存在新增或者减少消费者实例, 消费者就能重新分配拉取消息的队列* 生产者: 如果存在Broker不可用, 这个时候生产者就不会往不可用Broker的队列里发送消息*/MQClientInstance.this.updateTopicRouteInfoFromNameServer();/*** 每隔30s执行一次* 1.清理掉下线的Broker* 2.给每个Broker发送心跳信息*   这样Broker就能知道有哪些生产者、哪些消费者消费哪些topic*/MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();/*** 每个10s执行一次* 更新消费者的消费进度*/MQClientInstance.this.persistAllConsumerOffset();}
}

相关内容

热门资讯

美剧经典台词 美剧精选经典台词  在快速变化和不断变革的今天,能够利用到台词的场合越来越多,台词是一种特殊的,也是...
朗诵会主持词 关于朗诵会主持词4篇  主持词要根据活动对象的不同去设置不同的主持词。在当下这个社会中,很多场合都需...
记者节晚会主持词 记者节晚会主持词  主持词是主持人在台上表演的灵魂之所在。随着社会一步步向前发展,主持词的实用频率越...
婚礼父亲致辞 婚礼父亲致辞(精选15篇)  在平凡的学习、工作、生活中,大家肯定对各类致辞都很熟悉吧,致辞具有“礼...
校园红歌赛的主持词 校园红歌赛的主持词  主持词是主持人在节目进行过程中用于串联节目的串联词。在现今人们越来越重视活动氛...
开业主持词开场白 开业主持词开场白  根据活动对象的不同,需要设置不同的主持词。在当今社会生活中,活动集会越来越多,主...
关于唱歌比赛主持词   主持词是指主持人在主持节目的过程中进行节目串联的串联词,一般由开场白、中间部分与结束语组成。以下...
动漫感人台词 动漫感人台词(通用175句)  台词可以刻画人物的性格,表现人物的感情,加强剧情的表现力。那些广为流...
最新年会主持词 最新年会主持词(精选11篇)  契合现场环境的主持词能给集会带来双倍的效果。在如今这个时代,主持人的...
新生文艺汇演主持词 新生文艺汇演主持词  主持词要根据活动对象的不同去设置不同的主持词。在当今社会生活中,各种集会的节目...
家长代表幼儿园毕业典礼主持词 家长代表幼儿园毕业典礼主持词  主持词是各种演出活动和集会中主持人串联节目的串联词。在人们积极参与各...
学校元旦晚会主持词开场白和结... 学校元旦晚会主持词开场白和结束语  2017年元旦晚会主持词怎么写?怎么开场比较好呢?结束语又该怎么...
毕业晚会致辞 毕业晚会致辞(精选18篇)  在学习、工作或生活中,大家都写过致辞吧,致辞要求风格的雅、俗、庄、谐要...
幼儿园六一节目串词 幼儿园六一节目串词红黄蓝幼第一文库网儿园节目串词主持人(师):亲爱的家长朋友们( ):敬爱的老师们(...
祝寿主持词 祝寿主持词  主持词要尽量增加文化内涵、寓教于乐,不断提高观众的文化知识和素养。在人们积极参与各种活...
回门宴主持词 让你的回门宴顺... 回门宴主持词 让你的回门宴顺利完成  篇一:新婚回门宴主持词  亲爱的各位来宾,各位亲朋好友,先生们...
结婚新郎致辞 结婚新郎致辞(15篇)  在学习、工作、生活中,说到致辞,大家肯定都不陌生吧,致辞具有能伸能缩,可以...
庆中秋迎国庆联欢晚会主持词(...   (念8条短信)  男:我们的驻外营销健儿发来的每一条祝福都是那么的感人。由于时间关系,我们不能一...
重庆森林经典台词 重庆森林经典台词  《重庆森林》由两个基本不相干的故事构成。两个故事之间的关系,就像擦身而过的金城武...
歌曲奔跑主持词串词 歌曲奔跑主持词串词范文  借鉴诗词和散文诗是主持词的一种写作手法。在如今这个中国,主持成为很多活动不...