Java中间件实战系列(1)- RabbitMQ死信与延迟队列的区别与实现

Java
329
0
0
2023-06-18
标签   RabbitMQ

对于消息中间件RabbitMQ,想必各位小伙伴并不陌生,其广泛应用程度不言而喻,此前我们也在许多课程以及诸多专栏文章中介绍了它的应用,其应用场景也是相当广泛的,像什么消息异步通信、服务模块解耦、高并发流量削峰、订单超时未支付自动失效等等都是实际项目中最为常见的场景。本文我们将重点介绍并实现Rabbit mq 的死信与延时队列,并将两者做一个简单的对比!

内容

对于RabbitMQ的死信队列,此前我们在”Java秒杀系统”这一技术专栏中已经有重点介绍过了,在那里我们是将其应用于 “订单超时未支付自动失效”这一业务场景中,简而言之,”死信队列”是一种特殊的”队列”,跟普通的队列相比,具有”延迟处理任务”的特性。

而在消息中间件RabbitMQ的架构组件中,也存在着跟”死信队列”在功能特性方面几乎相同的组件,那就是”延迟队列/延时队列”,同样也具有”延迟、延时处理任务”的功效!

当然啦,这两者还是有一丢丢区别的,最直观的当然是名字上啦,从名字上你就可以看出来两者的”处事风格”是不一样的,具体体现在:

一、创建上的差异:

(1)RabbitMQ的死信队列DeadQueue是由”死信交换机DLX”+”死信路由DLK”组成的,当然,可能还会有” TTL “,而DLX和DLK又可以绑定指向真正的队列RealQueue,这个队列RealQueue便是”消费者”真正监听的对象.

(2)而RabbitMQ的延迟/延时队列DelayedQueue 则是由普通的队列来创建即可,唯一不同的地方在于其绑定的交换机为自定义的交换机,即”CustomExchange”,在创建该交换机时只需要指定其消息的类型为 “x-delayed-message”即可.”消费者”真正监听的队列也是它本人,即DelayedQueue

画外音:从这一点上看,延迟/延时队列的创建相对而言简单一些!


二、功能特性上的差异:

(1)死信队列在实际应用时虽然可以实现”延时、延迟处理任务”的功效,但进入死信中的消息却依然保留了队列的特性,即”FIFO” ~ 先进先出,而不管先后进入队列中消息的TTL的值. 即假设先后进入死信的消息为A、B、C,各自的TTL分别为:10s、3s、5s,理论上TTL先后到达的顺序是:B、C、A,然后从死信出来,最终被路由到真正的队列中,即消息被消费的先后顺序应该为:B、C、A,然而现实却是残酷的,其最终消费的消息的顺序为:A、B、C,即”消息是怎么进去的,就怎么出来”,保留了所谓的FIFO特性.

(2)或许是因为死信有这种缺陷,所以RabbitMQ提供了另一种组件,即”延迟队列”,它可以很完美的解决上面死信出现的问题,即最终消费的消息的顺序为:B、C、A,我们将在下面用实际的代码进行实战实现与演练.


三、插件安装上的差异:

(1)死信不需要额外的插件

(2)但是延迟队列在实际项目使用时却需要在Mq Server中安装一个插件,它的名字叫做:”rabbitmq_delayed_message_ exchange “,其安装过程可以参考链接: 里面就提供了Windows环境和Linux环境下的插件的安装过程(很简单,只需要不到3步的步骤.)


四、代码的实战实现~RabbitMQ的死信队列

说了这么多,想必有些小伙伴有点不耐烦了,下面我将采用实际的代码对上面所介绍的几点区别进行实现与演练(代码都是基于Spring Boot2.0搭建的项目环境实现与测试的)

(1)首先,我们需要创建死信队列以及真正的队列,并实现相关的绑定:

 //构建订单超时未支付的死信队列消息模型
    @ Bean 
    public Queue  success KillDeadQueue(){
        Map<String, Object> argsMap= Maps.newHashMap();
        argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
        argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true, false ,false,argsMap);
    }

    //基本交换机
    @Bean
    public TopicExchange successKillDeadProdExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
    }

    //创建基本交换机+基本路由 -> 死信队列 的绑定
    @Bean
    public Binding successKillDeadProdBinding(){
        return Binding build er.bind(successKillDeadQueue()).to(successKillDeadProdExchange()). with (env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
    }

    //真正的队列
    @Bean
    public Queue successKillRealQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
    }

    //死信交换机
    @Bean
    public TopicExchange successKillDeadExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
    }

    //死信交换机+死信路由->真正队列 的绑定
    @Bean
    public Binding successKillDeadBinding(){
        return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    } 

