[TOC]
面试中关于 redis 中经常会被如何实现异步队列?以及存在什么问题,怎么改进,鉴于次今天进行异步队列实现和优化
说明:
异步消息队列是什么?
异步消息队列能解决什么问题?
什么时候用?
什么地方用?
以上问题请参考 消息队列
基于 list 实现的生产 / 消费模式队列,在应用中使用场景最为广泛,以下是具体的常见实现过程以及分析
How
生产 / 消费模式有三个基本的元素
生产者(producer):用于组装消息,并将组装的消息通过 lpush(左进)的方式压入队列,供消费者消费此消息
队列:用于承载消息的载体,在队列里面,被生产者压入的消息按照一定顺序进行存放,保证有序性
消费者(consumer):用于将队列里面的消息按照先进先出(此处采用 rpop)的原则弹出队列容器中
生产者实现
/**
* 准备处理消息key
*/
static $todoKey = 'TO_SEND_MSG';
/**
* 处理中消息key
*/
static $doingKey = 'SENDING_MSG';
/**
* 无限制阻塞
*/
static $timeout = 0;
/**
* 生产
*/
public function producer()
{
$userARr = [
'user1@mail1.com',
'user2@mail2.com',
'user3@mail3.com',
];
Redis::lpush(self::$todoKey, $userARr);
}
生产者 producer 用于生产队列,利用 redis 的 list 结构中 lpush 从左向队列中压入(可以批量)需要处理的消息,此消息在 list 中按照顺序进行存放,入队相对简单,至此完成了队列入队。
有了队列,怎么去消费这个队列数据呢?这就是下面的消费触发方式
如何消费
消费触发方式 特点
死循环方式读取 易实现,故障时无法及时恢复(比较适合做秒杀,比较集中,运维集中维护)
定时任务 压力均分,有处理上限;(需要合理设置时间间隔,不要等上一个任务没有完成下一个任务又开始了)
守护进程 类似于 php-fpm 和 php-cg, 需要 shell 基础,实现成本较高
鉴于此我们采用死循环的方式进行消费,具体消费过程设计如下对比
消费者
示例 1
/**
* 消费
*/
public function consumer1()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::rpop(self::$todoKey);
if ($msg) {
// TODO 发送消息业务
}
}
}
这种实现方式也是很多人会采用的实现方式,但是存在问题:
当待处理的消息 self::$todoKey 为空的情况下:消费服务将会密集的向 redis 服务重复执行’rpop’命令,将造成资源严重浪费,对系统性能有严重折损,因此并不可取
改进:可以考虑当消息为空的时候,进行 sleep 代码如下:
示例 2
/**
* 消费
*/
public function consumer2()
{
/**
* 循环执行消费
*/
while (true) {
$msg = Redis::rpop(self::$todoKey);
if ($msg) {
// TODO 发送消息业务
} else {
// 等待10s
sleep(10);
}
}
}
加入 sleep 后,解决了队列为空数据时候造成的服务端资源浪费问题,但是又引入新问题:
如果有消息入队的情况下,程序阻塞在 sleep 处,程序将会无法及时响应消息,消息及时性会打折扣(排除业务允许延迟的的情况),到此一般业务已经能够满足,但是对于追求性能的业务场景并不尽人意,需要继续探索,请看下面示例:
示例 3
/**
* 消费
*/
public function consumer3()
{
/**
* 循环执行消费
*/
while (true) {
// timeout=0 无限制阻塞式消费
$msg = Redis::brpop(self::$todoKey, self::$timeout);
if ($msg) {
// TODO 发送消息业务
}
}
}
幸好 redis 中只需要一个命令:brpop 搞定上面问题,brpop 在队列有数据的时候进行出队操作,在队列没有数据的时候进行阻塞等待,知道队列中有数据,看起来到此一切完美,大功告成的样子,但是作为程序设计的我们还得考虑极端情况:
如果消费服务端出现异常,由于服务端没有进行备份,那么将会出现消息丢失情况,这种情况的解决请看下面示例
示例 4
/**
* 消费
*/
public function consumer4()
{
while (true) {
do {
// 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份)
$msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
try {
// TODO 发送消息业务
// 业务未出现异常,处理完业务后,则从正在处理的list中删除当前处理的消息,直到处理中list消息为空
Redis::lrem(self::$doingKey, 1, $msg);
} catch (\Exception $e) {
// TODO 异常
/**
* 业务出现异常,处理异常,
* 通常处理异常有两种方法
* 1.将异常消息重新放到待消费$todoKey的list中
* 2.单独处理异常消息(如发邮件通知,人工处理)
* 3.单独起一个脚本程序,处理长时间存在于处理中$doingKey->list消息
*/
}
} while (Redis::llen(self::$todoKey) || Redis::llen(self::$doingKey));
}
/**
* 到此时,队列已经满足了安全性和性能(高可用)要求
* 但是,对业务的处理上面此种方法并不完美
* 为了尽可能地完善,还需要写一个服务端定时脚本
* 此脚本用于监测和处理长时间存在于处理中$doingKey->list消息
*/
}
上面代码中利用 brpoplpush 在消费端取到消息的同时原子性的把该消息放入一个正在处理中的 $doingKey 列表(进行备份):
如果处理业务没有出现异常,那么业务处理完成后从正在处理中的 $doingKey 列表 lrem 删除当前已经处理 ok 的消息
但是如果处理消息业务出现异常,不用慌,我们已经在处理中的 $doingKey 列表中备份了异常消息,对于异常消息处理,得根据具体业务具体实现:常见有两种:见上面说明。
到此:我们可以说已经兼顾了队列的安全性,高效性,但是感觉在处理异常上面增加了复杂度,那么有没有更简单的实现方法呢?见下面示例:
示例 5
/**
* 消费
*/
public function consumer5()
{
/**
* 利用旋转列表功能
* 重置处理中消息list的key 和 待处理消息list为同一个key
*/
while (true) {
self::$doingKey = self::$todoKey;
do {
// 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的列表(进行备份)
// 此处由于处理中和待处理list为同一个,利用brpoplpush旋转列表
$msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
try {
// TODO 发送消息业务
// 业务未出现异常,处理完业务后,则删除当前处理的消息,直到处理中list消息为空
Redis::lrem(self::$doingKey, 1, $msg);
} catch (\Exception $e) {
// 业务出现异常,继续循环,直到
}
} while (Redis::llen(self::$todoKey));
}
}
此段代码跟示例 4 唯一的不同就是:self::$doingKey = self::$todoKey;
1. 利用 list 实现旋转列表功能,在同一个队列中,从尾部出队的同时,从头部入队,如果没有异常则删除头部入队消息,如果出现异常,那么一直循环处理,当然这种处理有局限性:
A. 勿删的可能性
B. 如果一个异常消息一直得不到正确处理,就会一直占用资源
c. 此种方法慎用
综合考虑:示例 4 就是一种最优解
但是:萝卜白菜各有所爱,不同服务业务场景选择不同实现,我们就是得搞清楚他们是萝卜还是白菜