SpringBoot整合rabbitMq

Java
317
0
0
2023-03-27

SpringBoot整合rabbitMq

一、介绍

消息队列(Message Queue)简称mq,本文将介绍SpringBoot整合rabbitmq的功能使用

队列是一种数据结构,就像排队一样,遵循先进先出的原则。

而消息队列是一种消息中间件,在项目中我们可以将消息打包放入队列,再由消费者监听进行处理数据,再进行业务的处理

那么使用队列可以带来哪些好处呢

  1. 解耦
  2. 异步
  3. 流量削峰/限流

原本的程序再装上了消息中间件后,有哪些事需要特别注意的呢

  1. 高可用:简单点就是说,要保证消息中间件不要那么容易崩溃。作为桥梁的消息中间件崩溃了,那整个系统就相当于崩溃的情况的
  2. 数据丢失
  3. 重复消费
  4. 顺序性

话不多说,优缺点的例子以后补上,先来开始使用rabbitmq吧

二、rabbitmq的安装,项目依赖和相关配置

本文rabbitmq使用的版本是3.8.5,在不同版本下可能会出现与本文不同的结果,甚至是报错

创建springBoot项目,引入项目依赖,本文使用的springBoot版本为2.4.2,算是比较新的了

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
</dependencies>

配置yaml文件

server:
    port: 8011

spring:
    application:
        name: rabbitmq_learn
    rabbitmq:
        host: 主机名
        port: 端口
        username: 帐号
        password: 密码

三、队列类型

3.1、简单模式

首先创建队列常量类RabbitmqConstant,声明常量为队列名称,当然也可以写在配置文件

public class RabbitmqConstant {    
    /** ================  简单模式 begin  ================ */
    public static final String LEARN_SIMPLE_QUEUE = "LEARN.SIMPLE.QUEUE";
    /** ================  简单模式  end  ================ */
}

创建队列配置类SimpleRabbitmqConfig,声明创建队列

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleRabbitmqConfig {
    
    @Bean(name = RabbitmqConstant.LEARN_SIMPLE_QUEUE)
    public Queue queue(){
        return new Queue(RabbitmqConstant.LEARN_SIMPLE_QUEUE, false, false, true);
    }
    
}
简单说明一下,这几个参数的意思 参数名 说明 name 定义队列的名称 durable 是否持久化,重启rabbitmq队列是否还存在,默认为true exclusive 是否排他,是否仅保持一个连接,且该连接断线后,此队列会被删除,默认为false autoDelete 是否自动删除,当队列没有消息一段时间后自动删除,默认为false arguments 参数,可以设置队列的最大消息数等

创建此队列的生产者类SimpleMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SimpleMsgProducer {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    public void send(SendBaseMsgReq req){
        amqpTemplate.convertAndSend(RabbitmqConstant.LEARN_SIMPLE_QUEUE, JSON.toJSONString(req));
    }
    
}

创建此队列的消费者类SimpleMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleMsgConsumer {
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_SIMPLE_QUEUE)
    public void simpleMsgConsumerListener(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("简单模式消费:{}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private SimpleMsgProducer simpleMsgProducer;
    
    @GetMapping("/sendSimpleMsg")
    public String sendSimpleMsg(@RequestParam String content){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        simpleMsgProducer.send(req);
        return "发送成功";
    }
    
}

进行测试,请求3次

3.2、work模式

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  工作模式 begin  ================ */
    public static final String LEARN_WORK_QUEUE = "LEARN.WORK.QUEUE";
    /** ================  工作模式  end  ================ */
}

创建队列配置类WorkRabbitmqConfig,声明创建队列

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkRabbitmqConfig {
    
    @Bean(name = RabbitmqConstant.LEARN_WORK_QUEUE)
    public Queue queue(){
        return new Queue(RabbitmqConstant.LEARN_WORK_QUEUE, false, false, false);
    }
    
}

创建此队列的生产者类WorkMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WorkMsgProducer {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    public void send(SendBaseMsgReq req){
        amqpTemplate.convertAndSend(RabbitmqConstant.LEARN_WORK_QUEUE, JSON.toJSONString(req));
    }
    
}

