php使用redis实现滑动时间窗口限流算法

PHP技术
353
0
0
2022-11-12
标签   Redis

欢迎各位大佬指教:blush: :blush:

/**
 * 滑动窗口限流
 * Class SlidingWindow
 * @package App\Common
 */
class SlidingWindow
{
    protected $timeStamp;                       //当前的时间戳

    protected $_is_open_minimum_check = false;  //是否开启最小精度检查 默认不开启

    protected $_minimum_time_range_size = 1;    // 最小限制范围精度单位秒钟 
    protected $_minimum_time_range_qps  = 50;   // 最小范围精度对应的时间内允许的请求数量

    protected $_maximum_time_range_size = 60;   // 最大限制范围单位秒钟 
    protected $_maximum_time_range_qps = 3000;  // 最大限制范围对应的时间内允许的请求数量

    protected $key = 'sliding_window';

    /** @var $redis \Redis */ 
    public $redis;

    public function __construct($redis)
    {
        $this->timeStamp = time();
        $this->redis = $redis;
    }

    //设置是否开启最小精度的检查 false为关闭 true为开启 
    public function setOpenMininumCheck($open)
    {
        $this->_is_open_minimum_check = $open;
        return $this;
    }

    //设置最小精度时间 单位秒 
    public function setMininumTime($time)
    {
        $this->_minimum_time_range_size = $time;
        return $this;
    }

    //设置最小精度的限制数量 
    public function setMinnumQps($qps)
    {
        $this->_minimum_time_range_qps = $qps;
        return $this;
    }

    //设置最大精度时间 单位秒 
    public function setMaxnumTime($time)
    {
        $this->_maximum_time_range_size = $time;
        return $this;
    }

    //设置最大精度的限制数量 
    public function setMaxnumQps($qps)
    {
        $this->_maximum_time_range_qps = $qps;
        return $this;
    }
    //设置自定义key 
    public function setKey($key = '')
    {
        if(!empty($key)){
            $this->key = $key;
        }
        return $this;
    }

    public function luaGrant($value)
    {
        //极限并发情况下的,使用lua脚本会被redis当成一个整体的原子操作
        $luaScript = <<<EOT
                        local key = KEYS[1]
                        redis.call('expire',key,tonumber(ARGV[2]))
                        redis.call('zremrangebyscore',key,0,tonumber(ARGV[4]))
                        local max_count = redis.call('zcount',key,tonumber(ARGV[4]),tonumber(ARGV[5]))
                        if(max_count < tonumber(ARGV[8])) then
                            if (tonumber(ARGV[1]))then
                               local min_count = redis.call('zcount',key,tonumber(ARGV[3]),tonumber(ARGV[5]))
                               if(min_count >= tonumber(ARGV[7])) then
                                     return 0 
                               else
                                 redis.call('zadd',key,tonumber(ARGV[5]),tonumber(ARGV[6]))
                                 return 1
                               end
                            else
                                redis.call('zadd',key,tonumber(ARGV[5]),tonumber(ARGV[6]))
                                return 1
                            end
                        else 
                           return 0
                        end
EOT;
        $res =  $this->redis->eval($luaScript, [
            $this->key,
            $this->_is_open_minimum_check,
            $this->_maximum_time_range_size,
            $this->timeStamp - $this->_minimum_time_range_size + 1,
            $this->timeStamp - $this->_maximum_time_range_size,
            $this->timeStamp,
            $value,
            $this->_minimum_time_range_qps,
            $this->_maximum_time_range_qps
        ], 1);
//        return json_encode(["status" => (bool)$res,"time" => date('Y-m-d H:i:s',$this->timeStamp)]); 
        return (bool)$res;
    }
}

使用示例

//uid为 具体的每个用户
$res = (new SlidingWindow($redis))
    ->setKey()
    ->setOpenMininumCheck(true)
    ->setMininumTime(5)
    ->setMinnumQps(500)
    ->setMaxnumTime(60)
    ->setMaxnumQps(3000)
    ->luaGrant($uid);
if(!$res){
    //限流了 
    echo json_encode(["status" => false, "text" => "当前访问人数过多,请刷新重试", "is_limit" => true],JSON_UNESCAPED_UNICODE);
    exit;
}

使用swoole的压测脚本示例

/**
 * 压测
 * Class StressTest
 */
class StressTest
{
    //每秒并发数 
    public $second_num = 100;
    //时间秒 
    public $second = 60;

    public $timer = 0;
    //协程通道 
    public $channel;

    public function output()
    {
        go(function (){
            $success = 0;
            $error = 0;
            $cw = 0;
            $datas = [];
            while (true) {
                $data = $this->channel->pop();
                if(isset($data['status']) && $data['status'] === false){
                    $datas['error'][$data['time']] ++;
                    $error++;
                }else if(isset($data['status']) && $data['status'] === true){
                    $datas['success'][$data['time']] ++;
                    $success++;
                }else{
                    $cw++;
                }
                echo "成功的数量为:$success".PHP_EOL;
                echo "失败的数量为:$error".PHP_EOL;
                echo "错误的数量为:$cw".PHP_EOL;
                echo "详细数据:".json_encode($datas).PHP_EOL;
            }
        });
    }

    public function __construct($host,$path,$port = 80)
    {
        \Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
        $this->channel = new \Swoole\Coroutine\Channel(10);
        self::log(LOG_INFO,"压测开始");
        sleep(1);
        \Swoole\Timer::tick(1000,function ($timer_id) use ($host,$path,$port) {
            $this->timer ++;
            if($this->timer <= $this->second){
                for($i = 0; $i < $this->second_num; $i++) {
                    go(function () use ($host,$path,$port) {
                        $client = new \Swoole\Coroutine\Http\Client($host,$port);
                        $client->set(['timeout' => -1]);
                        $client->get($path);
                        $res = $client->getBody();
                        $client->close();
                        $ress = json_decode($res,true);
                        $this->channel->push($ress);
                        self::log(LOG_INFO,$res);
                    });
                }
            }else{
                \Swoole\Timer::clear($timer_id);
            }
        });
        $this->output();
        \Swoole\Event::wait();
    }

    /**
     * 日志输出
     * @param $level
     * @param $format
     */ 
    public static function log($level, $format) {
        $args = func_get_args();
        $message = [];

        $time = microtime(true);
        $message[] = date('Y-m-d H:i:s', $time);
        $message[] = ' ';

        if (isset($args[2])) {
            $message[] = vsprintf($format, array_slice($args, 2));
        } else {
            $message[] = $format;
        }

        if ($level <= 3) {
            $prefix = "\e[31m";
        }
        else if ($level <= 4) {
            $prefix = "\x1b[33m";
        }
        else if ($level <= 5) {
            $prefix = "\x1b[35m";
        }
        else if ($level <= 6) {
            $prefix = "\x1b[32m";
        }
        else {
            $prefix = "\x1b[37m";
        }
        $message[] = "\x1b[0m";
        echo $prefix.implode('', $message) , PHP_EOL;
    }
}
//替换你本地的具体地址
new StressTest("127.0.0.1","/live_recent.php",8082);