redis应用系列二:异步消息队列:生产/消费模式实现及优化

Laravel框架
372
0
0
2022-04-17

[TOC]

面试中关于 redis 中经常会被如何实现异步队列?以及存在什么问题,怎么改进,鉴于次今天进行异步队列实现和优化

说明:

异步消息队列是什么?

异步消息队列能解决什么问题?

什么时候用?

什么地方用?

以上问题请参考 消息队列

基于 list 实现的生产 / 消费模式队列,在应用中使用场景最为广泛,以下是具体的常见实现过程以及分析

How

生产 / 消费模式有三个基本的元素

生产者(producer):用于组装消息,并将组装的消息通过 lpush(左进)的方式压入队列,供消费者消费此消息

队列:用于承载消息的载体,在队列里面,被生产者压入的消息按照一定顺序进行存放,保证有序性

消费者(consumer):用于将队列里面的消息按照先进先出(此处采用 rpop)的原则弹出队列容器中

生产者实现

  /**
   * 准备处理消息key
   */
  static $todoKey = 'TO_SEND_MSG';
  /**
   * 处理中消息key
   */
  static $doingKey = 'SENDING_MSG';
  /**
   * 无限制阻塞
   */
  static $timeout = 0;

  /**
   * 生产
   */
  public function producer()
  {

    $userARr = [
      'user1@mail1.com',
      'user2@mail2.com',
      'user3@mail3.com',
    ];
    Redis::lpush(self::$todoKey, $userARr);
  }

生产者 producer 用于生产队列,利用 redis 的 list 结构中 lpush 从左向队列中压入(可以批量)需要处理的消息,此消息在 list 中按照顺序进行存放,入队相对简单,至此完成了队列入队。

有了队列,怎么去消费这个队列数据呢?这就是下面的消费触发方式

如何消费

消费触发方式 特点

死循环方式读取 易实现,故障时无法及时恢复(比较适合做秒杀,比较集中,运维集中维护)

定时任务 压力均分,有处理上限;(需要合理设置时间间隔,不要等上一个任务没有完成下一个任务又开始了)

守护进程 类似于 php-fpm 和 php-cg, 需要 shell 基础,实现成本较高

鉴于此我们采用死循环的方式进行消费,具体消费过程设计如下对比

消费者

示例 1

  /**
   * 消费
   */
  public function consumer1()
  {
    /**
     * 循环执行消费
     */
    while (true) {
      $msg = Redis::rpop(self::$todoKey);
      if ($msg) {
        // TODO 发送消息业务
      }
    }
  }

这种实现方式也是很多人会采用的实现方式,但是存在问题:

当待处理的消息 self::$todoKey 为空的情况下:消费服务将会密集的向 redis 服务重复执行’rpop’命令,将造成资源严重浪费,对系统性能有严重折损,因此并不可取

改进:可以考虑当消息为空的时候,进行 sleep 代码如下:

示例 2

/**
   * 消费
   */
  public function consumer2()
  {
    /**
     * 循环执行消费
     */
    while (true) {
      $msg = Redis::rpop(self::$todoKey);
      if ($msg) {
        // TODO 发送消息业务
      } else {
        // 等待10s
        sleep(10);
      }
    }
  }

加入 sleep 后,解决了队列为空数据时候造成的服务端资源浪费问题,但是又引入新问题:

如果有消息入队的情况下,程序阻塞在 sleep 处,程序将会无法及时响应消息,消息及时性会打折扣(排除业务允许延迟的的情况),到此一般业务已经能够满足,但是对于追求性能的业务场景并不尽人意,需要继续探索,请看下面示例:

示例 3

  /**
   * 消费
   */
  public function consumer3()
  {
    /**
     * 循环执行消费
     */
    while (true) {
      // timeout=0 无限制阻塞式消费
      $msg = Redis::brpop(self::$todoKey, self::$timeout);
      if ($msg) {
        // TODO 发送消息业务
      }
    }
  }

