首页 > 编程笔记 > Java笔记 阅读:2

RabbitMQ消息确认机制详解(新手必看)

虽然使用 RabbitMQ 可以降低系统的耦合度,提高整个系统的高并发能力,但是也使得业务变得复杂,可能造成消息丢失,导致业务中断的情况。

使用 RabbitMQ 很可能造成消息丢失,导致业务中断的情况,例如:
针对上面的情况,RabbitMQ 提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制、Return 消息机制、消费端 ACK 和 Nack 机制这 3 种消息确认模式。

生产者消息确认机制

生产者消息的确认是指生产者发送消息后,如果 Broker 收到消息,则会给生产者一个应答。生产者接收应答,用来确定这条消息是否正常地发送到 Broker,这种方式也是消息可靠性发送的核心保障。

如下图所示,当生产者发送消息到 MQ Broker 时,Broker 会发送一个确认(Confirm),通知发送端已经收到此消息。


图 1 生产者消息确认机制流程图

下面通过示例来演示生产者消息确认机制。

1) 修改配置文件

修改 application.properties 配置文件,增加消息确认机制的相关配置,示例如下:
# 开启确认机制
spring.rabbitmq.publisher-confirms=true
# 设置 acknowledge-mode 为manual 手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在上面的示例中,修改 application.properties 配置文件开启确认机制,设置 acknowledge-mode 为 manual(手动模式)。

2) 配置队列绑定规则

先创建 Topic 配置规则类 ConfirmRabbitConfig,再创建对应的 Exchange、Queue,并将队列绑定到交换机上。示例代码如下:
@Configuration
public class ConfirmRabbitConfig {
    @Bean
    public Queue confirmQueue() {
        return new Queue("rabbit_confirm_queue");
    }
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange("confirm_direct_exchange");
    }
    @Bean
    public Binding confirmFanoutExchangeBing() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with("rabbit_confirm_queue");
    }
}
在上面的代码中,定义了 FanoutExchange 交换机,并定义了 rabbit_confirm_queue 队列,然后通过 .bind(confirmQueue()).to(confirmExchange()) 方法将队列绑定到交换机。

3) 生产者

创建生产者,配置确认(Confirm)机制,示例代码如下:
@@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 配置消息确认机制
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         *
         * @param correlationData 消息相关的数据,一般用于获取唯一标识ID
         * @param b 是否发送成功
         * @param error 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String error) {
            if (b) {
                System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId());
            } else {
                System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error);
            }
        }
    };

    /**
     * 发送消息,参数有交换机、空路由键、消息,并设置一个唯一的消息ID
     */
    public void sendConfirm(String routingKey) {
        rabbitTemplate.convertAndSend("confirm_direct_exchange",
                routingKey,
                "这是一个带confirm的消息",
                new CorrelationData("" + System.currentTimeMillis()));
        //使用上方配置的发送回调方法
        rabbitTemplate.setConfirmCallback(confirmCallback);
    }
}
在上面的示例中,首先定义了 ConfirmCallback 消息确认回调方法,然后使用 convertAndSend() 发送消息。我们看到 convertAndSend() 方法多了一个 correlationData 参数。此参数为消息相关的数据,一般用于获取唯一标识 ID。最后使用 setConfirmCallback 配置回调方法。

4) 消费者

首先创建消费者类,然后使用 @RabbitListener 监听 3 个队列,消息处理成功后发送 ACK 应答。示例代码如下:
@Component
public class Consumer {
    @RabbitListener(queues = "rabbit_confirm_queue")
    public void aa(Message message, Channel channel) throws IOException, InterruptedException {
        try {
            System.out.println("正常收到消息:" + new String(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 两个布尔值,若第二个设为false,则丢弃该消息;若设为true,则返回给队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.out.println("消费失败 我此次将返回给队列");
        }
    }
}
在上面的示例中,与之前的消费者基本一致,只是新增加了 channel.basicAck 方法发送 ACK 确认。

5) 验证测试

创建单元测试类,发送带 Confirm 确认的消息。示例代码如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    Producer producer;

    @Test
    public void testConfirm() throws InterruptedException {
        producer.sendConfirm("rabbit_confirm_queue");
        Thread.sleep(1000);
    }
}
首先,单击 Run Test 或在方法上右击,选择 Run 'testConfirm',查看后台输出情况,运行结果如下图所示:


图 2 生产者消息确认机制的运行结果

通过上面的输出结果可以看到,消息发送成功,并收到 Broker 返回的 ACK 确认。这说明 RabbitMQ 的生产者消息确认机制配置成功。

如果消息端处理失败,应该如何处理呢?下面再模拟一个消费处理失败的场景。将消费者的代码修改如下:
@Component
public class Consumer {
    @RabbitListener(queues = "rabbit_confirm_queue")
    public void aa(Message message, Channel channel) throws IOException, InterruptedException {
        try {
            System.out.println("正常收到消息:" + new String(message.getBody()));
                   int a = 1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 两个布尔值,若第二个设为 false,则丢弃该消息;若设为true,则返回给队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.out.println("消费失败 我此次将返回给队列");
        }
    }
}
在上面的示例中,我们增加了 int a = 1/0,即除 0 异常,模拟消息处理失败的情况。

