RocketMQ 高级特性
创始人
2024-05-29 21:44:25
0

1,事务消息代码实现

之前我们已经在讨论订单业务消息丢失问题中引出了事务消息,本内容我们就实际用代码来实现一下事务消息吧。 首先我们用原生代码来实现一下事务消息,下面是事务消息生产者TransactionProducer类的代码,具体代码解释已经用注释标明。

package com.huc.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;/*** @author buwt*/
public class TransactionProducer {
public static void main(String[] args)throws MQClientException, UnsupportedEncodingException {// 这里是一个自定义的接收RocketMQ回调的监听接口TransactionListener transactionListener = new TransactionListenerImpl();// 创建支持事务消息的Producer,并指定生产者组TransactionMQProducer producer =new TransactionMQProducer("testTransactionGroup");// 指定一个线程池,用于处理RocketMQ回调请求的ExecutorService executorService = new ThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("testThread");return thread;}});// 给事务消息生产者设置线程池producer.setExecutorService(executorService);// 给事务消息生产者设置回调接口producer.setTransactionListener(transactionListener);// 启动生产者producer.start();// 构造一条订单支付成功的消息Message message = new Message("PayOrderSuccessTopic","testTag","testKey","订单支付消息".getBytes(RemotingHelper.DEFAULT_CHARSET));// 将消息作为half消息发送出去try {TransactionSendResult result = producer.sendMessageInTransaction(message, null);} catch (Exception e) {// half消息发送失败// 订单系统执行回滚逻辑,比如退款、关闭订单}}
}

针对于half消息发送失败的情况,是有可能一直接收不到消息发送失败的异常的,所以我们可以在发送half消息的时候,同时保存一份half消息到内存中,或者写入磁盘里,后台开启线程去检查half消息,如果超过10分钟都没有接到响应,就自动执行回滚逻辑。 那么如果half消息成功了,如何执行本地事务逻辑呢?这就要说到代码中自定义的回调监听接口TransactionListenerImpl类了,代码如下:

package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class TransactionListenerImpl implements TransactionListener {/*** 如果half消息发送成功了,就会回调这个方法,执行本地事务* @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行订单本地业务,并根据结构返回commit/rollbacktry {// 本地事务执行成功,返回commitreturn LocalTransactionState.COMMIT_MESSAGE;}catch (Exception e){// 本地事务执行失败,返回rollback,作废half消息return LocalTransactionState.ROLLBACK_MESSAGE;}}/*** 如果没有正确返回commit或rollback,会执行此方法* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollbacktry {// 本地事务执行成功,返回commitreturn LocalTransactionState.COMMIT_MESSAGE;}catch (Exception e){// 本地事务执行失败,返回rollback,作废half消息return LocalTransactionState.ROLLBACK_MESSAGE;}}
}

到这里事务消息的代码我们就完成了,但是我相信小伙伴们不会满足于仅仅使用原生代码实现,那接下来我们就用Spring Boot重写编写一次相同的逻辑。

使用Spring Boot项目后,我们还是先准备一个消息的实体类TranMessage,代码如下:

package com.huc.rocketmq.transaction.spring;/*** 事务消息实体*/
public class TranMessage {public static final String TOPIC = "Tran";/*** 编号*/private Integer id;public TranMessage setId(Integer id) {this.id = id;return this;}public Integer getId() {return id;}@Override
public String toString() {return "TranMessage{" +"id=" + id +'}';}
}然后我们编写事务消息的生产者TranProducer:package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
public class TranProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessageInTransaction(Integer id) {// 创建TranMessage消息Message message = MessageBuilder.withPayload(new TranMessage().setId(id)).build();// 发送事务消息return rocketMQTemplate.sendMessageInTransaction(TranMessage.TOPIC,message,id);}}

同样的,我们需要编写一个回调监听的实现类,用于自定义处理本地事务,返回commit或者rollback消息。代码如下:

package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
// 注解中可以指定线程池参数
@RocketMQTransactionListener(corePoolSize=2,maximumPoolSize=5)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行订单本地业务,并根据结构返回commit/rollbacktry {// 本地事务执行成功,返回commitreturn RocketMQLocalTransactionState.COMMIT;}catch (Exception e){// 本地事务执行失败,返回rollback,作废half消息return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollbacktry {// 本地事务执行成功,返回commitreturn RocketMQLocalTransactionState.COMMIT;}catch (Exception e){// 本地事务执行失败,返回rollback,作废half消息return RocketMQLocalTransactionState.ROLLBACK;}}
}

有了原生代码的实现经验,相信小伙伴们对于使用Spring Boot集成后的代码同样可以轻松看得懂。 好了,至此事务消息的代码我们就已经实现了。

2,顺序消息代码实现

有关消息乱序的出现原因以及解决方案我们已经在8.4.3小节中讲解过了,小伙伴们可以去复习一下,本节我们将直接讨论代码的实现,首先还是使用原生代码实现。

经过之前的学习我们知道,解决消息乱序的方案就是把需要保证顺序的消息发送到同一个MessageQueue中,所以我们一定是需要编写一个MessageQueue的选择器的,RocketMQ的API中确实是有这部分内容的,就是MessageQueueSelector,下面就以原生代码异步的发送为例,在发送消息的时候指定队列选择器,主要代码如下,注释已经说明代码的含义:

producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List mqs, Message msg, Object arg) {Long orderId = (Long) arg; // 根据订单id选择发送的queuelong index = orderId % mqs.size();// 用订单id于MessageQueue的数量取模return mqs.get((int) index); // 返回一个运算后固定的MessageQueue}},orderId, // 传入订单idnew SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Override
public void onException(Throwable throwable) {System.out.println(throwable);}});

在发送消息时增加一个MessageQueueSelector,就可以实现统一订单id的消息一直会发送到同一个MessageQueue之中,可以解决消息乱序问题。

接着我们来看消费者部分的代码实现,主要代码如下:

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List msgs,ConsumeOrderlyContext context) {try {// 对有序的消息进行顺序处理for (MessageExt t : msgs) {}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {// 如果消息处理出错,返回一个状态,暂停一会儿再来处理这批消息。return 
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});

