先执行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,
在异常处理理的代码块中执行回滚业务操作或者执行重发操作等。这是一种最大努力确保的方式,
并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试。
没有捕获到异常并不能代表消息就一定投递成功了。
一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销
比较大,一般也不推荐使用。
事务实现
channel.txSelect(): 将当前信道设置成事务模式
channel.txCommit(): 用于提交事务
channel.txRollback(): 用于回滚事务
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信
道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派
一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么
确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一
ID),这样生产者就知道消息已经正确送达了。
RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号,另外,通过设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。
package confirm;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherConfirmsProducer {public static void main(String[] args) throws Exception{Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();channel.queueDeclare("queue.pc", true, false, false, null);channel.exchangeDeclare("ex.pc", "direct", true, false, null);channel.queueBind("queue.pc", "ex.pc", "key.pc");try {// 发送消息for (int i = 1 ; i < 10000 ; i++){channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());}// 同步的方式等待RabbitMQ的确认消息channel.waitForConfirmsOrDie(5000);System.out.println("发送的消息已经得到确认");} catch (IOException ex) {System.out.println("消息被拒收");} catch (IllegalStateException ex) {System.out.println("发送消息的通道不是PublisherConfirms通道");} catch (TimeoutException ex) {System.out.println("等待消息确认超时");}channel.close();connection.close();}
}
waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛TimeoutException。类似的有几个waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后该方法会抛出java.io.IOException。需要根据异常类型来做区别处理理, TimeoutException超时是属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。上面的代码主要只是演示confirm机制,实际上还是同步阻塞模式的,性能并不不是太好。
实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次
waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发消息肯定会造成部分消息重复。另外,我们可以通过异步回调的方式来处理Broker的响应。addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。
package confirm;/*** 创建者: 魏红* 创建时间: 2023-02-28* 描述:*/
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;public class PublisherConfirmsProducer2 {public static void main(String[] args) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();channel.queueDeclare("queue.pc", true, false, false, null);channel.exchangeDeclare("ex.pc", "direct", true, false, null);channel.queueBind("queue.pc", "ex.pc", "key.pc");String message = "hello-";// 批处理的大小int batchSize = 10;// 用于对需要等待确认消息的计数int outstrandingConfirms = 0;for (int i = 0; i < 10000; i++) {channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());outstrandingConfirms++;if (outstrandingConfirms == batchSize) {// 此时已经有一个批次的消息需要同步等待broker的确认消息// 同步等待channel.waitForConfirmsOrDie(5000);System.out.println("消息已经被确认了");outstrandingConfirms = 0;}}if (outstrandingConfirms > 0) {channel.waitForConfirmsOrDie(5000);System.out.println("剩余消息已经被确认了");}channel.close();connection.close();}
}
还可以使用异步方法:
package confirm;/*** 创建者: 魏红* 创建时间: 2023-02-28* 描述:*/import com.rabbitmq.client.*;
import util.ConnectionUtil;import javax.management.loading.MLet;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class PublisherConfirmsProducer3 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();channel.queueDeclare("queue.pc", true, false, false, null);channel.exchangeDeclare("ex.pc", "direct", true, false, null);channel.queueBind("queue.pc", "ex.pc", "key.pc");// ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() {
// @Override
// public void handle(long deliveryTag, boolean multiple) throws IOException {
// if (multiple) {
// System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
// } else {
// System.out.println("编号为:" + deliveryTag + " 的消息被确认");
// }
// }
// };ConcurrentNavigableMap outstandingConfirms = new ConcurrentSkipListMap<>();ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {if (multiple) {System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");final ConcurrentNavigableMap headMap= outstandingConfirms.headMap(deliveryTag, true);// 清空outstandingConfirms中已经被确认的消息信息headMap.clear();} else {// 移除已经被确认的消息outstandingConfirms.remove(deliveryTag);System.out.println("编号为:" + deliveryTag + " 的消息被确认");}};ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {if (multiple) {// 将没有确认的消息记录到一个集合中// 此处省略实现System.out.println("消息编号小于等于:" + deliveryTag + " 的消息 不确认");} else {System.out.println("编号为:" + deliveryTag + " 的消息不确认");}};// 设置channel的监听器,处理确认的消息和不确认的消息channel.addConfirmListener(clearOutstandingConfirms, confirmCallback);String message = "hello-";for (int i = 0; i < 500000; i++) {// 获取下一条即将发送的消息的消息IDfinal long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());System.out.println("编号为:" + nextPublishSeqNo + " 的消息已经发送成功,尚未确认");outstandingConfirms.put(nextPublishSeqNo, (message + i));}// 等待消息被确认Thread.sleep(10000);channel.close();connection.close();}
}
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的
可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我
们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自
己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:
package workmode;import com.rabbitmq.client.*;
import util.ConnectionUtil;import java.io.IOException;/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法*/
public class Recer2 {public static void main(String[] args) throws Exception {// 1.获得连接Connection connection = ConnectionUtil.getConnection();// 2.获得通道(信道)final Channel channel = connection.createChannel();channel.queueDeclare("work_queue",false,false,false,null);// 3.从信道中获得消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);
// System.out.println("【顾客2】吃掉 " + s+" ! 总共吃【"+i+++"】串!");System.out.println("【消费者2】得到 " + s);// 模拟网络延迟try{Thread.sleep(400);}catch (Exception e){}// 手动确认(收件人信息,是否同时确认多个消息)channel.basicAck(envelope.getDeliveryTag(),false);}};// 4.监听队列 false:手动消息确认channel.basicConsume("work_queue", false,consumer);}
}
本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式
下一篇: 温馨教师节祝福语