运维部署 docker-compose.yml
version: '3.5'
services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
然后在与docker-compose.yml
同级下面相应的建立三个文件夹conf
、logs
、store
。然后在conf
文件夹下面建立broker.conf
配置文件,所有文件的目录位置如下所示。
docker-compose.yml
conf- broker.conf
logs
store
然后在编写broker.conf
配置文件里面的内容
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# 所属集群名字
brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
brokerId=0# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.1.16# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
listenPort=10911# 删除文件时间点,默认凌晨4点
deleteWhen=04# 文件保留时间,默认48小时
fileReservedTime=120# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
配置文件中的内容我们只需要改动一点即可,即brokerIP1
这个属性,我们将其更改为我们本机的ip,可以利用ipconfig
进行查看。
修改完以后我们直接在docker-compose.yml
文件所在的位置输入命令docker-compose up
即可启动。启动成功以后在浏览器中输入http://localhost:8080/
即可看到管理页面,就表示我们搭建成功了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T2Lk6A0V-1678195927847)(null)]
RocketMQ架构上主要分为四部分,如上图所示:
结合部署架构图,描述集群工作流程:
结合部署架构图,描述集群工作流程:
1、启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
3、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
4、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
总结:
Rocketmq发送消息的时候,先启动NameServer,NameServer成功启动会先去和Broken连接,这时候NameServer和Broken就有心跳。当生产者Producer发送消息的时候,先跟NameServer连接,判断发送的Topic在哪些Broken上,然后按轮询选择Broken中的一个队列。然后Producer会和Broken直接建立连接,以后所有发送的消息(同个Topic)都是直接和Broken连接,消费者也是这样流程。
补充:
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
ps:表示改消费组有一条未消费
数量2 指的是 consumer_topic-queue-three 有两个消费组都是叫这个名字
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
说几个概念
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
例如代码:生成者发送消息
//同步发送
public void sync() {Message message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello, springboot-ac-rocketmq !");rocketMQTemplate.convertAndSend("topic-queue-one", message);rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !");
}
消费者消费消息
@Slf4j
@Component
public class RocketmqConsumer {@Component@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")public class ConsumerOne implements RocketMQListener {@Overridepublic void onMessage(Message message) {log.info("consumer-one received message: {}", message);}}@Component@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")public class ConsumerTwo implements RocketMQListener {@Overridepublic void onMessage(String message) {System.out.println("哈哈哈哈我进来消费 topic-queue-two 消息啦");log.info("consumer-two received message: {}", message);}}
}
运行后打断点发送,队列是先进后去,所以topic-queue-two先消费
消费完再消费这个topic-queue-one
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
看代码 生成者发送消息
public void async() {Message message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello,I am asyncSend !");rocketMQTemplate.asyncSend("async-one", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("send successful");}@Overridepublic void onException(Throwable throwable) {log.info("send fail; {}", throwable.getMessage());}});
}
消费者代码
@Component
@RocketMQMessageListener(topic = "async-one", consumerGroup = "consumer_topic-queue-three")
public class ConsumerThreee implements RocketMQListener {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费 async-one 消息啦");log.info("consumer-two received message: {}", message);}
}
运行后打断点发现可以正常消费
从运行的结果看,15:37分是发送成功的,可是消费是15:39分
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
rocketMQTemplate.sendOneWay("topic-oneWay", "send one-way message");
一调接口里面就返回成功,可是消费等了一会菜消费
技术原理:
就是用hashKey作为每个队列的唯一标志,在电商中,一般是引订单id作为hashKey
模拟2个队列,id1和id2进行操作
id1的消息有10,30
id2的消息有 20,40
演示代码如下:
private final String id1 = "10086";private final String id2 = "10087";/**** hashKey为订单id*/
public void testSendSyncOrderly1() {Message stringMessage = new Message<>();stringMessage.setId(id1);String message = "10";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly2() {Message stringMessage = new Message<>();stringMessage.setId(id2);String message = "20";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}/**** hashKey为订单id*/
public void testSendSyncOrderly3() {Message stringMessage = new Message<>();stringMessage.setId(id1);String message = "30";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly4() {Message stringMessage = new Message<>();stringMessage.setId(id2);String message = "40";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}
消费端代码
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-orderly",consumerGroup = "orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY
)public class OrderConsumer implements RocketMQListener {int sumId1 = 0;int sumId2 = 0;@Overridepublic void onMessage(Message message) {if(message.getId().equals("10086")){sumId1 = sumId1+Integer.parseInt((String)message.getContent());}else{sumId2 = sumId2+Integer.parseInt((String)message.getContent());}System.out.println("开始消费");log.info("========{}=======", sumId1);log.info("========{}=======", sumId2);System.out.println("消费结束");}}
发现最后消费是
证明是分区有序性。
6.延时消息样例
应用场景:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
CONSUME_FROM_LAST_OFFSET, //第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
CONSUME_FROM_MIN_OFFSET,
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET, //第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP; //第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 (一般选这个)
消费端要实现这个类RocketMQPushConsumerLifecycleListener,代码如下:
/**** 延时消费*/
@Component
@Slf4j
public class OffsetConsumerByHjt {@Component@RocketMQMessageListener(topic = "topic-offset-by-hjt", consumerGroup = "topic-offset-by-hjt-consumer")public class OfferConsumerBy implements RocketMQListener, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费");String result = result(message.getBody());System.out.println("输出 result "+result);log.info("topic-offset-by-hjt: {}", new String(message.getBody()));}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {//第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费defaultMQPushConsumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);}}public static String result(byte[] decrypt) {try {String result = new String(decrypt, "UTF-8");return result;} catch (UnsupportedEncodingException var2) {var2.printStackTrace();return null;}}
}
生成者代码,还是按顺序消费测试
/**** hjt写的延时消费demo*/public void sendByHjt() throws Exception {Message message = new Message();//生产者DefaultMQProducer producer = new DefaultMQProducer("topic-offset-by-hjt-product");producer.setNamesrvAddr("192.168.1.219:9876");producer.start();for(int i = 0;i<5;i++){message.setTopic("topic-offset-by-hjt");message.setBody(("我是延迟消费啊啊" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";//4对于的就是延迟30smessage.setDelayTimeLevel(4);producer.send(message, new SendCallback() {//成功后执行的方法@Overridepublic void onSuccess(SendResult sendResult) {log.info("延迟消费成功");}//失败后执行的方法@Overridepublic void onException(Throwable throwable) {log.error("还未到指定的消费时间");}});}//关闭生产者producer.shutdown();}
30s后发现已进来消费
为什么能同时消费这么多数据,因为rocketmq在那一瞬间同时去队列中拿数据,那一瞬间一起消费掉。
看了下后台,发现读和写都是4个队列。其中 perm为6 指的是可读可写的队列为6
1.生成者代码
/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
public class TagProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendTagsMessage() {String[] tags = new String[]{"A", "B", "C", "D"};String message = "tags message : ";for (int i = 0; i < tags.length; i++) {rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]);}}
}
2.消费者代码
/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-tags",consumerGroup = "tags-consumer-group",selectorExpression = "A||C")
public class TagConsumer implements RocketMQListener {@Overridepublic void onMessage(String message) {System.out.println("messgaetag:"+message);log.info("======={}=======", message);}
}
运行结果: 因为 selectorExpression = “A||C” 选择A和C
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JN0sDuGu-1678195925254)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220801163823708.png)]
ps:注意
rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]); //topic-tags: 一定要有冒号
RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:
1、Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
2、Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
1、A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。
什么情况会回查
也会有两种情况
1)执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。2) 本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在brock端它还是个Half Message(半消息),这也会回查。
特别注意: 如果回查,那么一定要先查看当前事务的执行情况,再看是否需要重新执行本地事务。
想象下如果出现第二种情况而引起的回查,如果不先查看当前事务的执行情况,而是直接执行事务,那么就相当于成功执行了两个本地事务。
为什么说MQ是最终一致性事务
通过上面这幅图,我们可以看出,在上面举例事务不一致的两种情况中,永远不会发生
A账户减100 (失败),B账户加100 (成功)
因为:如果A服务本地事务都失败了,那B服务永远不会执行任何操作,因为消息压根就不会传到B服务。
那么 A账户减100 (成功),B账户加100 (失败) 会不会可能存在的。
答案是会的
因为A服务只负责当我消息执行成功了,保证消息能够送达到B,至于B服务接到消息后最终执行结果A并不管。
那B服务失败怎么办?
如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。
如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理
,人工兜底处理后,就可以让事务达到最终的一致性。
补充说明
事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener
类来修改这个行为。transactionTimeout
参数。看 生产者代码
package com.hjt.transaction;import com.hjt.message.Message;
import com.hjt.message.MessageTransaction;
import com.hjt.transaction.mapper.TransactionMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;/*** @author hjt* @date 2019/8/20*/
@Component
@Slf4j
public class TransactionProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void produce() {MessageTransaction message = new MessageTransaction<>();//在正在的业务中 Aid和Bid应该是前端已经知道是啥,传给后端,比如A的userId和B的UserIdmessage.setAId(UUID.randomUUID().toString());message.setBId(UUID.randomUUID().toString());message.setContent("B即将要+100元,A要减100元");log.info("========sending message=========:{}",message);
// rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); 2.0.3有这个版本 tx-grouprocketMQTemplate.sendMessageInTransaction( "topic-tx", MessageBuilder.withPayload(message).build(), null);log.info("========finish send =========");}}
监听者代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StringUtils;import java.util.concurrent.ConcurrentHashMap;/*** @author hjt* @date 2019/8/20*/
@Slf4j
//@RocketMQTransactionListener(txProducerGroup = "tx-group") 2.0.3的版本有这个
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {/**** 存放事务的状态 支持并发的场景*/private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info("==============进到这里说明 Half Message 发送成功");//获取队列中的事务idString rocketmqTransactionId = getRocketmqTransactionId(msg);try{//模拟 执行A服务-100元操作int redMoneyByA = -100;//定义// 0 是中间状态 1 是提交事务状态 2是回滚事务localTrans.put(rocketmqTransactionId,1);//模拟 执行A服务-100元操作失败
// int redMoneyExceptionByA = 100/0;return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){// 执行A服务-100元操作出现异常就 事务回查 调用下面的checkLocalTransaction方法localTrans.put(rocketmqTransactionId,2);log.error("插入数据库失败,原因为:{}",e.getMessage());return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info("============== 模拟回查本地事务 checkLocalTransaction");Object payload = msg.getPayload();MessageHeaders headers = msg.getHeaders();System.out.println("输出:" + payload);System.out.println("输出:" + headers);String rocketmqTransactionId = getRocketmqTransactionId(msg);//查rocketmqTransactionId的事务状态Integer status = localTrans.get(rocketmqTransactionId);if(null!=status){switch (status){case 0:log.info("============== 模拟回查本地事务结束 提交状态为:UNKNOWN");return RocketMQLocalTransactionState.UNKNOWN;case 1:log.info("============== 模拟回查本地事务结束 提交状态为:COMMIT");return RocketMQLocalTransactionState.COMMIT;case 2:log.info("============== 模拟回查本地事务结束 提交状态为:ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}}log.info("============== 模拟回查本地事务结束,提交状态为 ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}/**** 获取事务id* @param msg* @return*/public String getRocketmqTransactionId(Message msg){JSONObject json = JSONUtil.parseObj(msg.getHeaders(), false, true);String rocketmqTransactionId = (String)json.get("rocketmq_TRANSACTION_ID");String topic = (String)json.get("rocketmq_TOPIC");log.info("=======事务id========{}",rocketmqTransactionId);log.info("=======topic========{}",topic);if(!StringUtils.isEmpty(rocketmqTransactionId)){return rocketmqTransactionId;}return "";}
消费者代码
package com.hjt.transaction;import com.hjt.message.MessageTransaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @author hjt* @date 2019/8/20*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group")
public class TransactionConsumer implements RocketMQListener {@Overridepublic void onMessage(MessageTransaction message) {log.info("topic-tx received message: {}", message);log.info("消费端开始消费信息 执行B服务加100操作");//执行B服务加100的操作try{//B服务加100int addMoneyByB = 100;}//如果B服务加100失败,可是A已经减100成功了,这时候要把异常记录下来,人工进行处理catch (Exception e){log.error("B服务加100异常,需要人工处理,异常信息为:{}",e.getMessage());//用一张异常表单独记录 该消息的id 可以作为异常表的主键String id = message.getBId();}}}
rocketmq会稍微等一点时间再去执行checkLocalTransaction方法
正常运行结果:
模拟下执行A操作异常的时候
运行结果: