RocketMQ重试机制一次深入理解
创始人
2024-05-11 18:45:11
0

背景

我们知道Consumer拉取消息、消费消息时分开的,分别由两个类去实现:
拉取消息:PullMessageService
消费消息:ConsumeMessageConcurrentlyService

消息消费流程

1.如果我们拉取到消息,准备提交ConsumeMessageConcurrentlyService进行消费时,会触发调用该代码块:

// ConsumeMessageConcurrentlyService 
public void submitConsumeRequest(final List msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {// 消息封装到里面ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢线程池消费this.consumeExecutor.submit(consumeRequest);}}
}

2.ConsumeRequest代码片段:

@Override
public void run() {// 1、Consumer 中设计的回调方法MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 2、回调 Consumer 中的监听回调方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {hasException = true;}// 3、如果status 返回null,设置为 RECONSUME_LATER 类型if (null == status) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 4、对返回的 status 结果进行处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}

3.根据返回的status判断是否需要进行重试:

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();switch (status) {// 1、消费成功case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}break;// 2、消费延迟case RECONSUME_LATER:ackIndex = -1;break;default:break;}// 3、针对不同的消息模式做不同的处理switch (this.defaultMQPushConsumer.getMessageModel()) {// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 5、集群模式case CLUSTERING:List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)boolean result = this.sendMessageBack(msg, context);// 8、ACK 可能会失败,需要记录失败的ACKif (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

①从上面简单的几段代码片段中可以看出:广播模式就算消费失败,也不会进行重试,只是打个日志进行告警。
②只有消费失败的消息才会发送ACK重试。
③如果ACK失败,就算是重试失败吧。
④消息消费成功、失败,都会更新Consumer偏移量。
4.ConsumeMessageConcurrentlyService.sendMessageBack:准备请求Broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别int delayLevel = context.getDelayLevelWhenNextConsume();try {// 2、发送给Brokerthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

RocketMQ延迟级别分为18级,delayLevel从1-18,每个数字都对应一个延迟的时间:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker端对重试的处理

这块的代码需要深入的理解Broker源码才可以了解透彻,本人也是简单看了一下没有深入的钻研。主要总结了一下做了以下几件事:
①更消息的 Topic 为 “%RETRY%”+ group,计算queueId(重试队列,队列数为1)
②如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:“%DLQ%” + group,消费队列为1(死信队列只有一个消费队列)
③如果没有变成死信,计算消息的延迟级别
④复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储(什么?你不知道什么是CommitLog? 下期写一篇RocketMQ内部存储结构)

  • 新的Msg 会有新的messageId。
  • 非死信:该消息以新的Topic名:“%RETRY%”+ group 存到CommitLog中作为延迟消息。
  • 死信:以"%DLQ%" + group为Topic名,存到CommitLog中:存到死信队列中的消息不会被Consumer消费了。

什么是死信队列(DLQ队列)

  • Broker中单独的一个队列(DLQ),该队列存储了Consumer端重试16次后都没成功消费的消息
  • 该队列:只有写权限,没有读权限。所以是不能被Consumer重新消费的,只能进行人工干预,重新投递(Rocket-MQ-Console 中可以操作)
  • DLQ队列中,该消息的TOPIC 重新被命名为: “%DLQ%” + groupName
  • DLQ队列其实就是(consumequeue文件夹的"%DLQ%" + groupName 命名的Topic文件夹下的队列)

重试消息延时机制

重试消息发到Broker后,被作为一个新的延迟消息存到了CommitLog中,当该消息到了消费时间点是会被Consumer重新消费的。消息重试16次才会被丢到 死信队列中,才不会被消费了。那其余15次消息每次延迟是延迟多久呢?
我们在上面的源码其实可以看得出:消息的延迟级别是受重试次数(reconsumeTimes)影响的。重试次数越大,延迟越久。

最后

本次整理知识本人的一点小小的理解,如有错误,请各位看到的大佬指出以免误人子弟。

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...