首页 > 编程笔记 > Java笔记 阅读:2

SpringBoot实现RabbitMQ广播模式(附带实例)

Fanout 就是熟悉的广播模式或者订阅模式,每个发送到 Fanout 类型交换机的消息都会分到所有绑定的队列上。

Fanout 交换机不处理路由键,只是简单地将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

如下图所示,Fanout 模式很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 类型转发消息是最快的。


图 1 Fanout 消息转发模式

配置Fanout规则

先创建 Fanout 规则配置类 FanoutRabbitConfig,再创建对应的 Exchange、Queue,并将队列绑定到交换机上。示例代码如下:
@Configuration
public class FanoutRabbitConfig {
    // 定义队列
    @Bean
    public Queue Q1Message() {
        return new Queue("fanout.Q1");
    }
    @Bean
    public Queue Q2Message() {
        return new Queue("fanout.Q2");
    }
    @Bean
    public Queue Q3Message() {
        return new Queue("fanout.Q3");
    }
    // 定义交换机
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // 分别进行绑定
    @Bean
    Binding bindingExchangeQ1(Queue Q1Message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(Q1Message).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeQ2(Queue Q2Message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(Q2Message).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeQ3(Queue Q3Message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(Q3Message).to(fanoutExchange);
    }
}
在上面的示例中,首先定义了交换机 fanoutExchange,然后分别定义了 Q1、Q2、Q3 三个队列,最后将三个队列绑定到 Fanout 交换机上。

发送者

创建发送者,示例代码如下:
@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void produce() {
        String context = "fanout msg weiz";
        System.out.println("Fanout Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
}
在上面的示例中,通过 convertAndSend() 方法发送消息。使用 Fanout 广播模式无须指定 routingKey,默认往交换机上的所有队列广播此消息。

接收者

创建 3个接收者分别监听 Q1、Q2、Q3 队列。示例代码如下:
@Component
public class Consumer {

    @RabbitHandler
    @RabbitListener(queues = "fanout.Q1")
    public void processA(String message) {
        System.out.println("fanout Receiver Q1: " + message);
    }

    @RabbitHandler
    @RabbitListener(queues = "fanout.Q2")
    public void processB(String message) {
        System.out.println("fanout Receiver Q2: " + message);
    }

    @RabbitHandler
    @RabbitListener(queues = "fanout.Q3")
    public void processC(String message) {
        System.out.println("fanout Receiver Q3: " + message);
    }
}
在上面的示例中,定义消息接收者,通过 @RabbitListener 注解接收 fanout.Q1 等 3 个队列的消息。

运行测试

创建单元测试类 ApplicationTests,编写测试用例发送消息进行测试,示例代码如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
    @Autowired
    Producer producer;

    @Test
    public void testFanout() throws InterruptedException {
        producer.produce();
        Thread.sleep(1000);
    }
}
单击 Run Test 或在方法上右击,选择 Run 'testFanout',运行单元测试程序,查看后台输出情况,运行结果如下图所示:


图 2 广播模式单元测试的运行结果

结果表明绑定到 Fanout 交换机上的队列 Q1、Q2、Q3 都收到了消息。

相关文章