RabbitMQ消息确认机制详解(新手必看)
虽然使用 RabbitMQ 可以降低系统的耦合度,提高整个系统的高并发能力,但是也使得业务变得复杂,可能造成消息丢失,导致业务中断的情况。
使用 RabbitMQ 很可能造成消息丢失,导致业务中断的情况,例如:
针对上面的情况,RabbitMQ 提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制、Return 消息机制、消费端 ACK 和 Nack 机制这 3 种消息确认模式。
如下图所示,当生产者发送消息到 MQ Broker 时,Broker 会发送一个确认(Confirm),通知发送端已经收到此消息。

图 1 生产者消息确认机制流程图
下面通过示例来演示生产者消息确认机制。

图 2 生产者消息确认机制的运行结果
通过上面的输出结果可以看到,消息发送成功,并收到 Broker 返回的 ACK 确认。这说明 RabbitMQ 的生产者消息确认机制配置成功。
如果消息端处理失败,应该如何处理呢?下面再模拟一个消费处理失败的场景。将消费者的代码修改如下:
再次单击 Run Test 或在方法上右击,选择 Run 'testConfirm',查看后台输出情况,运行结果如下图所示:

图 3 生产者消息确认机制的消息处理异常
通过上面的输出结果可以看到,消息处理异常,消息被退回到队列,在 RabbitMQ 管理后台可以看到此消息。修复该异常之后,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消费,正常来说,若该消息没有被丢弃,则下次会继续发送)。
在某些情况下,如果在发送消息的时候,当前的 Exchange 不存在或者指定的 routingKey 路由不到,这个时候如果需要监听这种不可达的消息,可以使用 RabbitMQ 提供的 Return 机制处理一些不可路由的消息,如下图所示。

图 4 Return机制
通过配置项 Mandatory 处理此类消息,如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果为 false,则 Broker 服务器会自动删除该消息。
下面通过示例来演示生产者消息确认机制。

图 5 Return机制的运行结果
通过上面的输出结果可以看到,消息发送后,由于未找到 routingKey,导致消息不可达,Broker 服务器自动退回该消息。
这就需要消费端 ACK 和 NACK 机制,手工进行 ACK 确认,保障消费者成功处理消息,把未成功处理的消息再次发送,直到消息处理成功。
RabbitMQ 消费端的确认机制分为 3 种,分别是 none、manual、auto(默认):
下面通过示例来演示消费端 ACK 和 NACK 机制:

图 6 消费端消息确认机制的运行结果
通过上面的输出结果可以看到,消息处理失败,重回队列。再次发送失败时,则拒绝该消息。
使用 RabbitMQ 很可能造成消息丢失,导致业务中断的情况,例如:
- 生产者发送消息到 RabbitMQ 服务器失败;
- RabbitMQ 服务器自身故障导致消息丢失;
- 消费者处理消息失败。
针对上面的情况,RabbitMQ 提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制、Return 消息机制、消费端 ACK 和 Nack 机制这 3 种消息确认模式。
生产者消息确认机制
生产者消息的确认是指生产者发送消息后,如果 Broker 收到消息,则会给生产者一个应答。生产者接收应答,用来确定这条消息是否正常地发送到 Broker,这种方式也是消息可靠性发送的核心保障。如下图所示,当生产者发送消息到 MQ Broker 时,Broker 会发送一个确认(Confirm),通知发送端已经收到此消息。