创建此队列的消费者类WorkMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WorkMsgConsumer {
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_WORK_QUEUE)
    public void workMsgConsumerListenerA(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("工作模式消费A:{}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_WORK_QUEUE)
    public void workMsgConsumerListenerB(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("工作模式消费B:{}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_WORK_QUEUE)
    public void workMsgConsumerListenerC(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("工作模式消费C:{}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private WorkMsgProducer workMsgProducer;
    
    @GetMapping("/sendWorkMsg")
    public String sendWorkMsg(@RequestParam String content){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        workMsgProducer.send(req);
        return "发送成功";
    }
    
}

进行测试,请求6次

可以看到,如果正常消费的情况下,多个消费者会轮循进行消费

3.3、Direct直连模式

在使用Direct之前,我们先得了解rabbitmq的Exchange交换机,这个交换机扮演了什么样的角色,在消息队列中处到了什么样的作用 生产者生产的消息会先到交换机Exchange,再通过router_key和路由策略分发到相对应的队列,再通过消费者监听队列消费消息 以下是官方的图

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  直连模式 begin  ================ */
    public static final String LEARN_DIRECT_QUEUE = "LEARN.DIRECT.QUEUE";
    public static final String LEARN_DIRECT_EXCHANGE = "LEARN.DIRECT.EXCHANGE";
    public static final String LEARN_DIRECT_ROUTER_KEY = "LEARN.DIRECT.ROUTER_KEY";
    /** ================  直连模式  end  ================ */
}

创建队列配置类DirectRabbitmqConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 直连模式
 */
@Configuration
public class DirectRabbitmqConfig {

    @Bean(name = RabbitmqConstant.LEARN_DIRECT_QUEUE)
    public Queue directQueue(){
        return new Queue(RabbitmqConstant.LEARN_DIRECT_QUEUE, true);
    }

    @Bean(name = RabbitmqConstant.LEARN_DIRECT_EXCHANGE)
    public DirectExchange directExchange(){
        return new DirectExchange(RabbitmqConstant.LEARN_DIRECT_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding bindingDirect(@Qualifier(RabbitmqConstant.LEARN_DIRECT_QUEUE) Queue directQueue,
                                 @Qualifier(RabbitmqConstant.LEARN_DIRECT_EXCHANGE) DirectExchange directExchange){
        return BindingBuilder.bind(directQueue).to(directExchange).with(RabbitmqConstant.LEARN_DIRECT_ROUTER_KEY);
    }
    
}

创建此队列的生产者类DirectMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectMsgProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(SendBaseMsgReq req){
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_DIRECT_EXCHANGE,
                RabbitmqConstant.LEARN_DIRECT_ROUTER_KEY,
                JSON.toJSONString(req));
    }

}

创建此队列的消费者类DirectMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DirectMsgConsumer {
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_DIRECT_QUEUE)
    public void directMsgConsumerListener(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("直连模式消费:{}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_DIRECT_QUEUE)
    public void newDirectMsgConsumerListener(String message) {
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("新的消费:直连模式消费:{}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private DirectMsgProducer directMsgProducer;
    
    @GetMapping("/sendDirectMsg")
    public String sendDirectMsg(@RequestParam String content){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        directMsgProducer.send(req);
        return "发送成功";
    }
    
}

进行测试,请求4次

在看到一个消费者监听了队列后,我在想,能不能可以使用多个消费者监听同个队列,会造成什么样的结果 修改下消费者类DirectMsgConsumer,使此队列拥有两个消费者 package com.banmoon.learn.rabbitmq.consumer; import com.alibaba.fastjson.JSON; import com.banmoon.learn.constants.RabbitmqConstant; import com.banmoon.learn.req.SendBaseMsgReq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class DirectMsgConsumer { @RabbitHandler @RabbitListener(queues = RabbitmqConstant.LEARN_DIRECT_QUEUE) public void directMsgConsumerListener(String message) { SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class); log.info("直连模式消费:{}", req.getContent()); } @RabbitHandler @RabbitListener(queues = RabbitmqConstant.LEARN_DIRECT_QUEUE) public void newDirectMsgConsumerListener(String message) { SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class); log.info("新的消费:直连模式消费:{}", req.getContent()); } } 再测试,请求4次,查看日志

照样是消费者轮循处理消息的,且不会重复消费

在经过以上的测试,我对于前面写的简单模式和work模式有了一个猜想,会不会没有指定交换机的这两个模式,本质上就是直连模式,而交换机使用的默认的交换机名字。 带着这个猜想,我打开了rabbitmq的web管理后台,查看交换机,在简单模式和work模式上再发送几条请求 果然,这个AMQP default交换机有消息进入,证明了我的猜想,为了更加一步的求实,我翻阅了官方文档

– 简单翻译一下默认交换机 默认交换机是一个由"broker"预先声明好的没有名字(空字符串)的直连交换机。他有一个特殊的属性,这会使得它对于简单的应用程序变得十分有用。创建的每个队列都会使用队列名作为router_key自动绑定到它。 例如,当你声明名称为"search-indexing-online"的队列时,"broker"将使用"search-indexing-online"作为router_key将它绑定到default exchange。因此,一条被发布到default exchange并且routing_key为"search-indexing-online"将被路由到名称为"search-indexing-online"的queue。换句话说,default exchange使直接传送消息到queue成为可能,即使从技术角度上而言,事实并不是这样。 所以,平常程序若是简单,我们可以直接使用默认交换机,这样可以省略少些点代码不是吗

3.4、Fanout扇形模式

此类模式,我更习惯称呼为广播模式。因为,交换机上可以绑定多个队列,router_key不生效,默认向绑定的所有队列进行发送消息,就像广播一样。

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  扇形模式 begin  ================ */
    public static final String LEARN_FANOUT_QUEUE_A = "LEARN.FANOUT.QUEUE.A";
    public static final String LEARN_FANOUT_QUEUE_B = "LEARN.FANOUT.QUEUE.B";
    public static final String LEARN_FANOUT_QUEUE_C = "LEARN.FANOUT.QUEUE.C";
    public static final String LEARN_FANOUT_EXCHANGE = "LEARN.FANOUT.EXCHANGE";
    public static final String LEARN_FANOUT_ROUTER_KEY = "LEARN.FANOUT.ROUTER_KEY";// 扇形模式,路由键无用
    /** ================  扇形模式  end  ================ */
}

创建队列配置类FanoutRabbitConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 扇形模式
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean(name = RabbitmqConstant.LEARN_FANOUT_QUEUE_A)
    public Queue queueA(){
        return new Queue(RabbitmqConstant.LEARN_FANOUT_QUEUE_A, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_FANOUT_QUEUE_B)
    public Queue queueB(){
        return new Queue(RabbitmqConstant.LEARN_FANOUT_QUEUE_B, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_FANOUT_QUEUE_C)
    public Queue queueC(){
        return new Queue(RabbitmqConstant.LEARN_FANOUT_QUEUE_C, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_FANOUT_EXCHANGE)
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitmqConstant.LEARN_FANOUT_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding bindingFanoutQueueA(){
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }
    
    @Bean
    public Binding bindingFanoutQueueB(){
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }
    
    @Bean
    public Binding bindingFanoutQueueC(){
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
    
}

创建此队列的生产者类FanoutMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutMsgProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(SendBaseMsgReq req){
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_FANOUT_EXCHANGE, null, JSON.toJSONString(req));
    }
    
}

创建此队列的消费者类FanoutMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class FanoutMsgConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_FANOUT_QUEUE_A)
    public void fanoutMsgConsumerListenerA(Message message, Channel channel){
        SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
        log.info("扇形模式消费A: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_FANOUT_QUEUE_B)
    public void fanoutMsgConsumerListenerB(Message message, Channel channel){
        SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
        log.info("扇形模式消费B: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_FANOUT_QUEUE_C)
    public void fanoutMsgConsumerListenerC(Message message, Channel channel){
        SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
        log.info("扇形模式消费C: {}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private FanoutMsgProducer fanoutMsgProducer;
    
    @GetMapping("/sendFanoutMsg")
    public String sendFanoutMsg(@RequestParam String content){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        fanoutMsgProducer.send(req);
        return "发送成功";
    }
    
}

进行测试,请求2次

3.5、Topic主题模式

在主题模式中,路由键的匹配规则将成为重点,符合匹配规则的消息会发送到指定的队列

  • *:匹配一个单词
  • #:匹配0-n个单词

交换机与队列绑定的router_key

推送消息的router_key

是否推送到此队列

topic.#

topic.A

topic.#

topic.A.B

topic.#

A.topic

*.topic

A.B.topic

*.topic

A.topic

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  主题模式 begin  ================ */
    public static final String LEARN_TOPIC_QUEUE_LOG = "LEARN.TOPIC.QUEUE.LOG";
    public static final String LEARN_TOPIC_QUEUE_TEXT = "LEARN.TOPIC.QUEUE.TEXT";
    public static final String LEARN_TOPIC_QUEUE_IMAGE = "LEARN.TOPIC.QUEUE.IMAGE";
    public static final String LEARN_TOPIC_QUEUE_VOICE = "LEARN.TOPIC.QUEUE.VOICE";
    public static final String LEARN_TOPIC_EXCHANGE = "LEARN.TOPIC.EXCHANGE";
    public static final String LEARN_TOPIC_ROUTER_KEY_LOG = "LEARN.TOPIC.ROUTER_KEY.#";
    public static final String LEARN_TOPIC_ROUTER_KEY_TEXT = "LEARN.TOPIC.ROUTER_KEY.TEXT";
    public static final String LEARN_TOPIC_ROUTER_KEY_IMAGE = "LEARN.TOPIC.ROUTER_KEY.IMAGE";
    public static final String LEARN_TOPIC_ROUTER_KEY_VOICE = "LEARN.TOPIC.ROUTER_KEY.VOICE";
    /** ================  主题模式  end  ================ */
}

创建队列配置类TopicRabbitmqConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 主题模式
 */
@Configuration
public class TopicRabbitmqConfig {
    
    @Bean(name = RabbitmqConstant.LEARN_TOPIC_QUEUE_LOG)
    public Queue queueLog(){
        return new Queue(RabbitmqConstant.LEARN_TOPIC_QUEUE_LOG, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TOPIC_QUEUE_TEXT)
    public Queue queueText(){
        return new Queue(RabbitmqConstant.LEARN_TOPIC_QUEUE_TEXT, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TOPIC_QUEUE_IMAGE)
    public Queue queueImage(){
        return new Queue(RabbitmqConstant.LEARN_TOPIC_QUEUE_IMAGE, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TOPIC_QUEUE_VOICE)
    public Queue queueVoice(){
        return new Queue(RabbitmqConstant.LEARN_TOPIC_QUEUE_VOICE, true);
    }

    @Bean(name = RabbitmqConstant.LEARN_TOPIC_EXCHANGE)
    public TopicExchange topicExchange(){
        return new TopicExchange(RabbitmqConstant.LEARN_TOPIC_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding bindingQueueLog(@Qualifier(RabbitmqConstant.LEARN_TOPIC_EXCHANGE) TopicExchange topicExchange){
        return BindingBuilder.bind(queueLog()).to(topicExchange).with(RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_LOG);
    }
    
    @Bean
    public Binding bindingQueueText(@Qualifier(RabbitmqConstant.LEARN_TOPIC_EXCHANGE) TopicExchange topicExchange){
        return BindingBuilder.bind(queueText()).to(topicExchange).with(RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_TEXT);
    }
    
    @Bean
    public Binding bindingQueueImage(@Qualifier(RabbitmqConstant.LEARN_TOPIC_EXCHANGE) TopicExchange topicExchange){
        return BindingBuilder.bind(queueImage()).to(topicExchange).with(RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_IMAGE);
    }
    
    @Bean
    public Binding bindingQueueVoice(@Qualifier(RabbitmqConstant.LEARN_TOPIC_EXCHANGE) TopicExchange topicExchange){
        return BindingBuilder.bind(queueVoice()).to(topicExchange).with(RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_VOICE);
    }
    
}

创建此队列的生产者类TopicMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicMsgProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendText(SendBaseMsgReq req){
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TOPIC_EXCHANGE,
                RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_TEXT,
                JSON.toJSONString(req));
    }
    
    public void sendImage(SendBaseMsgReq req){
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TOPIC_EXCHANGE,
                RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_IMAGE,
                JSON.toJSONString(req));
    }
    
    public void sendVoice(SendBaseMsgReq req){
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TOPIC_EXCHANGE,
                RabbitmqConstant.LEARN_TOPIC_ROUTER_KEY_VOICE,
                JSON.toJSONString(req));
    }
    
}

创建此队列的消费者类TopicConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class TopicConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TOPIC_QUEUE_LOG)
    public void topicMsgConsumerListenerLog(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("主题模式消费,日志队列: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TOPIC_QUEUE_TEXT)
    public void topicMsgConsumerListenerText(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("主题模式消费,文本队列: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TOPIC_QUEUE_IMAGE)
    public void topicMsgConsumerListenerImage(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("主题模式消费,图片队列: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TOPIC_QUEUE_VOICE)
    public void topicMsgConsumerListenerVoice(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("主题模式消费,语音队列: {}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private TopicMsgProducer topicMsgProducer;
    
    @GetMapping("/sendTopicMsg")
    public String sendTopicMsg(@RequestParam String content, @RequestParam String type){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        switch (type){
            case "TEXT":
                topicMsgProducer.sendText(req);
                break;
            case "IMAGE":
                topicMsgProducer.sendImage(req);
                break;
            case "VOICE":
                topicMsgProducer.sendVoice(req);
                break;
            default:
                return "不明确的消息类型,请确认!";
        }
        return "发送成功";
    }
    
}

进行测试,TEXT、IMAGE、VOICE各请求两次。可见,日志队列消费了其他所有的消息

3.6、Headers头部交换机

header交换机为在多个属性进行路由而设计的,这些属性更容易描述为消息头,而不是routing key。headers交换机忽略routing key属性,相反用于路由的属性是从headers属性中获取的。如果消息头的值等于指定的绑定值,则认为消息是匹配的。

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  头部模式 begin  ================ */
    public static final String LEARN_HEADERS_QUEUE_ANY = "LEARN.HEADERS.QUEUE.ANY";
    public static final String LEARN_HEADERS_QUEUE_ALL = "LEARN.HEADERS.QUEUE.ALL";
    public static final String LEARN_HEADERS_EXCHANGE = "LEARN.HEADERS.EXCHANGE";
    public static final String LEARN_HEADERS_ROUTER_KEY = "LEARN.HEADERS.ROUTER_KEY";// 路由键无用,头部模式通过消息头来进行路由
    /** ================  头部模式  end  ================ */
}

创建队列配置类HeadersRabbitmqConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 头部模式
 */
@Configuration
public class HeadersRabbitmqConfig {

    @Bean(name = RabbitmqConstant.LEARN_HEADERS_QUEUE_ANY)
    public Queue queueAny(){
        return new Queue(RabbitmqConstant.LEARN_HEADERS_QUEUE_ANY, false, true, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_HEADERS_QUEUE_ALL)
    public Queue queueAll(){
        return new Queue(RabbitmqConstant.LEARN_HEADERS_QUEUE_ALL, false, true, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_HEADERS_EXCHANGE)
    public HeadersExchange headersExchange(){
        return new HeadersExchange(RabbitmqConstant.LEARN_HEADERS_EXCHANGE, false, true);
    }
    
    @Bean
    public Binding bindingHeadersQueueAny(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("name", "banmoon");
        return BindingBuilder.bind(queueAny()).to(headersExchange()).whereAny(map).match();
    }
    
    @Bean
    public Binding bindingHeadersQueueAll(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("name", "banmoon");
        map.put("sex", "男");
        return BindingBuilder.bind(queueAll()).to(headersExchange()).whereAll(map).match();
    }
    
}

创建此队列的生产者类HeadersMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class HeadersMsgProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(SendBaseMsgReq req, Map<String, Object> headerMap){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().putAll(headerMap);
        Message message = new Message(JSON.toJSONBytes(req), messageProperties);
        rabbitTemplate.send(RabbitmqConstant.LEARN_HEADERS_EXCHANGE, null, message);
    }

}

创建此队列的消费者类HeadersMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class HeadersMsgConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_HEADERS_QUEUE_ANY)
    public void headersMsgConsumerListenerAny(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("headers模式消费,任一匹配队列: {}", req.getContent());
    }
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_HEADERS_QUEUE_ALL)
    public void headersMsgConsumerListenerAll(String message, Channel channel, @Headers Map<String, Object> headerMap){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("headers模式消费,全匹配队列: {}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private DirectMsgProducer directMsgProducer;
    
    @PostMapping("/sendHeadersMsg")
    public String sendHeadersMsg(@RequestParam String content, @RequestBody Map<String, Object> headersMap){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        headersMsgProducer.send(req, headersMap);
        return "发送成功";
    }
    
}

测试三组,各请求1次

  • 第一组:仅进入任一匹配的队列
  • 第二组:两个队列皆进入
  • 第三组:没有进队列

3.7、延迟队列TTL

在许多业务场景中,消息需要延迟进行处理,如以下

  • 订单在创建完成后,一小时未支付时通知用户及时支付
  • 订单在创建完成后,24小时未支付后,自动关闭订单
  • 到点秒杀,前15分钟通知用户进行准备

在以上业务中,可以使用定时任务去查询数据库来进行实现,但这样的延迟不精确,且代码极度不优雅。我见过几个工程,里面定时任务一大堆,每天打印的日志起码都在150MB左右。而且,这样的程序会给数据库和服务器带来很大的压力,不是明智的选择。

而现在,rabbitmq可以实现延迟队列,可以解决以上的业务场景

延迟队列TTL(Time To Live)是rabbitmq中的一个高级特性,是消息或者队列的一个属性,此属性的作用是本消息或本队列中的消息最大存活的时间。如果时间一到,这条消息没有被消费,此消息将会进入死信交换机,再通过对应的路由策略进入队列,进行消费

死信交换机和普通的交换机没有任何区别,可以是任何一种类型,也可以用普通常用的方式进行声明,与队列进行绑定

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  延迟队列 begin  ================ */
    public static final String LEARN_TTL_QUEUE_GLOBAL = "LEARN.TTL.QUEUE.GLOBAL";// 将设置队列中的消息最大存活时间
    public static final String LEARN_TTL_QUEUE_LOCAL = "LEARN.TTL.QUEUE.LOCAL";// 将单独设置消息的过期时间
    public static final String LEARN_TTL_EXCHANGE = "LEARN.TTL.EXCHANGE";
    public static final String LEARN_TTL_ROUTER_KEY_GLOBAL = "LEARN.DIRECT.ROUTER_KEY.GLOBAL";
    public static final String LEARN_TTL_ROUTER_KEY_LOCAL = "LEARN.DIRECT.ROUTER_KEY.LOCAL";
    public static final String LEARN_TTL_DEATH_QUEUE = "LEARN.TTL.DEATH.QUEUE.GLOBAL";// 死信交换机绑定的队列
    public static final String LEARN_TTL_DEATH_EXCHANGE = "LEARN.TTL.DEATH.EXCHANGE";// 死信交换机
    public static final String LEARN_TTL_DEATH_ROUTER_KEY = "LEARN.TTL.DEATH.ROUTER_KEY";
    /** ================  延迟队列  end  ================ */
}

创建队列配置类TtlRabbitmqConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL 延迟队列
 */
@Configuration
public class TtlRabbitmqConfig {
    
    @Bean(name = RabbitmqConstant.LEARN_TTL_QUEUE_GLOBAL)
    public Queue queueGlobal(){
        Map<String, Object> arguments = new HashMap<>();
        // 有效时间过后转发的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitmqConstant.LEARN_TTL_DEATH_EXCHANGE);
        // 死信交换机绑定队列的routingKey
        arguments.put("x-dead-letter-routing-key", RabbitmqConstant.LEARN_TTL_DEATH_ROUTER_KEY);
        // 设置最大存活时间,此处全局设置
        arguments.put("x-message-ttl", 7000);
        Queue queue = QueueBuilder.durable(RabbitmqConstant.LEARN_TTL_QUEUE_GLOBAL)
                .withArguments(arguments)
                .build();
        return queue;
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TTL_QUEUE_LOCAL)
    public Queue queueLocal(){
        Map<String, Object> arguments = new HashMap<>();
        // 有效时间过后转发的死信交换机
        arguments.put("x-dead-letter-exchange", RabbitmqConstant.LEARN_TTL_DEATH_EXCHANGE);
        // 死信交换机绑定队列的routingKey
        arguments.put("x-dead-letter-routing-key", RabbitmqConstant.LEARN_TTL_DEATH_ROUTER_KEY);
        Queue queue = QueueBuilder.durable(RabbitmqConstant.LEARN_TTL_QUEUE_LOCAL)
                .withArguments(arguments)
                .build();
        return queue;
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TTL_EXCHANGE)
    public DirectExchange directExchange(){
        return new DirectExchange(RabbitmqConstant.LEARN_TTL_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding bindingQueueGlobal(){
        return BindingBuilder.bind(queueGlobal()).to(directExchange()).with(RabbitmqConstant.LEARN_TTL_ROUTER_KEY_GLOBAL);
    }
    
    @Bean
    public Binding bindingQueueLocal(){
        return BindingBuilder.bind(queueLocal()).to(directExchange()).with(RabbitmqConstant.LEARN_TTL_ROUTER_KEY_LOCAL);
    }
    
    /** ================  死信队列和交换机 begin  ================ */
    @Bean(name = RabbitmqConstant.LEARN_TTL_DEATH_QUEUE)
    public Queue queueDeath(){
        return new Queue(RabbitmqConstant.LEARN_TTL_DEATH_QUEUE, true);
    }
    
    @Bean(name = RabbitmqConstant.LEARN_TTL_DEATH_EXCHANGE)
    public DirectExchange deathExchange(){
        return new DirectExchange(RabbitmqConstant.LEARN_TTL_DEATH_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding bindingQueueDeath(){
        return BindingBuilder.bind(queueDeath()).to(deathExchange()).with(RabbitmqConstant.LEARN_TTL_DEATH_ROUTER_KEY);
    }
    /** ================  死信队列和交换机 end  ================ */
    
}

创建此队列的生产者类TtlMsgProducer

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TtlMsgProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendGlobal(SendBaseMsgReq req){
        log.info("发送消息, 设置队列处的最大存活时间,7s:{}", JSON.toJSONString(req));
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TTL_EXCHANGE, RabbitmqConstant.LEARN_TTL_ROUTER_KEY_GLOBAL, JSON.toJSONString(req));
    }

    public void sendLocal(SendBaseMsgReq req, String expiration){
        log.info("发送消息, 设置消息处的最大存活时间:{}", JSON.toJSONString(req));
        Message message = MessageBuilder
                .withBody(JSON.toJSONBytes(req))
                .setExpiration(expiration)
                .build();
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TTL_EXCHANGE, RabbitmqConstant.LEARN_TTL_ROUTER_KEY_LOCAL, message);
    }

}

创建此队列的消费者类TtlMsgConsumer,监听队列进行消费,注意这里消费的队列是死信队列,普通队列根本没有消费者进行消费,队列里的消息存活超时后将会进入死信队列

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TtlMsgConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TTL_DEATH_QUEUE)
    public void ttlMsgConsumerListener(String message){
        SendBaseMsgReq req = JSON.parseObject(message, SendBaseMsgReq.class);
        log.info("TTL延迟队列,消费消息: {}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private TtlMsgProducer ttlMsgProducer;
    
    @GetMapping("/sendTtlMsg")
    public String sendTtlMsg(@RequestParam String content, @RequestParam String type,
                             @RequestParam(required = false) String expiration){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        switch (type){
            case "GLOBAL":
                ttlMsgProducer.sendGlobal(req);
                break;
            case "LOCAL":
                ttlMsgProducer.sendLocal(req, expiration);
                break;
            default:
                return "不明确的消息类型,请确认!";
        }
        return "发送成功";
    }
    
}

进行测试,队列设置和消息设置的各请求一次

查看日志消费,发送消息打印的日志和消费的日志时间间隔正好是设置的存活时间

以上的队列可以解决固定的延迟业务场景,可当出现延迟时间不一致的时候,就会出现以下的情况 第一次请求20秒延迟的消息,马上请求第二次3秒的

可以看到,先进队列的20s的延迟消息会卡着3秒的延迟消息,所以TTL的延迟队列只适合相同延迟时间的业务场景那如果是不同时间的消息呢,自然不可能设置多条队列,这时就得用到了TXL延迟队列

3.8、延迟队列TXL

TXL延迟队列,主要用到了rabbitmq的一个插件,rabbitmq_delayed_message_exchange

插件的下载地址:https://www.rabbitmq.com/community-plugins.html

下载后将插件放入rabbitmq/plugins目录下 # 查看插件列表 rabbitmq-plugins list # 启用延迟插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 重启rabbitmq 如果顺利的话,插件就会启用成功;如果报错,那就需要检查下erlang运行环境和插件的版本号了。

队列常量类RabbitmqConstant,声明常量为队列名称

public class RabbitmqConstant {    
    /** ================  延迟队列 begin  ================ */
    public static final String LEARN_TXL_QUEUE = "LEARN.TXL.QUEUE";
    public static final String LEARN_TXL_EXCHANGE = "LEARN.TXL.EXCHANGE";
    public static final String LEARN_TXL_ROUTER_KEY = "LEARN.TXL.ROUTER_KEY";
    /** ================  延迟队列  end  ================ */
}

创建队列配置类TxlRabbitmqConfig,声明创建队列,声明创建交换机,将交换机和队列进行绑定

package com.banmoon.learn.config;

import com.banmoon.learn.constants.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TxlRabbitmqConfig {

    @Bean(name = RabbitmqConstant.LEARN_TXL_QUEUE)
    public Queue queue(){
        return new Queue(RabbitmqConstant.LEARN_TXL_QUEUE, true);
    }

    @Bean(name = RabbitmqConstant.LEARN_TXL_EXCHANGE)
    public CustomExchange customExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");// 设置成直连模式
        return new CustomExchange(RabbitmqConstant.LEARN_TXL_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingTxlQueue() {
        return BindingBuilder.bind(queue()).to(customExchange()).with(RabbitmqConstant.LEARN_TXL_ROUTER_KEY).noargs();
    }

}

创建此队列的生产者类TxlMsgProducer,提供send方法进行生产消息

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TxlMsgProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(SendBaseMsgReq req, Integer delayTime){
        log.info("发送消息, 延迟时间:{},消息内容:{}", delayTime, JSON.toJSONString(req));
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_TXL_EXCHANGE, RabbitmqConstant.LEARN_TXL_ROUTER_KEY,
            JSON.toJSONString(req), a -> {
                a.getMessageProperties().setDelay(delayTime);
                return a;
        });
    }

}

创建此队列的消费者类TxlMsgConsumer,监听队列进行消费

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TxlMsgConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_TXL_QUEUE)
    public void txlMsgConsumerListener(Message message, Channel channel){
        SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
        log.info("TXL延迟队列,消费消息: {}", req.getContent());
    }

}

创建一个控制类TestController,用来测试生产消息

package com.banmoon.learn.controller;

import com.banmoon.learn.rabbitmq.producer.*;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private TxlMsgProducer txlMsgProducer;
    
    @GetMapping("/sendTxlMsg")
    public String sendTxlMsg(@RequestParam String content, @RequestParam Integer delayTime){
        SendBaseMsgReq req = new SendBaseMsgReq();
        req.setContent(content);
        txlMsgProducer.send(req, delayTime);
        return "发送成功";
    }
    
}

进行测试,第一次请求20秒,第二次请求3秒

查看日志消费的消息,已达到预期的效果

四、Exchange交换机类型

在上文中,虽然列出了这么多项类型,但实际上交换机类型仅有4种,分别是

交换机类型

默认预先设定的交换机名字

说明

Direct exchange

(Empty string) and amq.direct

直连交换机,如果省略指定交换机,rabbitmq会使用默认的,这可以使开发省下一些时间

Fanout exchange

amq.fanout

扇形交换机,在此交换机中,router_key是失效的,消息会转到绑定在该交换机下的所有队列

Topic exchange

amq.topic

主题交换机,此交换机可以完成直连交换机和扇形交换机的功能,十分强大当router_key为#时,它便成为了扇形交换机当router_key没有出现*和#时,它便成为了直连交换机

Headers exchange

amq.match (and amq.headers in RabbitMQ)

头部交换机,与主题交换机类似,主要区别是此交换机的router_key失效,是通过消息头来进行路由的。此交换机也十分强大,但由于配置比较繁琐,一般项目中都使用主题交换机

五、消息确认机制

在使用上,以上的队列使用,已完全满足日常的需求。

在扩展上,消息加入队列,或被消费的时候,使用消息的回调,可以使业务更加丰富。这就是消息的确认机制,分为生产消息确认和消费消息确认。

5.1、生产消息确认机制

有时候,消息发送后没有被消费可能是没有发送成功,这时候就出现了生产消息确认。成功失败的消息都会进行回调,我们就可以对其做出进一步的处理。

首先,修改配置文件

server:
    port: 8011

spring:
    application:
        name: rabbitmq_learn
    rabbitmq:
        host: 主机名
        port: 端口
        username: 帐号
        password: 密码
        # 确认消息是否发送至交换机
        publisher-confirm-type: correlated
        # 确认消息是否发送至队列
        publisher-returns: true
本文使用的springBoot版本是2.4.2,旧版本的配置应该将(具体多旧这得翻文档了o(╯□╰)o) # 将 publisher-confirm-type: correlated 改为 publisher-confirms: true

创建RabbitmqConfig,作为生产消息确认的配置

package com.banmoon.learn.config;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Slf4j
@Configuration
public class RabbitmqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) throws IOException {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMandatory(true);// 无论成功失败,都会确认信息
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        return rabbitTemplate;
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("【confirm】消息发送成功");
        } else {
            String id = correlationData==null? null: correlationData.getId();
            log.error("【confirm】消息发送失败,相关数据:{},原因:{}", id, cause);
        }
    }
    
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("【ReturnCallback】消息:" + returnedMessage.getMessage());
        log.info("【ReturnCallback】回应码:" + returnedMessage.getReplyCode());
        log.info("【ReturnCallback】回应信息:" + returnedMessage.getReplyText());
        log.info("【ReturnCallback】交换机:" + returnedMessage.getExchange());
        log.info("【ReturnCallback】路由键:" + returnedMessage.getRoutingKey());
    }
}

以上代码已编写完毕,我们再来看看出现的情况种类

以下测试使用直连模式的交换机,创建交换机、队列、绑定的代码我就不贴出来了,大家可以参考3.4进行测试使用,也可以进入我的gitee

生产者代码有些许不同,需要给定相关数据信息,生产者ConfirmMsgProducer

package com.banmoon.learn.rabbitmq.producer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConfirmMsgProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(SendBaseMsgReq req){
        CorrelationData data = new CorrelationData(JSON.toJSONString(req));
        rabbitTemplate.convertAndSend(RabbitmqConstant.LEARN_CONFIRM_EXCHANGE, RabbitmqConstant.LEARN_CONFIRM_ROUTER_KEY,
                JSON.toJSONString(req), data);
    }
    
}

本单元仅将生产消息确认,消费者暂时可以不监听指定

1)消息推送,找不到交换机

交换机,队列,绑定不用声明,仅留下一个生产者就行

走的confirm方法的回调,在自己输出的同时,上面有串源码中输出的,但进去后没发现confirm方法,没收获什么。

2)消息推送,找到了交换机,但找不到队列

交换机,队列可以声明,但不用绑定。这样就算找到了交换机,也找不到队

可以看到,confirmreturnedMessage的方法都进行了调用

3)消息推送,交换机和队列都没有找到

和第一种情况一致,交换机都找不到了,还会去找队列吗?

4)消息推送成功

仅推送了confirm方法

小结
  1. confirm方法,消息是否到达交换机,无论成功还是失败都会调用
  2. returnedMessage方法,仅当没有找到队列时,才会调用

在上面的示例中,仅打印了日志,在实际的开发中,可以根据自己的业务需求将失败的消息持久化到数据库中,以便排查问题

5.2、消费消息确认机制

消息被消费后,我们将进行确认,消息是否被成功消费,这就是消费消息的确认机制。

主要有以下两种

  1. 无消息确认:在以上的代码示例中,都是无消费消息的确认,这也是没有指定默认的消费确认模式
  2. 手动确认:这种机制是最关键的,许多业务都会使用到,消息消费不成功,不符合预期,将会打回队列重新消费
  3. 自动确认:消费出现异常就会重发,正常则就消费成功。这种模式有丢失消息的风险,且重复消费可能会阻塞队列,在平常项目中不会经常使用

1)无消息确认