(2)将项目运行起来,登录RabbitMQ的后端控制台,可以看到成功创建了相应的死信队列和真正的队列等组件,如下图所示:

(3)紧接着,我们在Controller中建立一个请求方法,用于接收前端请求过来的消息,并将该消息附以TTL值,塞入死信队列中,如下所示:

 //死信队列-生产者
    @RequestMapping(value = "dead/ msg /send",method = RequestMethod.GET)
    @ResponseBody
    public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-")).build();

            rabbitTemplate.setMessageConverter(new JacksonJsonMessageConverter());
            rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
                MessageProperties mp=message.getMessageProperties();
                mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

                //TODO:动态设置TTL
                mp.setExpiration(String.valueOf(ttl));

                log.info("死信队列生产者-发出消息:{} TTL:{}",msg,ttl);
                return message;
            });
        }catch (Exception e){
            response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    } 

(4)最后是写一个Spring Bean类充当消费者,在其中监听”实际队列”的消息:

 @RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
    public void consumeExpireOrder(@Payload byte[] msg){
        try {
            log.info("死信队列-监听者-接收消息:{}",new String(msg,"UTF-"));

        }catch (Exception e){
            log.error("死信队列-监听者-发生异常:",e.fillInStackTrace());
        }
    } 

最后,我们进入测试环节,打开Postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:

Java中间件实战系列(1)- RabbitMQ死信与延迟队列的区别与实现

五、代码的实战实现~RabbitMQ的延迟/延时队列

很明显,由于死信存在的这个缺陷,故而其在上面的应用场景中是不太适用的! 即死信队列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的应用场景中是不适用的,而像”订单超时未支付”的应用场景,因为大家都一样,都是固定的30min或者 1h,故而这种场景,死信是相当适合的。

因此,为了解决实际项目中”TTL不一致且不固定”的应用场景,我们需要搬上”延迟/延时队列”(当然啦,Redisson的延迟/延迟队列也是可以实现的!),下面我们用代码加以实现!

(1)首先是创建”延迟/延时队列”等相关的组件,如下所示;

 	//TODO:RabbitMQ延迟队列
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
    }

    @Bean
    public CustomExchange delayExchange(){
        Map<String,Object> map=Maps.newHashMap();
        map.put("x-delayed-type","direct");
        return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
    }

    @Bean
    public Binding delayBinding(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
    }

(2)其生产者发送消息的代码我们仍然是放在一个Controller的请求方法中,如下所示:

 //延迟队列-生产者
    @RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
    @ResponseBody
    public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            String info=msg;

            Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-")).build();
            rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),
                    realMsg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties mp=message.getMessageProperties();
                    mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    mp.setHeader("x-delay",ttl);

                    log.info("延迟队列生产者-发出消息:{} TTL:{}",msg,ttl);
                    return message;
                }
            });

        }catch (Exception e){
            response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    } 

(3)最后是用于监听延迟队列中消息的消费者的代码,如下所示:

 /**
 * 延时队列-消息监听器-消费者
 * @Author:debug (SteadyJack)
 * @Link: weixin-> debug qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
    private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);

    //消息监听
    @RabbitListener(queues = {"${mq.kill.delay.queue}"})
    public void consumeMsg(@Payload byte[] msg){
        try {
            String info=new String(msg,"UTF-");

            log.info("延时队列监听到消息:{}  ",info);
        }catch (Exception e){
            log.error("延时队列-消息监听器-消费者-消息监听-发生异常:",e.fillInStackTrace());
        }
    }
} 

(4)将项目跑起来,可以看到RabbitMQ的后端控制台已经建立了该队列,如下图所示:

Java中间件实战系列(1)- RabbitMQ死信与延迟队列的区别与实现

(5)最后,我们打开postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:

Java中间件实战系列(1)- RabbitMQ死信与延迟队列的区别与实现

从该运行结果上看,会发现这才是我们真正想要的结果,即按照时间TTL的大小来决定消息被消费的先后顺序,而且,你可以看出消费时的时间跟发出的时间刚好差 TTL !

在文章的最后的,我们简单总结一下本文所讲的内容,即主要介绍、对比并实战了RabbitMQ中两款具有”延时、延迟处理任务”功效的组件,即”死信队列”和”延迟队列”,其差异性主要体现在:创建上的不同、功能特性的不同、插件安装上的不同等方面。

总体来说,如果是想追求消息传输的稳定性、可靠性且TTL是固定的话,那么建议选择”死信队列”,因为消息从一开始就在队列中待着,等到TTL一到才被路由到真正的队列!而”延迟队列”则不同,即发送出去的消息需要等待 TTL 的时间才进入”延迟队列”,如果在等待的期间,Mq Server 宕机了,那很可能消息就丢失了…..