图 1 生产者消息确认机制流程图
下面通过示例来演示生产者消息确认机制。
1) 修改配置文件
修改 application.properties 配置文件,增加消息确认机制的相关配置,示例如下:# 开启确认机制 spring.rabbitmq.publisher-confirms=true # 设置 acknowledge-mode 为manual 手动模式 spring.rabbitmq.listener.simple.acknowledge-mode=manual在上面的示例中,修改 application.properties 配置文件开启确认机制,设置 acknowledge-mode 为 manual(手动模式)。
2) 配置队列绑定规则
先创建 Topic 配置规则类 ConfirmRabbitConfig,再创建对应的 Exchange、Queue,并将队列绑定到交换机上。示例代码如下:@Configuration public class ConfirmRabbitConfig { @Bean public Queue confirmQueue() { return new Queue("rabbit_confirm_queue"); } @Bean public DirectExchange confirmExchange() { return new DirectExchange("confirm_direct_exchange"); } @Bean public Binding confirmFanoutExchangeBing() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with("rabbit_confirm_queue"); } }在上面的代码中,定义了 FanoutExchange 交换机,并定义了 rabbit_confirm_queue 队列,然后通过 .bind(confirmQueue()).to(confirmExchange()) 方法将队列绑定到交换机。
3) 生产者
创建生产者,配置确认(Confirm)机制,示例代码如下:@@Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; /** * 配置消息确认机制 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 消息相关的数据,一般用于获取唯一标识ID * @param b 是否发送成功 * @param error 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String error) { if (b) { System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId()); } else { System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error); } } }; /** * 发送消息,参数有交换机、空路由键、消息,并设置一个唯一的消息ID */ public void sendConfirm(String routingKey) { rabbitTemplate.convertAndSend("confirm_direct_exchange", routingKey, "这是一个带confirm的消息", new CorrelationData("" + System.currentTimeMillis())); //使用上方配置的发送回调方法 rabbitTemplate.setConfirmCallback(confirmCallback); } }在上面的示例中,首先定义了 ConfirmCallback 消息确认回调方法,然后使用 convertAndSend() 发送消息。我们看到 convertAndSend() 方法多了一个 correlationData 参数。此参数为消息相关的数据,一般用于获取唯一标识 ID。最后使用 setConfirmCallback 配置回调方法。
4) 消费者
首先创建消费者类,然后使用 @RabbitListener 监听 3 个队列,消息处理成功后发送 ACK 应答。示例代码如下:@Component public class Consumer { @RabbitListener(queues = "rabbit_confirm_queue") public void aa(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println("正常收到消息:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 两个布尔值,若第二个设为false,则丢弃该消息;若设为true,则返回给队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("消费失败 我此次将返回给队列"); } } }在上面的示例中,与之前的消费者基本一致,只是新增加了 channel.basicAck 方法发送 ACK 确认。
5) 验证测试
创建单元测试类,发送带 Confirm 确认的消息。示例代码如下:@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Autowired Producer producer; @Test public void testConfirm() throws InterruptedException { producer.sendConfirm("rabbit_confirm_queue"); Thread.sleep(1000); } }首先,单击 Run Test 或在方法上右击,选择 Run 'testConfirm',查看后台输出情况,运行结果如下图所示:

图 2 生产者消息确认机制的运行结果
通过上面的输出结果可以看到,消息发送成功,并收到 Broker 返回的 ACK 确认。这说明 RabbitMQ 的生产者消息确认机制配置成功。
如果消息端处理失败,应该如何处理呢?下面再模拟一个消费处理失败的场景。将消费者的代码修改如下:
@Component public class Consumer { @RabbitListener(queues = "rabbit_confirm_queue") public void aa(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println("正常收到消息:" + new String(message.getBody())); int a = 1/0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 两个布尔值,若第二个设为 false,则丢弃该消息;若设为true,则返回给队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("消费失败 我此次将返回给队列"); } } }在上面的示例中,我们增加了 int a = 1/0,即除 0 异常,模拟消息处理失败的情况。
再次单击 Run Test 或在方法上右击,选择 Run 'testConfirm',查看后台输出情况,运行结果如下图所示:

图 3 生产者消息确认机制的消息处理异常
通过上面的输出结果可以看到,消息处理异常,消息被退回到队列,在 RabbitMQ 管理后台可以看到此消息。修复该异常之后,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消费,正常来说,若该消息没有被丢弃,则下次会继续发送)。
Return机制
我们知道,消息生产者通过指定一个 Exchange 和 routingKey 将消息送达某一个队列中,然后消费者监听队列进行消费处理操作。在某些情况下,如果在发送消息的时候,当前的 Exchange 不存在或者指定的 routingKey 路由不到,这个时候如果需要监听这种不可达的消息,可以使用 RabbitMQ 提供的 Return 机制处理一些不可路由的消息,如下图所示。