在没有消息确认的情况下,只要消息已经进入了消费者,那便立即被认定为消费成功。

所以,无论消费者有没有成功消费消息,还是消费者抛出异常后,消息都会默认处理完毕,也就是丢失了。

那么,在此模式,一定要保证消息是一次性的。异常一定得捕获,打印日志或持久化消息,以便排查问题。

消费者ConfirmMsgConsumer,捕获异常

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmMsgConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_CONFIRM_QUEUE)
    public void ConfirmMsgConsumerListener(Message message, Channel channel){
        try {
            SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
            int a = 1 / 0;
            log.info("消息确认消费:{}", req.getContent());
        } catch (Exception exception) {
            log.error("消息确认消费异常:消息内容:{}", new String(message.getBody()), exception);
        }
    }

}

请求测试,发送的消息被捕获后,就再也没有这条消息的影子了

2)手动确认

修改下配置文件,添加手动确认的配置

server:
    port: 8011

spring:
    application:
        name: rabbitmq_learn
    rabbitmq:
        host: 主机名
        port: 端口
        username: 帐号
        password: 密码
        # 确认消息是否发送至交换机
        publisher-confirm-type: correlated
        # 确认消息是否发送至队列
        publisher-returns: true
        listener:
            simple:
                # 消费消息确认,none(无),auto(自动确认),manual(手动确认)
                acknowledge-mode: manual
                # 消费者最小数量
                concurrency: 1
                # 消费者最大数量
                max-concurrency: 1
                retry:
                    # 是否支持重试
                    enabled: true
                    # 最大重试次数,包括第一次消费的次数
                    max-attempts: 4
                    # 重试的最大时间间隔
                    max-interval: 10000
                    # 重试的初始时间间隔
                    initial-interval: 2000
                    # 重试的时间间隔因子
                    multiplier: 1.5
