如何用MQ实现RPC远程调用?(附代码)
创始人
2024-06-03 08:38:38
0

💗推荐阅读文章💗

  • 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》
  • 🌺MySQL系列🌺👉2️⃣《MySQL系列教程》
  • 🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》
  • 🌻SSM框架系列🌻👉4️⃣《SSM框架系列教程》

🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️

文章目录

    • 4.6 RPC 模式
      • 4.6.1 简介
      • 4.6.2 客户端
      • 4.6.2 服务端
      • 4.6.3 RPC模式小结

4.6 RPC 模式

4.6.1 简介

以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;

例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;

  • RPC业务分析

在这里插入图片描述

在RPC模式中,客户端和服务器都是Producer也都是Consumer;

RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • RPC调用图解:

在这里插入图片描述

4.6.2 客户端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable {public Connection connection;public Channel channel;public static final String RPC_QUEUE_NAME = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) throws Exception {// 初始化信息RPCClient rpcClient = new RPCClient();// 发起远程调用Integer response = rpcClient.call(20);System.out.println(response);rpcClient.channel.close();rpcClient.connection.close();}public Integer call(Integer money) throws IOException, InterruptedException {// 随机生成一个correlationId(密钥)final String corrId = UUID.randomUUID().toString();// 后期服务端回调给客户端的队列名(随机生成的回调队列名)String replyQueueName = channel.queueDeclare().getQueue();// 设置发送消息的一些参数AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId)			// 密钥.replyTo(replyQueueName)		// 回调队列名.build();// 采用Simple模式发送给Server端channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));// 定义延迟队列final BlockingQueue response = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(corrId)) {System.out.println("响应的消息:" + new String(body));// 往延迟队列中添加信息(服务端响应的最新余额)response.offer(Integer.parseInt(new String(body, "UTF-8")));}}});// 获取延迟队列中的信息(如果没有信息将一直阻塞)return response.take();}public void close() throws IOException {connection.close();}
}

4.6.2 服务端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";// 总金额private static Integer money = 0;/*** 加钱方法* @param n* @return*/private static Integer addMoney(int n) {money += n;return money;}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);System.out.println("等待客户端请求.....");while (true) {// 接受到客户端的请求(消息)channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 本次的消息配置AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId())		// 客户端发送的密钥.build();System.out.println("客户端的消息: "+new String(body,"UTF-8"));String response = "";try {String message = new String(body, "UTF-8");// 调用加钱方法response = addMoney(Integer.parseInt(message)) + "";} finally {// 发送一个消息给客户端/*properties.getReplyTo(): Client端设置的回调队列名replyProps:	封装的参数(主要是CorrelationId)*/channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));}}});}}}
}

4.6.3 RPC模式小结

严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;

在这里插入图片描述

Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...