如何用MQ实现RPC远程调用?(附代码)
创始人
2024-06-03 08:38:38
0

💗推荐阅读文章💗

  • 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》
  • 🌺MySQL系列🌺👉2️⃣《MySQL系列教程》
  • 🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》
  • 🌻SSM框架系列🌻👉4️⃣《SSM框架系列教程》

🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️

文章目录

    • 4.6 RPC 模式
      • 4.6.1 简介
      • 4.6.2 客户端
      • 4.6.2 服务端
      • 4.6.3 RPC模式小结

4.6 RPC 模式

4.6.1 简介

以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;

例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;

  • RPC业务分析

在这里插入图片描述

在RPC模式中,客户端和服务器都是Producer也都是Consumer;

RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • RPC调用图解:

在这里插入图片描述

4.6.2 客户端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable {public Connection connection;public Channel channel;public static final String RPC_QUEUE_NAME = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) throws Exception {// 初始化信息RPCClient rpcClient = new RPCClient();// 发起远程调用Integer response = rpcClient.call(20);System.out.println(response);rpcClient.channel.close();rpcClient.connection.close();}public Integer call(Integer money) throws IOException, InterruptedException {// 随机生成一个correlationId(密钥)final String corrId = UUID.randomUUID().toString();// 后期服务端回调给客户端的队列名(随机生成的回调队列名)String replyQueueName = channel.queueDeclare().getQueue();// 设置发送消息的一些参数AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId)			// 密钥.replyTo(replyQueueName)		// 回调队列名.build();// 采用Simple模式发送给Server端channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));// 定义延迟队列final BlockingQueue response = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(corrId)) {System.out.println("响应的消息:" + new String(body));// 往延迟队列中添加信息(服务端响应的最新余额)response.offer(Integer.parseInt(new String(body, "UTF-8")));}}});// 获取延迟队列中的信息(如果没有信息将一直阻塞)return response.take();}public void close() throws IOException {connection.close();}
}

4.6.2 服务端

package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";// 总金额private static Integer money = 0;/*** 加钱方法* @param n* @return*/private static Integer addMoney(int n) {money += n;return money;}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);System.out.println("等待客户端请求.....");while (true) {// 接受到客户端的请求(消息)channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 本次的消息配置AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId())		// 客户端发送的密钥.build();System.out.println("客户端的消息: "+new String(body,"UTF-8"));String response = "";try {String message = new String(body, "UTF-8");// 调用加钱方法response = addMoney(Integer.parseInt(message)) + "";} finally {// 发送一个消息给客户端/*properties.getReplyTo(): Client端设置的回调队列名replyProps:	封装的参数(主要是CorrelationId)*/channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));}}});}}}
}

4.6.3 RPC模式小结

严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;

在这里插入图片描述

Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;

相关内容

热门资讯

宫崎骏《起风了》的经典台词 宫崎骏《起风了》的经典台词  1、起风了,唯有努力生存。  2、再没有什么比幸福的回忆更妨碍幸福的了...
公司成立周年庆典主持词 公司成立周年庆典主持词  主持词可以采用和历史文化有关的表述方法去写作以提升活动的文化内涵。在各种集...
辩论赛主持词 关于辩论赛主持词4篇  契合现场环境的主持词能给集会带来双倍的效果。在当下的中国社会,各种集会中主持...
同学三十周年聚会主持词 同学三十周年聚会主持词尊敬的各位老师、亲爱的同学们:  大家好!  风霜雪雨三十载,师生情谊天长地久...
六一儿童节活动主持稿 有关六一儿童节活动主持稿(通用7篇)  随着社会一步步向前发展,越来越多地方需要用到主持稿,主持稿的...
春到福来春晚主持词 春到福来春晚主持词  节目:《春到福来》  朱军:春到福来,春上春伦云天外  周涛:春到福来,春向黄...
平安夜晚会主持词 平安夜晚会主持词  主持词是主持人在节目进行过程中用于串联节目的串联词。在一步步向前发展的社会中,司...
农村简单结婚典礼主持词 农村简单结婚典礼主持词  一、结婚典礼的内容简介  在世界各国大部分的文化里,会发展出一些结婚上的传...
大话西游最经典的台词 大话西游最经典的台词  大话西游是周星驰主演的一部经典的喜剧爱情片。里面的台词曾感染了无数观众。以下...
公司会议主持词 关于公司会议主持词(通用5篇)  主持词要把握好吸引观众、导入主题、创设情境等环节以吸引观众。在如今...
生日派对会的主持串词 关于生日派对会的主持串词  作者:赵可心  题目:我非常高兴  要求:用普通话  环节:  1、 开...
少先队员宣誓主持词 少先队员宣誓主持词(精选8篇)  主持词已成为各种演出活动和集会中不可或缺的一部分。我们眼下的社会,...
圣诞节活动主持词 圣诞节活动主持词(精选14篇)  主持人在台上表演的灵魂就表现在主持词中。在当今中国社会,主持词在各...
葬礼主持词 葬礼主持词(精选8篇)  主持词可以采用和历史文化有关的表述方法去写作以提升活动的文化内涵。在一步步...
主持词 主持词范文(精选21篇)  主持词需要富有情感,充满热情,才能有效地吸引到观众。在如今这个时代,活动...
六一儿童节颁奖主持词 六一儿童节颁奖主持词范文(通用5篇)  主持词分为会议主持词、晚会主持词、活动主持词、婚庆主持词等。...
业主在开工典礼的致辞 业主在开工典礼的致辞范文(精选12篇)  无论在学习、工作或是生活中,大家肯定对各类致辞都很熟悉吧,...
六一儿童节主持词 六一儿童节主持词(精选15篇)  契合现场环境的主持词能给集会带来双倍的效果。在当下的社会中,很多晚...
圣诞节主持词开场白   圣诞节(Christmas)又称耶诞节,译名为“基督弥撒”,西方传统节日,在每年12月25日。下...
黑龙江年度经济风云人物颁奖典... 黑龙江年度经济风云人物颁奖典礼的主持词  (灯光,音乐)  甲:各位领导  乙:各位来宾  丙:现场...