其中在此模式下,retry的配置是不起作用的,手动进行确认重回队列,具体可查看消费者的代码使用

编写消费者ConfirmMsgConsumer,手动判断重试次数,再进行确认,确认次数可以存到redis缓存

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class ConfirmMsgConsumer {
    
    private static AtomicInteger reCount = new AtomicInteger(1);
    
    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
    private Integer maxReCount;
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_CONFIRM_QUEUE)
    public void ConfirmMsgConsumerListener(Message message, Channel channel) throws IOException {
        try {
            SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
            Integer a = Integer.valueOf(req.getContent());
            log.info("消息确认消费:{}", a);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            reCount.set(1);
        } catch (Exception exception) {
            log.error("消息确认消费异常:消息内容:{}", new String(message.getBody()));
            if(reCount.get()<maxReCount){
                reCount.incrementAndGet();
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                return;
            }
            reCount.set(1);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

}
确认消息共有三种
  1. basicAck:肯定确认
  2. deliveryTag:消息的标识ID
  3. multiple:是否批量,true为小于deliveryTag的消息也会被确认处理,false为只处理这一条消息
  4. basicNack:否定确认
  5. deliveryTag:同上
  6. multiple:同上
  7. requeue:是否重发队列
  8. basicReject:否定确认,但和basicNack不同的是,basicReject一次只能拒绝单条消息
  9. deliveryTag:同上
  10. requeue:是否重发队列

请求测试,会报异常的请求,可以看到一共消费了4次

请求测试,无异常的请求

在有次项目开发中,需要手动确认的消息,我并没有确认消息的代码,导致消息卡死。 查看后台,明明有消息在准备进行消费了,可就是没有消费,开发的时候好好的,可以正常消费,上测试生产后消费几条后就走不动了,重启工程又有消息进行了消费。 所以,既然选择了手动消费确认的模式,就必须要进行确认,不然就会出现以上诡异的问题。

3)自动确认