幸好 redis 中只需要一个命令:brpop 搞定上面问题,brpop 在队列有数据的时候进行出队操作,在队列没有数据的时候进行阻塞等待,知道队列中有数据,看起来到此一切完美,大功告成的样子,但是作为程序设计的我们还得考虑极端情况:

如果消费服务端出现异常,由于服务端没有进行备份,那么将会出现消息丢失情况,这种情况的解决请看下面示例

示例 4

  /**
   * 消费
   */
  public function consumer4()
  {
    while (true) {
      do {
        // 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的$doingKey列表(进行备份)
        $msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
        try {
          // TODO 发送消息业务
          // 业务未出现异常,处理完业务后,则从正在处理的list中删除当前处理的消息,直到处理中list消息为空
          Redis::lrem(self::$doingKey, 1, $msg);
        } catch (\Exception $e) {
          // TODO 异常
          /**
           * 业务出现异常,处理异常,
           * 通常处理异常有两种方法
           * 1.将异常消息重新放到待消费$todoKey的list中
           * 2.单独处理异常消息(如发邮件通知,人工处理)
           * 3.单独起一个脚本程序,处理长时间存在于处理中$doingKey->list消息
           */

        }
      } while (Redis::llen(self::$todoKey) || Redis::llen(self::$doingKey));
    }
    /**
     * 到此时,队列已经满足了安全性和性能(高可用)要求
     * 但是,对业务的处理上面此种方法并不完美
     * 为了尽可能地完善,还需要写一个服务端定时脚本
     * 此脚本用于监测和处理长时间存在于处理中$doingKey->list消息
     */
  }

上面代码中利用 brpoplpush 在消费端取到消息的同时原子性的把该消息放入一个正在处理中的 $doingKey 列表(进行备份):

如果处理业务没有出现异常,那么业务处理完成后从正在处理中的 $doingKey 列表 lrem 删除当前已经处理 ok 的消息

但是如果处理消息业务出现异常,不用慌,我们已经在处理中的 $doingKey 列表中备份了异常消息,对于异常消息处理,得根据具体业务具体实现:常见有两种:见上面说明。

到此:我们可以说已经兼顾了队列的安全性,高效性,但是感觉在处理异常上面增加了复杂度,那么有没有更简单的实现方法呢?见下面示例:

示例 5

  /**
   * 消费
   */
  public function consumer5()
  {
    /**
     * 利用旋转列表功能
     * 重置处理中消息list的key 和 待处理消息list为同一个key
     */
    while (true) {
      self::$doingKey = self::$todoKey;
      do {
        // 循环阻阻塞执行,消费端取到消息的同时,原子性的把该消息放入一个正在处理中的列表(进行备份)
        // 此处由于处理中和待处理list为同一个,利用brpoplpush旋转列表
        $msg = Redis::brpoplpush(self::$todoKey, self::$doingKey, self::$timeout);
        try {
          // TODO 发送消息业务
          // 业务未出现异常,处理完业务后,则删除当前处理的消息,直到处理中list消息为空
          Redis::lrem(self::$doingKey, 1, $msg);
        } catch (\Exception $e) {
          // 业务出现异常,继续循环,直到
        }
      } while (Redis::llen(self::$todoKey));
    }

  }

此段代码跟示例 4 唯一的不同就是:self::$doingKey = self::$todoKey;

1. 利用 list 实现旋转列表功能,在同一个队列中,从尾部出队的同时,从头部入队,如果没有异常则删除头部入队消息,如果出现异常,那么一直循环处理,当然这种处理有局限性:

A. 勿删的可能性

B. 如果一个异常消息一直得不到正确处理,就会一直占用资源

c. 此种方法慎用

综合考虑:示例 4 就是一种最优解

但是:萝卜白菜各有所爱,不同服务业务场景选择不同实现,我们就是得搞清楚他们是萝卜还是白菜