图 4 Return机制
通过配置项 Mandatory 处理此类消息,如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果为 false,则 Broker 服务器会自动删除该消息。
下面通过示例来演示生产者消息确认机制。
1) 修改配置文件
修改 application.properties 配置文件,增加消息确认机制的相关配置,示例如下:# 开启 Return 确认机制 spring.rabbitmq.publisher-returns=true # 设置为true后,消费者在消息没有被路由到合适队列的情况下会被Return监听,而不会自动删除 spring.rabbitmq.template.mandatory=true在上面的示例中,修改 application.properties 配置文件,设置 mandatory 属性为 true,当设置为 true 的时候,路由不到队列的消息不会被自动删除,从而才可以被 Return 消息模式监听到。
2) 修改生产者
修改生产者,配置 Return 消息确认机制,示例代码如下:@Component public class Producer { @Autowired private RabbitTemplate rabbitTemplate; /** * 配置消息确认机制 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 消息相关的数据,一般用于获取唯一标识ID * @param b 是否发送成功 * @param error 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String error) { if (b) { System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId()); } else { System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error); } } }; private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return exchange: "+exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; /** * 发送消息的参数有交换机、空路由键、消息,并设置一个唯一消息ID */ public void sendConfirm(String routingKey) { rabbitTemplate.convertAndSend("confirm_direct_exchange", routingKey, "这是一个带confirm的消息", new CorrelationData("" + System.currentTimeMillis())); // 使用上方配置的发送回调方法 rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); } }在上面的示例中,与之前的生产者示例代码基本相同,只是新增了 ReturnCallback 回调方法,并在发送消息时通过 setReturnCallback() 方法绑定 ReturnCallback。
3) 验证测试
创建单元测试类,发送带 Confirm 确认的消息。示例代码如下:@Test public void testConfirm2() throws InterruptedException { producer.sendConfirm("rabbit_return1111"); Thread.sleep(1000); }首先,单击 Run Test 或在方法上右击,选择 Run 'testConfirm2',查看后台输出情况,运行结果如下图所示。

图 5 Return机制的运行结果
通过上面的输出结果可以看到,消息发送后,由于未找到 routingKey,导致消息不可达,Broker 服务器自动退回该消息。
消费端ACK和NACK机制
消费者在处理消息时,由于业务异常,我们可以进行日志的记录,然后进行补偿。但是,如果由于服务器宕机等严重问题无法记录日志,如何确保消息被正确处理呢?这就需要消费端 ACK 和 NACK 机制,手工进行 ACK 确认,保障消费者成功处理消息,把未成功处理的消息再次发送,直到消息处理成功。
RabbitMQ 消费端的确认机制分为 3 种,分别是 none、manual、auto(默认):
- none:表示没有任何应答会被发送。
-
manual:表示监听者必须通过调用 channel.basicAck() 来告知消息被处理:
- channel.basicAck(long,boolean):确认收到消息,消息将从队列中被移除,为false时只确认当前一个消费者收到的消息,为true时确认所有消费者收到的消息。
- channel.basicNack(long,boolean,boolean):确认没有收到消息,第一个boolean表示是一个消费者还是所有的消费者,第二个boolean表示消息是否重新回到队列,为true时表示重新入队。
- channel.basicReject(long,boolean):拒绝消息,requeue=false 表示消息不再重新入队,如果配置了死信队列,则消息进入死信队列。消息重回队列时,该消息不会回到队列尾部,仍在队列头部,这时消费者又会接收到这条消息,如果想让消息进入队列尾部,需确认消息后再次发送消息。
-
auto:表示自动应答,除非 MessageListener 抛出异常,这是默认配置方式:
- 如果消息成功处理,则自动确认。
- 当发生异常抛出 AmqpRejectAndDontRequeueException 时,则消息会被拒绝且不重新进入队列。
- 当发生异常抛出 ImmediateAcknowledgeAmqpException 时,则消费者会被确认。
- 当抛出其他的异常时,则消息会被拒绝,且 requeue=true 时会发生死循环,可以通过 setDefaultRequeueRejected(默认是 true)设置抛弃消息。
下面通过示例来演示消费端 ACK 和 NACK 机制:
1) 修改配置文件
修改 application.properties 配置文件,改为 ACK 手动确认模式,示例如下:spring.rabbitmq.template.mandatory=true # ACK手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual在上面的示例中,设置 ACK 模式为 manual,即手动确认模式。
2) 修改消费者
修改消费者 Consumer 的消息处理机制,配置手动 ACK 机制,示例代码如下:@Component public class Consumer { @RabbitListener(queues = "rabbit_confirm_queue") public void process(Message message, Channel channel) throws IOException, InterruptedException { try { System.out.println("正常收到消息:" + new String(message.getBody())); int i=1/0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重复处理失败,拒绝再次接收"); // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列,则进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息即将再次返回队列处理"); // requeue为true时重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }上面的示例与之前的消费者示例代码基本相同。只是在 Consumer 中增加了在消息处理成功时,会调用 channel.basicAck() 发送消息 ACK 确认,以及在消息处理失败时,会调用 channel.basicNack() 通知 Exchange 消息处理失败,重新发送消息。当再次处理失败时,则调用 channel.basicReject() 拒绝这条消息。
3) 验证测试
@Test public void testACK() throws InterruptedException { producer.sendConfirm("rabbit_confirm_queue"); Thread.sleep(1000); }单击 Run Test 或在方法上右击,选择 Run 'testACK',查看后台输出情况,运行结果如下图所示:

图 6 消费端消息确认机制的运行结果
通过上面的输出结果可以看到,消息处理失败,重回队列。再次发送失败时,则拒绝该消息。