RabbitMQ实现延迟队列

PHP技术
397
0
0
2022-10-07
标签   RabbitMQ
  • 实现原理:给队列的消息设置过期时间(TTL),消息到期后就会投递到一个死信队列,我们就可以在这里处理延迟的任务。

一、介绍

1. 死信队列
  • 当消息在一个队列中变成死信之后,它会被重新投递到设置的Exchange中,这个Exchange就是DLX,通过routing_key的绑定投递到对应的队列,这个队列就是死信队列。
2. 死信消息
  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false。
  • 消息TTL过期。TTL:Time To Live的简称,即过期时间。
  • 队列达到最大长度。
3. 过期消息
  • 通过队列进行设置,设置后该队列所有的消息都存在相同的过期时间。
  • 通过对消息本身设置,队列中的每条消息的过期时间都可以不一样。如果要用来实现延迟队列不建议使用这种方式,因为队列只会判断第一个消息是否过期,过期则把消息投递到死信队列。如果第一个消息过期时间为30s,二个消息的过期时间为10s,那么队列等30s后把第一消息投递到死信队列,然后继续判断下一个消息,但是这样子第二个消息的延迟时间就变成30s了。

二、原理图

  • RabbitMQ实现延迟队列

三、上代码

  • composer require php-amqplib/php-amqplib
  • 把代码贴到根目录的public.php文件运行
  <?php

  use PhpAmqpLib\Message\AMQPMessage;

  require_once __DIR__ . '/vendor/autoload.php';
  //配置信息 
  $conn_args       = array(
      'host'     => '47.107.237.18',
      'port'     => '5672',
      'login'    => 'guest',
      'password' => 'guest',
      'vhost'    => '/'
  );
  $ttl             = 10;//过期时间 
  $exchange        = 'exchange';//正常交换机 
  $delayExchange   = 'delayed_' . $exchange;//死信交换机 
  $type            = 'topic';
  $msg             = 'hello world';
  $route           = 'delay';
  $deadQueue       = 'dead_queue';//死信队列 
  $delayQueue      = 'delayed_queue_' . $exchange . '_' . $ttl;//延迟队列 
  $delayRoutingKey = $route . '_' . $ttl;

  $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(
      $conn_args['host'],
      $conn_args['port'],
      $conn_args['login'],
      $conn_args['password'],
      $conn_args['vhost']
  );
  //创建连接和channel 
  $channel = $conn->channel();
  //定义延迟交换器 
  $channel->exchange_declare($delayExchange, 'topic', false, true, false);
  //定义延迟队列, 
  $channel->queue_declare(
      $delayQueue,
      false,
      true,
      false,
      false,
      false,
      new \PhpAmqpLib\Wire\AMQPTable(
          array(
              "x-dead-letter-exchange"    => $delayExchange,/*队列信息超时后投递到这个交换机*/ 
              "x-dead-letter-routing-key" => $route,/*routingKey*/ 
              "x-message-ttl"             => $ttl * 1000,/*超时时间*/
          )
      )
  );
  /*定义死信队列,延迟队列超时的消息就会投递到这里处理*/ 
  $channel->queue_declare(
      $deadQueue,
      false,
      true,
      false,
      false
  );
  /*绑定死信队列到延迟交换机*/ 
  $channel->queue_bind($deadQueue, $delayExchange, $route);
  //绑定延迟队列到交换器上 
  $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);
  //生产者发送消息 
  $msg = new AMQPMessage($msg);
  $channel->basic_publish($msg, $delayExchange, $delayRoutingKey);
  $channel->close();

谢谢观看,日常记录!

streetlamp 敬上!