RocketMQ-整体信息
创始人
2024-06-03 10:06:07
0

源码版本号:版本号:4.9.4

部署架构

NameServer

它负责存储RocketMQ集群的元数据信息,例如某个Topic有多少个队列(MessageQueue),又分别在哪些Broker

定时任务每隔10秒去尝试清理一次已经120秒没有发送过心跳的Broker

Broker

存储数据的地方

定时任务每隔30秒向NameServer发送心跳,携带topic信息和更新心跳时间

Producer

生产者,每隔30秒向所有的Broker发送心跳信息

Consumer

消费者,每隔30秒向所有的Broker发送心跳信息,主要包括当前消费者订阅了哪些topic信息

每个消费者就可以从Broker拿到消费某个Topic的所有实例,每个消费者就可以通过具体的算法计算出自己应该消费哪些队列

某个Topic下的某个队列,在同一个消费者组内,只允许一个消费者对其进行消费,可以保证同一个队列的消息能够被顺序消费

MQClientInstance

每个生产者和消费者都有一个 MQClientInstance 实例

生产者和消费者启动时通过调用 MQClientManager.getInstance().getOrCreateMQClientInstance获取

重要字段分析

public class MQClientInstance {/*** 生产者启动时注册进来, key 为 groupName* 如果生产者groupName相同, 则会报错, 生产者实例里面会保存一份发送过消息的topic的队列信息*/private final ConcurrentMap producerTable = new ConcurrentHashMap();/*** 消费者启动时注册进来, key 为 groupName* 保存消费者信息, 消费者实例里面能知道需要订阅哪些topic信息*/private final ConcurrentMap consumerTable = new ConcurrentHashMap();/*** 保存所有的broker信息, 给Broker发送心跳就是通过这个拿到Broker地址* 有定时任务定时去查询Broker地址是否在topicRouteTable中, 如果不在则需要去除* 生产者发送消息时, 通过brokerName和brokerId=0来查找对应的brokerAddress*/private final ConcurrentMap> brokerAddrTable =new ConcurrentHashMap>();/*** 所有的topic元数据信息, TopicRouteData就是从NameServer拉回来的数据* TopicRouteData里面会有Topic的队列信息[brokerName、readQueueNums、writeQueueNums]* 和Broker信息[cluster、brokerName、brokerAddrs(key=brokerId, value=broker address)]*/private final ConcurrentMap topicRouteTable = new ConcurrentHashMap();
}

启动

生产者和消费者启动的时候都会调用 MQClientInstance 的启动方法

代码入口: MQClientInstance#start

public class MQClientInstance {// 实例状态private ServiceState serviceState = ServiceState.CREATE_JUST;/*** 找到225行*/public void start() throws MQClientException {synchronized (this) {// 启动前 serviceState = ServiceState.CREATE_JUSTswitch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// 里面会启动一个Netty服务, 用来进行通信服务this.mQClientAPIImpl.start();/*** 开启各种定时任务*/this.startScheduledTask();/*** 开启消费者的消息拉取任务* 拉取消息的任务被放在一个队列里面, 消息队列负载的时候往里面放任务* 查看PullMessageService#run方法, 从队列里面拿拉取消息的任务* 拿到任务后会交给具体的消费者去broker拉取消息* 最终负责拉取消息的代码在DefaultMQPushConsumerImpl#pullMessage方法中*/this.pullMessageService.start();/*** 开启消息队列负载* 查看RebalanceService#run方法* 可以发现默认每隔20s就会执行MQClientInstance#doRebalance方法* 然后调用每个消费者的DefaultMQPushConsumerImpl#doRebalance方法*/this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
}

定时任务分析

MQClientInstance会启动多个定时任务

具体代码在这个方法里
MQClientInstance.startScheduledTask