这里面要注意的是我们注册的监听器是MessageListenerOrderly,这个监听器为了保证顺序消费,Consumer会对每一个ConsumerQueue只使用一个线程来处理消息,如果使用了多线程,是无法避免消息乱序的。

至此原生代码的实现已经完成了,Spring Boot的代码原理也是一样的。

消息实体的代码我们就省略了,直接看生产者的代码,如下:

package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult syncSend(Integer id) {// 创建 DemoMessage 消息DemoMessage message = new DemoMessage();message.setId(id);// 同步发送消息return rocketMQTemplate.syncSendOrderly(DemoMessage.TOPIC,message,String.valueOf(id));}public void asyncSend(Integer id, SendCallback callback) {// 创建 DemoMessage 消息DemoMessage message = new DemoMessage();message.setId(id);// 异步发送消息rocketMQTemplate.asyncSendOrderly(DemoMessage.TOPIC,message,String.valueOf(id),callback);}public void onewaySend(Integer id) {// 创建 DemoMessage 消息DemoMessage message = new DemoMessage();message.setId(id);// oneway 发送消息rocketMQTemplate.sendOneWayOrderly(DemoMessage.TOPIC,message,String.valueOf(id));}}

以上代码中可以看出,每个发送方法中都调用了对应的Orderly方法,并传入了一个id值,默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueue。

接下来我们继续看消费者代码的实现。

package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = DemoMessage.TOPIC,consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费
)
public class OrderConsumer implements RocketMQListener {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic void onMessage(DemoMessage message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}}

可以看到消费者代码改动很小,只需要在@RocketMQMessageListener注解中新增consumeMode = ConsumeMode.ORDERLY,就可以指定顺序消费了,小伙伴们可以大胆的猜测它的实现原理,和我们的原生代码实现的方式是相同的。

3,消息过滤代码实现