再次单击 Run Test 或在方法上右击,选择 Run 'testConfirm',查看后台输出情况,运行结果如下图所示:


图 3 生产者消息确认机制的消息处理异常

通过上面的输出结果可以看到,消息处理异常,消息被退回到队列,在 RabbitMQ 管理后台可以看到此消息。修复该异常之后,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消费,正常来说,若该消息没有被丢弃,则下次会继续发送)。

Return机制

我们知道,消息生产者通过指定一个 Exchange 和 routingKey 将消息送达某一个队列中,然后消费者监听队列进行消费处理操作。

在某些情况下,如果在发送消息的时候,当前的 Exchange 不存在或者指定的 routingKey 路由不到,这个时候如果需要监听这种不可达的消息,可以使用 RabbitMQ 提供的 Return 机制处理一些不可路由的消息,如下图所示。


图 4 Return机制

通过配置项 Mandatory 处理此类消息,如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果为 false,则 Broker 服务器会自动删除该消息。

下面通过示例来演示生产者消息确认机制。

1) 修改配置文件

修改 application.properties 配置文件,增加消息确认机制的相关配置,示例如下:
# 开启 Return 确认机制
spring.rabbitmq.publisher-returns=true
# 设置为true后,消费者在消息没有被路由到合适队列的情况下会被Return监听,而不会自动删除
spring.rabbitmq.template.mandatory=true
在上面的示例中,修改 application.properties 配置文件,设置 mandatory 属性为 true,当设置为 true 的时候,路由不到队列的消息不会被自动删除,从而才可以被 Return 消息模式监听到。

2) 修改生产者

修改生产者,配置 Return 消息确认机制,示例代码如下:
@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 配置消息确认机制
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         *
         * @param correlationData 消息相关的数据,一般用于获取唯一标识ID
         * @param b 是否发送成功
         * @param error 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String error) {
            if (b) {
                System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId());
            } else {
                System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error);
            }
        }
    };

    private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return exchange: "+exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    /**
     * 发送消息的参数有交换机、空路由键、消息,并设置一个唯一消息ID
     */
    public void sendConfirm(String routingKey) {
        rabbitTemplate.convertAndSend("confirm_direct_exchange",
                routingKey,
                "这是一个带confirm的消息",
                new CorrelationData("" + System.currentTimeMillis()));
        // 使用上方配置的发送回调方法
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
    }
}
在上面的示例中,与之前的生产者示例代码基本相同,只是新增了 ReturnCallback 回调方法,并在发送消息时通过 setReturnCallback() 方法绑定 ReturnCallback。

3) 验证测试

创建单元测试类,发送带 Confirm 确认的消息。示例代码如下:
@Test
public void testConfirm2() throws InterruptedException {
    producer.sendConfirm("rabbit_return1111");
    Thread.sleep(1000);
}
首先,单击 Run Test 或在方法上右击,选择 Run 'testConfirm2',查看后台输出情况,运行结果如下图所示。


图 5 Return机制的运行结果

通过上面的输出结果可以看到,消息发送后,由于未找到 routingKey,导致消息不可达,Broker 服务器自动退回该消息。

消费端ACK和NACK机制

消费者在处理消息时,由于业务异常,我们可以进行日志的记录,然后进行补偿。但是,如果由于服务器宕机等严重问题无法记录日志,如何确保消息被正确处理呢?

这就需要消费端 ACK 和 NACK 机制,手工进行 ACK 确认,保障消费者成功处理消息,把未成功处理的消息再次发送,直到消息处理成功。

RabbitMQ 消费端的确认机制分为 3 种,分别是 none、manual、auto(默认):
下面通过示例来演示消费端 ACK 和 NACK 机制:

1) 修改配置文件

修改 application.properties 配置文件,改为 ACK 手动确认模式,示例如下:
spring.rabbitmq.template.mandatory=true

# ACK手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在上面的示例中,设置 ACK 模式为 manual,即手动确认模式。

2) 修改消费者

修改消费者 Consumer 的消息处理机制,配置手动 ACK 机制,示例代码如下:
@Component
public class Consumer {
    @RabbitListener(queues = "rabbit_confirm_queue")
    public void process(Message message, Channel channel) throws IOException, InterruptedException {
        try {
            System.out.println("正常收到消息:" + new String(message.getBody()));
            int i=1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("消息已重复处理失败,拒绝再次接收");
                // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列,则进入死信队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息即将再次返回队列处理");
                // requeue为true时重新入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}
上面的示例与之前的消费者示例代码基本相同。只是在 Consumer 中增加了在消息处理成功时,会调用 channel.basicAck() 发送消息 ACK 确认,以及在消息处理失败时,会调用 channel.basicNack() 通知 Exchange 消息处理失败,重新发送消息。当再次处理失败时,则调用 channel.basicReject() 拒绝这条消息。

3) 验证测试

@Test
public void testACK() throws InterruptedException {
    producer.sendConfirm("rabbit_confirm_queue");
    Thread.sleep(1000);
}
单击 Run Test 或在方法上右击,选择 Run 'testACK',查看后台输出情况,运行结果如下图所示:


图 6 消费端消息确认机制的运行结果

通过上面的输出结果可以看到,消息处理失败,重回队列。再次发送失败时,则拒绝该消息。

相关文章