如何用MQ实现RPC远程调用?(附代码)
创始人
2024-06-03 08:38:38
0

💗推荐阅读文章💗

  • 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》
  • 🌺MySQL系列🌺👉2️⃣《MySQL系列教程》
  • 🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》
  • 🌻SSM框架系列🌻👉4️⃣《SSM框架系列教程》

🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️

文章目录

    • 4.6 RPC 模式
      • 4.6.1 简介
      • 4.6.2 客户端
      • 4.6.2 服务端
      • 4.6.3 RPC模式小结

4.6 RPC 模式

4.6.1 简介

以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;

例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;

  • RPC业务分析

在这里插入图片描述

在RPC模式中,客户端和服务器都是Producer也都是Consumer;

RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • RPC调用图解:

在这里插入图片描述

4.6.2 客户端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable {public Connection connection;public Channel channel;public static final String RPC_QUEUE_NAME = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) throws Exception {// 初始化信息RPCClient rpcClient = new RPCClient();// 发起远程调用Integer response = rpcClient.call(20);System.out.println(response);rpcClient.channel.close();rpcClient.connection.close();}public Integer call(Integer money) throws IOException, InterruptedException {// 随机生成一个correlationId(密钥)final String corrId = UUID.randomUUID().toString();// 后期服务端回调给客户端的队列名(随机生成的回调队列名)String replyQueueName = channel.queueDeclare().getQueue();// 设置发送消息的一些参数AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId)			// 密钥.replyTo(replyQueueName)		// 回调队列名.build();// 采用Simple模式发送给Server端channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));// 定义延迟队列final BlockingQueue response = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(corrId)) {System.out.println("响应的消息:" + new String(body));// 往延迟队列中添加信息(服务端响应的最新余额)response.offer(Integer.parseInt(new String(body, "UTF-8")));}}});// 获取延迟队列中的信息(如果没有信息将一直阻塞)return response.take();}public void close() throws IOException {connection.close();}
}

4.6.2 服务端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";// 总金额private static Integer money = 0;/*** 加钱方法* @param n* @return*/private static Integer addMoney(int n) {money += n;return money;}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);System.out.println("等待客户端请求.....");while (true) {// 接受到客户端的请求(消息)channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 本次的消息配置AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId())		// 客户端发送的密钥.build();System.out.println("客户端的消息: "+new String(body,"UTF-8"));String response = "";try {String message = new String(body, "UTF-8");// 调用加钱方法response = addMoney(Integer.parseInt(message)) + "";} finally {// 发送一个消息给客户端/*properties.getReplyTo(): Client端设置的回调队列名replyProps:	封装的参数(主要是CorrelationId)*/channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));}}});}}}
}

4.6.3 RPC模式小结

严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;

在这里插入图片描述

Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;

相关内容

热门资讯

一到六年级的作文下雨【实用6... 一到六年级的作文下雨 篇一雨天的乐趣下雨了,这是多少孩子最喜欢的天气啊!尤其是在一到六年级的小学生们...
哥哥真厉害作文1000字【优... 哥哥真厉害作文1000字 篇一哥哥真厉害我有一个哥哥,他是我心目中最厉害的人。他不仅聪明、勤奋,还有...
六年级上册第二单元作文(优秀... 六年级上册第二单元作文 篇一我的梦想我是一个六年级的学生,我的名字叫小明。每个人都有自己的梦想,而我...
小学六年级暑假作文400字(... 小学六年级暑假作文400字 篇一:我的暑假生活暑假来临了,我迫不及待地迎接了属于我的自由时光。暑假里...
我是一场秋雨小学六年级作文(... 我是一场秋雨小学六年级作文 篇一:秋雨的魅力秋天是一个美丽的季节,它给大地带来了一场场降雨,也给我们...
感恩心作文小学六年级作文【优... 感恩心作文小学六年级作文 篇一:感恩父母我有一对非常好的父母,他们对我无微不至的关爱让我感到非常幸福...
功夫之家六年级作文(精彩3篇... 功夫之家六年级作文 篇一我的功夫之家我是一名六年级的学生,我家附近有一家特别特别酷的地方,叫做功夫之...
六年级班干部职责分工【精彩3... 六年级班干部职责分工 篇一六年级是小学生活的最后一年,也是学生们步入初中前最后一个学期。在这个特殊的...
英雄的魅力六年级作文800字... 英雄的魅力六年级作文800字 篇一英雄的魅力英雄是一个让人们敬仰和钦佩的词汇,他们以无私的奉献和勇敢...
欣赏六年级作文 欣赏六年级作文  在日常学习、工作抑或是生活中,大家都有写作文的经历,对作文很是熟悉吧,作文一定要做...
小学生作文扫墓(精彩3篇) 小学生作文扫墓 篇一扫墓是一种传统的中国风俗,也是一种表达对逝去亲人的思念和怀念之情的方式。近日,我...
一路欢歌东台采风行六年级作文... 一路欢歌东台采风行六年级作文 篇一东台是一个美丽的小城市,有着丰富的文化底蕴和独特的风景。我们六年级...
小学六年级暑假作文400字(... 小学六年级暑假作文400字 篇一我喜欢的夏日活动夏天是一年中最热的季节,但也是我最喜欢的季节。每年的...
小学六年级暑假作文700字(... 小学六年级暑假作文700字 篇一:我与暑假的约定暑假终于来临了,这是我最期待的时刻。为了让暑假过得更...
我的好伙伴作文500字六年级... 篇一:我的好伙伴我有一个非常好的伙伴,他的名字叫小明。小明是我班上的同学,我们从小学一年级就开始一起...
生活中不是缺少美,而是缺少发... 生活中不是缺少美,而是缺少发现六年级作文 篇一生活中不是缺少美,而是缺少发现生活中的美是无处不在的,...
去北京旅作文450六年级57... 篇一:我的北京之旅我终于等到了期盼已久的暑假,爸爸妈妈带我去了北京旅游,这是我人生中的第一次北京之旅...
六年级毕业作文(实用6篇) 六年级毕业作文 篇一:我的小学生活六年级毕业作文 篇二:成长的足迹六年级毕业作文 篇一:我的小学生活...
让生活更美好作文500字六年... 让生活更美好作文500字六年级 篇一生活,是我们每个人都要面对的,无论是喜是忧,都离不开生活。那么,...
感动小学六年级作文300字(... 感动小学六年级作文300字 篇一:爱心传递的故事这是一个关于爱心传递的故事。有一天,我放学后回家的路...