我们知道Consumer拉取消息、消费消息时分开的,分别由两个类去实现:
拉取消息:PullMessageService
消费消息:ConsumeMessageConcurrentlyService
1.如果我们拉取到消息,准备提交ConsumeMessageConcurrentlyService进行消费时,会触发调用该代码块:
// ConsumeMessageConcurrentlyService
public void submitConsumeRequest(final List msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {// 消息封装到里面ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢线程池消费this.consumeExecutor.submit(consumeRequest);}}
}
2.ConsumeRequest代码片段:
@Override
public void run() {// 1、Consumer 中设计的回调方法MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 2、回调 Consumer 中的监听回调方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {hasException = true;}// 3、如果status 返回null,设置为 RECONSUME_LATER 类型if (null == status) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 4、对返回的 status 结果进行处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
3.根据返回的status判断是否需要进行重试:
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();switch (status) {// 1、消费成功case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}break;// 2、消费延迟case RECONSUME_LATER:ackIndex = -1;break;default:break;}// 3、针对不同的消息模式做不同的处理switch (this.defaultMQPushConsumer.getMessageModel()) {// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 5、集群模式case CLUSTERING:List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)boolean result = this.sendMessageBack(msg, context);// 8、ACK 可能会失败,需要记录失败的ACKif (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}
①从上面简单的几段代码片段中可以看出:广播模式就算消费失败,也不会进行重试,只是打个日志进行告警。
②只有消费失败的消息才会发送ACK重试。
③如果ACK失败,就算是重试失败吧。
④消息消费成功、失败,都会更新Consumer偏移量。
4.ConsumeMessageConcurrentlyService.sendMessageBack:准备请求Broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别int delayLevel = context.getDelayLevelWhenNextConsume();try {// 2、发送给Brokerthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}
RocketMQ延迟级别分为18级,delayLevel从1-18,每个数字都对应一个延迟的时间:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这块的代码需要深入的理解Broker源码才可以了解透彻,本人也是简单看了一下没有深入的钻研。主要总结了一下做了以下几件事:
①更消息的 Topic 为 “%RETRY%”+ group,计算queueId(重试队列,队列数为1)
②如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:“%DLQ%” + group,消费队列为1(死信队列只有一个消费队列)
③如果没有变成死信,计算消息的延迟级别
④复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储(什么?你不知道什么是CommitLog? 下期写一篇RocketMQ内部存储结构)
重试消息发到Broker后,被作为一个新的延迟消息存到了CommitLog中,当该消息到了消费时间点是会被Consumer重新消费的。消息重试16次才会被丢到 死信队列中,才不会被消费了。那其余15次消息每次延迟是延迟多久呢?
我们在上面的源码其实可以看得出:消息的延迟级别是受重试次数(reconsumeTimes)影响的。重试次数越大,延迟越久。
本次整理知识本人的一点小小的理解,如有错误,请各位看到的大佬指出以免误人子弟。
下一篇:ES6学习笔记