对于消息中间件RabbitMQ,想必各位小伙伴并不陌生,其广泛应用程度不言而喻,此前我们也在许多课程以及诸多专栏文章中介绍了它的应用,其应用场景也是相当广泛的,像什么消息异步通信、服务模块解耦、高并发流量削峰、订单超时未支付自动失效等等都是实际项目中最为常见的场景。本文我们将重点介绍并实现Rabbit mq 的死信与延时队列,并将两者做一个简单的对比!
内容
对于RabbitMQ的死信队列,此前我们在”Java秒杀系统”这一技术专栏中已经有重点介绍过了,在那里我们是将其应用于 “订单超时未支付自动失效”这一业务场景中,简而言之,”死信队列”是一种特殊的”队列”,跟普通的队列相比,具有”延迟处理任务”的特性。
而在消息中间件RabbitMQ的架构组件中,也存在着跟”死信队列”在功能特性方面几乎相同的组件,那就是”延迟队列/延时队列”,同样也具有”延迟、延时处理任务”的功效!
当然啦,这两者还是有一丢丢区别的,最直观的当然是名字上啦,从名字上你就可以看出来两者的”处事风格”是不一样的,具体体现在:
一、创建上的差异:
(1)RabbitMQ的死信队列DeadQueue是由”死信交换机DLX”+”死信路由DLK”组成的,当然,可能还会有” TTL “,而DLX和DLK又可以绑定指向真正的队列RealQueue,这个队列RealQueue便是”消费者”真正监听的对象.
(2)而RabbitMQ的延迟/延时队列DelayedQueue 则是由普通的队列来创建即可,唯一不同的地方在于其绑定的交换机为自定义的交换机,即”CustomExchange”,在创建该交换机时只需要指定其消息的类型为 “x-delayed-message”即可.”消费者”真正监听的队列也是它本人,即DelayedQueue
画外音:从这一点上看,延迟/延时队列的创建相对而言简单一些!
二、功能特性上的差异:
(1)死信队列在实际应用时虽然可以实现”延时、延迟处理任务”的功效,但进入死信中的消息却依然保留了队列的特性,即”FIFO” ~ 先进先出,而不管先后进入队列中消息的TTL的值. 即假设先后进入死信的消息为A、B、C,各自的TTL分别为:10s、3s、5s,理论上TTL先后到达的顺序是:B、C、A,然后从死信出来,最终被路由到真正的队列中,即消息被消费的先后顺序应该为:B、C、A,然而现实却是残酷的,其最终消费的消息的顺序为:A、B、C,即”消息是怎么进去的,就怎么出来”,保留了所谓的FIFO特性.
(2)或许是因为死信有这种缺陷,所以RabbitMQ提供了另一种组件,即”延迟队列”,它可以很完美的解决上面死信出现的问题,即最终消费的消息的顺序为:B、C、A,我们将在下面用实际的代码进行实战实现与演练.
三、插件安装上的差异:
(1)死信不需要额外的插件
(2)但是延迟队列在实际项目使用时却需要在Mq Server中安装一个插件,它的名字叫做:”rabbitmq_delayed_message_ exchange “,其安装过程可以参考链接: 里面就提供了Windows环境和Linux环境下的插件的安装过程(很简单,只需要不到3步的步骤.)
四、代码的实战实现~RabbitMQ的死信队列
说了这么多,想必有些小伙伴有点不耐烦了,下面我将采用实际的代码对上面所介绍的几点区别进行实现与演练(代码都是基于Spring Boot2.0搭建的项目环境实现与测试的)
(1)首先,我们需要创建死信队列以及真正的队列,并实现相关的绑定:
//构建订单超时未支付的死信队列消息模型
@ Bean
public Queue success KillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true, false ,false,argsMap);
}
//基本交换机
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//创建基本交换机+基本路由 -> 死信队列 的绑定
@Bean
public Binding successKillDeadProdBinding(){
return Binding build er.bind(successKillDeadQueue()).to(successKillDeadProdExchange()). with (env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的队列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交换机
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交换机+死信路由->真正队列 的绑定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
(2)将项目运行起来,登录RabbitMQ的后端控制台,可以看到成功创建了相应的死信队列和真正的队列等组件,如下图所示:
(3)紧接着,我们在Controller中建立一个请求方法,用于接收前端请求过来的消息,并将该消息附以TTL值,塞入死信队列中,如下所示:
//死信队列-生产者
@RequestMapping(value = "dead/ msg /send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-")).build();
rabbitTemplate.setMessageConverter(new JacksonJsonMessageConverter());
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//TODO:动态设置TTL
mp.setExpiration(String.valueOf(ttl));
log.info("死信队列生产者-发出消息:{} TTL:{}",msg,ttl);
return message;
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
(4)最后是写一个Spring Bean类充当消费者,在其中监听”实际队列”的消息:
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(@Payload byte[] msg){
try {
log.info("死信队列-监听者-接收消息:{}",new String(msg,"UTF-"));
}catch (Exception e){
log.error("死信队列-监听者-发生异常:",e.fillInStackTrace());
}
}
最后,我们进入测试环节,打开Postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:
五、代码的实战实现~RabbitMQ的延迟/延时队列
很明显,由于死信存在的这个缺陷,故而其在上面的应用场景中是不太适用的! 即死信队列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的应用场景中是不适用的,而像”订单超时未支付”的应用场景,因为大家都一样,都是固定的30min或者 1h,故而这种场景,死信是相当适合的。
因此,为了解决实际项目中”TTL不一致且不固定”的应用场景,我们需要搬上”延迟/延时队列”(当然啦,Redisson的延迟/延迟队列也是可以实现的!),下面我们用代码加以实现!
(1)首先是创建”延迟/延时队列”等相关的组件,如下所示;
//TODO:RabbitMQ延迟队列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
}
@Bean
public CustomExchange delayExchange(){
Map<String,Object> map=Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
}
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
}
(2)其生产者发送消息的代码我们仍然是放在一个Controller的请求方法中,如下所示:
//延迟队列-生产者
@RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
String info=msg;
Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-")).build();
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),
realMsg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);
log.info("延迟队列生产者-发出消息:{} TTL:{}",msg,ttl);
return message;
}
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
(3)最后是用于监听延迟队列中消息的消费者的代码,如下所示:
/**
* 延时队列-消息监听器-消费者
* @Author:debug (SteadyJack)
* @Link: weixin-> debug qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);
//消息监听
@RabbitListener(queues = {"${mq.kill.delay.queue}"})
public void consumeMsg(@Payload byte[] msg){
try {
String info=new String(msg,"UTF-");
log.info("延时队列监听到消息:{} ",info);
}catch (Exception e){
log.error("延时队列-消息监听器-消费者-消息监听-发生异常:",e.fillInStackTrace());
}
}
}
(4)将项目跑起来,可以看到RabbitMQ的后端控制台已经建立了该队列,如下图所示:
(5)最后,我们打开postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:
从该运行结果上看,会发现这才是我们真正想要的结果,即按照时间TTL的大小来决定消息被消费的先后顺序,而且,你可以看出消费时的时间跟发出的时间刚好差 TTL !
在文章的最后的,我们简单总结一下本文所讲的内容,即主要介绍、对比并实战了RabbitMQ中两款具有”延时、延迟处理任务”功效的组件,即”死信队列”和”延迟队列”,其差异性主要体现在:创建上的不同、功能特性的不同、插件安装上的不同等方面。
总体来说,如果是想追求消息传输的稳定性、可靠性且TTL是固定的话,那么建议选择”死信队列”,因为消息从一开始就在队列中待着,等到TTL一到才被路由到真正的队列!而”延迟队列”则不同,即发送出去的消息需要等待 TTL 的时间才进入”延迟队列”,如果在等待的期间,Mq Server 宕机了,那很可能消息就丢失了…..