public class MQClientInstance {/*** 找到256行*/private void startScheduledTask() {// 省略部分代码, 只贴上主要代码/*** 每隔30s执行一次* 从NameServer中获取最新的topic信息, 将最新的topic信息更新到每个生产者和消费者中* 1.遍历所有的生产者和消费者, 拿到所有的topic* 2.一个一个地去NameServer查询, 然后更新生产者和消费者里面的topic信息(topic对应的队列列表)* 消费者: 能够知道是否存在新增或者减少消费者实例, 消费者就能重新分配拉取消息的队列* 生产者: 如果存在Broker不可用, 这个时候生产者就不会往不可用Broker的队列里发送消息*/MQClientInstance.this.updateTopicRouteInfoFromNameServer();/*** 每隔30s执行一次* 1.清理掉下线的Broker* 2.给每个Broker发送心跳信息*   这样Broker就能知道有哪些生产者、哪些消费者消费哪些topic*/MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();/*** 每个10s执行一次* 更新消费者的消费进度*/MQClientInstance.this.persistAllConsumerOffset();}
}

相关内容

热门资讯

感受大海作文 感受大海作文  在日复一日的学习、工作或生活中,大家都写过作文吧,根据写作命题的特点,作文可以分为命...
天使的黑白羽翼600字初中作... 天使的黑白羽翼600字初中作文  天空中,我张开了翅膀,由丑小鸭蜕变为白天鹅,而我的天使们还在原地,...
狗说明文作文500字 狗说明文作文500字(精选30篇)  在现实生活或工作学习中,大家或多或少都会接触过作文吧,作文是人...
暖优秀作文400字 暖优秀作文400字7篇  在日常学习、工作或生活中,大家或多或少都会接触过作文吧,根据写作命题的特点...
静作文150字 静作文150字第1篇静作文150字需要安静的空间,听自然 关掉电脑的音像,静静的看文章 尽管如此,马...
我不再胆小了作文350字 我不再胆小了作文350字  我是个胆小的女孩,从早到晚总是怕那些妖魔鬼怪出来一口吞了我。  记得一天...
生活需要宽容作文 生活需要宽容作文通用15篇  在日复一日的学习、工作或生活中,大家都写过作文吧,作文是由文字组成,经...
创新的6则历史故事 关于创新的6则历史故事  创新是现代工作中经常提到的一个新的理念,也是与世界接近的一个链子,我们应该...
成长,离别作文 成长,离别作文  在日常学习、工作抑或是生活中,大家一定都接触过作文吧,借助作文可以提高我们的语言组...
家长感谢老师的话 家长感谢老师的话  教师,以教育为生的职业。这个职业是人类社会最古老的职业之一。下面是小编帮大家整理...
新年的好句 关于新年的好句汇总  不知不觉新年又快到了,每年到了这个时候大家一定都很喜欢想家人和朋友发送一些新年...
夏天来了作文 夏天来了作文600字(精选10篇)  在学习、工作或生活中,大家一定都接触过作文吧,作文是人们以书面...
菊花作文 菊花作文  秋天到了,公园里的菊花绽开了美丽的笑容,红的像火,黄的像金,白的像雪……在绿叶的衬托下显...
放风筝作文400字 校庆作文推荐度:游泳作文推荐度:依靠作文推荐度:桂花作文推荐度:感激作文推荐度:相关推荐
桂林山水作文 桂林山水作文  人们都说:“桂林山水甲天下。”今天,我慕名前来,观赏山清水秀、青山碧水的美丽城市——...
感恩母爱作文400字 【精品】感恩母爱作文400字3篇  在生活、工作和学习中,大家都不可避免地要接触到作文吧,写作文可以...
缩写故事作文 缩写故事作文(精选39篇)  在日常学习、工作抑或是生活中,大家都写过作文吧,作文是人们以书面形式表...
谢谢你妈妈作文400字 谢谢你妈妈作文400字第1篇谢谢你妈妈作文400字  谢谢你,妈妈    六一班高汉青     在这...
描写夏天的四字词语 描写夏天的四字词语描写夏天的四字词语1  每年夏天的夜晚一到,我总会拿把小椅子搬到河边的树下乘凉,今...
乡情作文 关于乡情作文15篇  在平时的学习、工作或生活中,大家都不可避免地要接触到作文吧,根据写作命题的特点...