RocketMQ是包含消息过滤功能的,现在假如我们不使用消息过滤功能,获取到一个Topic中的消息可能包含了相关主题的多个表的信息。 如果我们的需求是根据获取的消息同步某张表A的数据,那么就需要在获取消息后自行判断消息是否属于表A,如果属于表A才去处理,如果不是表A就直接丢弃。 这种做法多了一层逻辑判断,自然会对系统的性能产生影响。这个时候RocketMQ的过滤机制就可以展示它的作用了,我们在发送消息的时候可以直接给消息指定tag和属性,主要代码如下:

// 构建消息对象Message msg = new Message(topic, //这里指定的是topic"A",//这里存放的Tag 消费者会根据tag进行消息过滤message.getBytes(RemotingHelper.DEFAULT_CHARSET));// 我们还可以设置一些用户自定义的属性msg.putUserProperty("name","value");

消费者在消费数据时就可以根据tag和属性进行过滤了,比如下边的写法:

// 订阅test Topic , 第二个参数是通过tag过滤,意思是过滤出tag为A或B的消息consumer.subscribe("test", "A||B");

对应到spring boot中的实现也很简单,生产者部分关键代码如下:

// 创建 DemoMessage 消息Message message = MessageBuilder.withPayload(new DemoMessage().setId(id)).setHeader(MessageConst.PROPERTY_TAGS,"A")// 设置消息的tag.build();

消费者过滤的主要代码如下:

@RocketMQMessageListener(topic = DemoMessage.TOPIC,consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,selectorExpression = "A||B" // 通过tag过滤
)

消费者部分只要在@RocketMQMessageListener注解中增加selectorExpression属性就可以了。

4,延时消息代码实现

在讨论延时消息的代码实现之前,先讨论一下电商系统的超时未支付业务流程。如图1所示:

图1放弃支付流程

这个流程的关键问题就是超时未支付的订单处于“待支付”状态,并锁定了库存,当时我们提出的解决方案就是提供一个后台线程,来扫描待支付订单,如果超过30分钟还未支付,就把订单关闭,解锁库存。 小伙伴们可以思考一下,这样的解决方案真的可以在生产环境落地吗? 首先,后台线程不停的扫描订单数据,如果订单数据量很大,就会导致严重的系统性能问题。 其次,如果我们的订单系统是一个分布式系统,你的后台线程要如何部署?多久扫描一次? 所以,使用后台线程扫描订单数据并不是一个优雅的解决方案,这个时候本小节的主人公延时消息就该出场了。 RocketMQ的延时消息可以做到这样的效果,订单系统发送一条消息,等30分钟后,这条消息才可以被消费者消费。所以我们引入延时消息后,就可以单独准备一个订单扫描服务,来消费延时消息,当它获得消息的时候再去验证订单是否已经支付,如果已经支付什么都不用做,如果还未支付就去进行关闭订单,解锁库存的操作。如图2所示:

图2延时消息放弃支付流程

使用延时消息后,就可以避免扫描大量订单数据的操作了,而且订单扫描服务也可以分布式部署多个,只要同时订阅一个Topic就可以了。 应用场景我们已经了解了,现在我们来看一下代码应该如何实现。 延时消息使用原生代码实现特别容易,主要代码如下:

// 构建消息对象Message msg = new Message(topic, //这里指定延时消息的topicmessage.getBytes(RemotingHelper.DEFAULT_CHARSET));// 指定延时级别为3msg.setDelayTimeLevel(3);producer.send(msg);

可以看到最核心的内容就是msg.setDelayTimeLevel(3),设置了延迟级别。

RocketMQ支持的延迟级别有18个,这个我们之前已经介绍过了,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

所以设置为3代表10s后消息可以被消费者消费。

消费者的代码这里就不演示了,没有什么特殊的写法。

下面我们来看一下Spring Boot的生产者代码实现:

// 创建 DemoMessage 消息Message message = MessageBuilder.withPayload(new DemoMessage().setId(id)).build();// 同步发送消息return rocketMQTemplate.syncSend(DemoMessage.TOPIC,message,30*1000,3);// 此处设置的就是延时级别

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...