RocketMQ重试机制一次深入理解
创始人
2024-05-11 18:45:11
0

背景

我们知道Consumer拉取消息、消费消息时分开的,分别由两个类去实现:
拉取消息:PullMessageService
消费消息:ConsumeMessageConcurrentlyService

消息消费流程

1.如果我们拉取到消息,准备提交ConsumeMessageConcurrentlyService进行消费时,会触发调用该代码块:

// ConsumeMessageConcurrentlyService 
public void submitConsumeRequest(final List msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {// 消息封装到里面ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢线程池消费this.consumeExecutor.submit(consumeRequest);}}
}

2.ConsumeRequest代码片段:

@Override
public void run() {// 1、Consumer 中设计的回调方法MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 2、回调 Consumer 中的监听回调方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {hasException = true;}// 3、如果status 返回null,设置为 RECONSUME_LATER 类型if (null == status) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 4、对返回的 status 结果进行处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}

3.根据返回的status判断是否需要进行重试:

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();switch (status) {// 1、消费成功case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}break;// 2、消费延迟case RECONSUME_LATER:ackIndex = -1;break;default:break;}// 3、针对不同的消息模式做不同的处理switch (this.defaultMQPushConsumer.getMessageModel()) {// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 5、集群模式case CLUSTERING:List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)boolean result = this.sendMessageBack(msg, context);// 8、ACK 可能会失败,需要记录失败的ACKif (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

①从上面简单的几段代码片段中可以看出:广播模式就算消费失败,也不会进行重试,只是打个日志进行告警。
②只有消费失败的消息才会发送ACK重试。
③如果ACK失败,就算是重试失败吧。
④消息消费成功、失败,都会更新Consumer偏移量。
4.ConsumeMessageConcurrentlyService.sendMessageBack:准备请求Broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别int delayLevel = context.getDelayLevelWhenNextConsume();try {// 2、发送给Brokerthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

RocketMQ延迟级别分为18级,delayLevel从1-18,每个数字都对应一个延迟的时间:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker端对重试的处理

这块的代码需要深入的理解Broker源码才可以了解透彻,本人也是简单看了一下没有深入的钻研。主要总结了一下做了以下几件事:
①更消息的 Topic 为 “%RETRY%”+ group,计算queueId(重试队列,队列数为1)
②如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:“%DLQ%” + group,消费队列为1(死信队列只有一个消费队列)
③如果没有变成死信,计算消息的延迟级别
④复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储(什么?你不知道什么是CommitLog? 下期写一篇RocketMQ内部存储结构)

  • 新的Msg 会有新的messageId。
  • 非死信:该消息以新的Topic名:“%RETRY%”+ group 存到CommitLog中作为延迟消息。
  • 死信:以"%DLQ%" + group为Topic名,存到CommitLog中:存到死信队列中的消息不会被Consumer消费了。

什么是死信队列(DLQ队列)

  • Broker中单独的一个队列(DLQ),该队列存储了Consumer端重试16次后都没成功消费的消息
  • 该队列:只有写权限,没有读权限。所以是不能被Consumer重新消费的,只能进行人工干预,重新投递(Rocket-MQ-Console 中可以操作)
  • DLQ队列中,该消息的TOPIC 重新被命名为: “%DLQ%” + groupName
  • DLQ队列其实就是(consumequeue文件夹的"%DLQ%" + groupName 命名的Topic文件夹下的队列)

重试消息延时机制

重试消息发到Broker后,被作为一个新的延迟消息存到了CommitLog中,当该消息到了消费时间点是会被Consumer重新消费的。消息重试16次才会被丢到 死信队列中,才不会被消费了。那其余15次消息每次延迟是延迟多久呢?
我们在上面的源码其实可以看得出:消息的延迟级别是受重试次数(reconsumeTimes)影响的。重试次数越大,延迟越久。

最后

本次整理知识本人的一点小小的理解,如有错误,请各位看到的大佬指出以免误人子弟。

相关内容

热门资讯

英雄联盟经典台词 英雄联盟经典台词  英雄联盟经典台词  1、正义,要么靠法律,要么靠武力!  2、你迷失在黑暗之中,...
小学元旦节的主持词 小学元旦节的主持词(精选16篇)  主持词是主持人在台上表演的灵魂之所在。在当今不断发展的世界,各种...
婚纱走秀主持词 婚纱走秀主持词三篇  篇一:婚纱走秀演出主持词  当您披上洁白的婚纱,点亮您一生中最美丽的日子,您是...
医者仁心台词 医者仁心台词大全  1. 钟立行对丁祖望:我们都在努力做一个能够被人怀念的人。  2.罗雪樱旁白:从...
《美丽人生》的经典台词 《美丽人生》的经典台词  意大利电影《美丽人生》,由罗伯托贝尼尼自编自演,讲述了意大利一对犹太父子被...
二年级主持词 二年级主持词  主持词分为会议主持词、晚会主持词、活动主持词、婚庆主持词等。在一步步向前发展的社会中...
年会的主持词 年会的主持词范文(通用5篇)  根据活动对象的不同,需要设置不同的主持词。时代不断在进步,主持词是活...
姨妈的后现代生活经典台词分享 姨妈的后现代生活经典台词分享  吉日良辰当欢笑,为什么鲛珠化泪抛?此时却又明白了,世上何尝尽富豪。也...
学校语文教研活动主持词 学校语文教研活动主持词  借鉴诗词和散文诗是主持词的一种写作手法。在一步步向前发展的社会中,很多晚会...
六一庆祝大会主持词 六一庆祝大会主持词  六一就是我们的节日,六一就是一个欢乐的日子,下面小编整理的六一庆祝大会主持词,...
婚礼感谢词 婚礼感谢词(15篇)婚礼感谢词1各位来宾,各位亲友:  大家晚上好!  今日是我女儿XXX和女婿XX...
文艺汇演主持词开场白 文艺汇演主持词开场白9篇  根据活动对象的不同,需要设置不同的主持词。我们眼下的社会,活动集会越来越...
总结大会主持词   导语:春去秋回又一栽,似水流年旺年来。  2017年公司年会主持词  一.主持人开场白  男:一...
农村结婚典礼主持词 农村结婚典礼主持词  一对新人立堂前,两心相印似蜜甜,三生有幸结良缘,下面是小编分享的 农村结婚典礼...
春节联欢晚会主持词开场白   翻开精美的日历,我细数着春节将要来临。那是我们中华民族的传统节日,更是我期盼的佳节,因为只有这几...
真爱至上中英文经典台词 真爱至上中英文经典台词  1、All I want for Christmas is you.  今...
欢聚一堂舞蹈主持词 欢聚一堂舞蹈主持词  串词是在晚会、联欢会等大型联欢活动中,主持人把前后节目,把整台节目恰到好处的联...
主持词结束语 主持词结束语(通用12篇)  利用在中国拥有几千年文化的诗词能够有效提高主持词的感染力。在人们越来越...
秋季小学开学典礼校长的致辞 秋季小学开学典礼校长的致辞(通用13篇)  在学习、工作、生活中,许多人都有过写致辞的经历,对致辞都...
五月的主持词开场白 五月的主持词开场白  主持人在台上表演的灵魂就表现在主持词中。在当下这个社会中,各种场合中活跃现场气...