修改下配置文件,添加手动确认的配置

server:
    port: 8011

spring:
    application:
        name: rabbitmq_learn
    rabbitmq:
        host: 主机名
        port: 端口
        username: 帐号
        password: 密码
        # 确认消息是否发送至交换机
        publisher-confirm-type: correlated
        # 确认消息是否发送至队列
        publisher-returns: true
        listener:
            simple:
                # 消费消息确认,none(无),auto(自动确认),manual(手动确认)
                acknowledge-mode: auto
                # 消费者最小数量
                concurrency: 1
                # 消费者最大数量
                max-concurrency: 1
                retry:
                    # 是否支持重试
                    enabled: true
                    # 最大重试次数,包括第一次消费的次数
                    max-attempts: 4
                    # 重试的最大时间间隔
                    max-interval: 10000
                    # 重试的初始时间间隔
                    initial-interval: 2000
                    # 重试的时间间隔因子
                    multiplier: 1.5

编写消费者ConfirmMsgConsumer

package com.banmoon.learn.rabbitmq.consumer;

import com.alibaba.fastjson.JSON;
import com.banmoon.learn.constants.RabbitmqConstant;
import com.banmoon.learn.req.SendBaseMsgReq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class ConfirmMsgConsumer {
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.LEARN_CONFIRM_QUEUE)
    public void ConfirmMsgConsumerListener(Message message, Channel channel) throws IOException {
        SendBaseMsgReq req = JSON.parseObject(message.getBody(), SendBaseMsgReq.class);
        log.info("消息确认消费:{}", req.getContent());
        Integer a = Integer.valueOf(req.getContent());
    }
    
}

请求测试,会报异常的请求

请求测试,正常请求

六、最后

在以前,对rabbitmq的使用也是一知半解,通过编写此篇博客,有了许多清晰的了解。

此篇幅过长,请耐心观看

如有错误和不解的地方,请在评论区评论

在编写此博客过程中,我查阅了许多博客,也翻阅了官方文档,在此感谢其他博主的博客,给予了我很大的帮助,感谢

小目标青年

弗兰克的猫

rabbitmq官方文档

spring-amqp官方文档