💗推荐阅读文章💗
- 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》
- 🌺MySQL系列🌺👉2️⃣《MySQL系列教程》
- 🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》
- 🌻SSM框架系列🌻👉4️⃣《SSM框架系列教程》
🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️
以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;
例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;
在RPC模式中,客户端和服务器都是Producer也都是Consumer;
RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable {public Connection connection;public Channel channel;public static final String RPC_QUEUE_NAME = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) throws Exception {// 初始化信息RPCClient rpcClient = new RPCClient();// 发起远程调用Integer response = rpcClient.call(20);System.out.println(response);rpcClient.channel.close();rpcClient.connection.close();}public Integer call(Integer money) throws IOException, InterruptedException {// 随机生成一个correlationId(密钥)final String corrId = UUID.randomUUID().toString();// 后期服务端回调给客户端的队列名(随机生成的回调队列名)String replyQueueName = channel.queueDeclare().getQueue();// 设置发送消息的一些参数AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 密钥.replyTo(replyQueueName) // 回调队列名.build();// 采用Simple模式发送给Server端channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));// 定义延迟队列final BlockingQueue response = new ArrayBlockingQueue<>(1);channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(corrId)) {System.out.println("响应的消息:" + new String(body));// 往延迟队列中添加信息(服务端响应的最新余额)response.offer(Integer.parseInt(new String(body, "UTF-8")));}}});// 获取延迟队列中的信息(如果没有信息将一直阻塞)return response.take();}public void close() throws IOException {connection.close();}
}
package com.dfbz.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";// 总金额private static Integer money = 0;/*** 加钱方法* @param n* @return*/private static Integer addMoney(int n) {money += n;return money;}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.40.132");factory.setPort(5672);factory.setUsername("lscl");factory.setPassword("admin");factory.setVirtualHost("/lscl");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);System.out.println("等待客户端请求.....");while (true) {// 接受到客户端的请求(消息)channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {// 回调方法,当收到消息之后,会自动执行该方法public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 本次的消息配置AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()) // 客户端发送的密钥.build();System.out.println("客户端的消息: "+new String(body,"UTF-8"));String response = "";try {String message = new String(body, "UTF-8");// 调用加钱方法response = addMoney(Integer.parseInt(message)) + "";} finally {// 发送一个消息给客户端/*properties.getReplyTo(): Client端设置的回调队列名replyProps: 封装的参数(主要是CorrelationId)*/channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));}}});}}}
}
严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;
Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;