背景
做项目迁移时(例如从 Laravel 迁移到 Hyperf),通常是一步一步进行的,因此会出现部分消息(例如支付回调)仍然需要由 Laravel 框架消费的情况。这时需要在 Hyperf 侧接管消息的投递,并把数据格式改造成 Laravel 队列能够识别的格式:构造一个与 Laravel 端命名空间相同的 Job,将其序列化后(可以 debug 查看序列化后的内容)投递至 RabbitMQ,再由 Laravel 进行消费即可。本文使用的 Hyperf 版本为 2.1。
话不多说开干
- 在app下建立Job目录为例,大家可以根据情况来
- 在Job目录下建立Job.php,复制以下代码
declare(strict_types=1);

namespace App\Job;
/**
 * Base class for jobs that are pushed onto a queue.
 *
 * Carries the connection, queue and delay settings of a job and exposes
 * fluent setters for each of them.
 *
 * @package App\Job
 */
class Job
{
    /** @var mixed Only ever reset to null inside this class. */
    protected $job = null;

    /** @var string|null Connection name chosen through onConnection(). */
    public $connection = null;

    /** @var string|null Queue name chosen through onQueue(). */
    public $queue = null;

    /** @var \DateTime|int|null Delay chosen through delay(). */
    public $delay = null;

    /**
     * Reset the job handle, connection, queue and delay to null.
     *
     * This is the "invoke" magic method, not a constructor: it only runs when
     * the instance itself is called like a function.
     */
    public function __invoke()
    {
        $this->job = $this->connection = $this->queue = $this->delay = null;
    }

    /**
     * Choose the delay for the job.
     *
     * @param \DateTime|int|null $delay
     * @return $this
     */
    public function delay($delay)
    {
        $this->delay = $delay;

        return $this;
    }

    /**
     * Choose the queue the job is sent to.
     *
     * @param string|null $queue
     * @return $this
     */
    public function onQueue($queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Choose the connection the job is sent through.
     *
     * @param string|null $connection
     * @return $this
     */
    public function onConnection($connection)
    {
        $this->connection = $connection;

        return $this;
    }
}
- 接管Producer.php,继续创建Producer.php,复制以下代码进去
declare(strict_types=1);

namespace App\Job;

use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Connection;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Producer that declares the exchange and queue before every publish.
 *
 * The exchange/queue flags are re-read from the RABBITMQ_* environment
 * variables each time checkExchange() runs and stored on the public
 * properties below.
 *
 * @package App\Job
 */
class Producer extends Builder
{
    /** @var mixed Value of RABBITMQ_EXCHANGE_TYPE (default 'direct'). */
    public $exchange_type;

    /** @var mixed Value of RABBITMQ_EXCHANGE_PASSIVE (default false). */
    public $exchange_passive;

    /** @var mixed Value of RABBITMQ_EXCHANGE_DURABLE (default true). */
    public $exchange_durable;

    /** @var mixed Value of RABBITMQ_EXCHANGE_AUTODELETE (default false). */
    public $exchange_auto_delete;

    /** @var mixed Value of RABBITMQ_QUEUE_PASSIVE (default false). */
    public $queue_passive;

    /** @var mixed Value of RABBITMQ_QUEUE_DURABLE (default true). */
    public $queue_durable;

    /** @var mixed Value of RABBITMQ_QUEUE_EXCLUSIVE (default false). */
    public $queue_exclusive;

    /** @var mixed Value of RABBITMQ_QUEUE_AUTODELETE (default false). */
    public $queue_auto_delete;

    /** @var mixed Declared but neither assigned nor read in this class. */
    public $queue_nowait;

    /**
     * Declare the exchange and queue of a message and bind them together.
     *
     * When the message TTL is positive, a second queue named
     * "<queue>_deferred_<ttl>" is declared that dead-letters back to the
     * original exchange/routing key, and the message is re-targeted at that
     * deferred queue.
     *
     * @param mixed $channel         Channel used for the declare/bind calls.
     * @param mixed $producerMessage Message providing getExchange(), getQueue(),
     *                               getRoutingKey() and getTtl(), as
     *                               App\Job\ProducerMessage does.
     */
    public function checkExchange($channel, $producerMessage)
    {
        $exchange = $producerMessage->getExchange();
        $queue = $producerMessage->getQueue();
        $routingKey = $producerMessage->getRoutingKey();
        $ttl = $producerMessage->getTtl();

        $this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
        $this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);
        $this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
        // Auto-delete has its own variable; it must not reuse the PASSIVE flag.
        $this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_AUTODELETE', false);
        $this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false);
        $this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);
        $this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
        $this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);

        // Declare the exchange.
        $channel->exchange_declare(
            $exchange,
            $this->exchange_type,
            $this->exchange_passive,
            $this->exchange_durable,
            $this->exchange_auto_delete
        );
        // Declare the queue.
        $channel->queue_declare(
            $queue,
            $this->queue_passive,
            $this->queue_durable,
            $this->queue_exclusive,
            $this->queue_auto_delete
        );
        // Bind the queue to the exchange.
        $channel->queue_bind($queue, $exchange, $routingKey);

        if ($ttl > 0) {
            // The deferred queue lives on the same exchange and is routed by its own name.
            $delayExchange = $exchange;
            $delayQueue = $queue . '_deferred_' . $ttl;
            $delayRoutingKey = $delayQueue;

            // Declare the exchange used for delayed messages.
            $channel->exchange_declare(
                $delayExchange,
                $this->exchange_type,
                $this->exchange_passive,
                $this->exchange_durable,
                $this->exchange_auto_delete
            );
            // Declare the deferred queue; expired messages are dead-lettered
            // to the original exchange and routing key. TTL is given in
            // seconds and converted to milliseconds here.
            $channel->queue_declare(
                $delayQueue,
                $this->queue_passive,
                $this->queue_durable,
                $this->queue_exclusive,
                $this->queue_auto_delete,
                false,
                new AMQPTable([
                    'x-dead-letter-exchange' => $exchange,
                    'x-dead-letter-routing-key' => $routingKey,
                    'x-message-ttl' => $ttl * 1000,
                ])
            );
            // Bind the deferred queue to the exchange.
            $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);

            // Publish to the deferred queue instead of the final one.
            $producerMessage->setExchange($delayExchange);
            $producerMessage->setRoutingKey($delayRoutingKey);
        }
    }

    /**
     * Publish a message, wrapped in retry(1, ...).
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param mixed $routingKey Routing key set on the message before publishing.
     * @param mixed $exchange   Exchange set on the message before publishing.
     * @param bool $confirm     Use a confirm channel and report whether an ack arrived.
     * @param int $timeout      Timeout passed to wait_for_pending_acks_returns().
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5): bool
    {
        return retry(1, function () use ($exchange, $routingKey, $producerMessage, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $routingKey, $exchange, $confirm, $timeout);
        });
    }

    /**
     * Build the AMQP message, make sure the topology exists and publish it.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish; must also
     *                                                  provide getTtl() and getQueue().
     * @param mixed $routingKey
     * @param mixed $exchange
     * @param bool $confirm
     * @param int $timeout
     * @return bool True when $confirm is false; otherwise whether the ack handler fired.
     * @throws \Throwable Rethrown after the connection has been reconnected.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5)
    {
        $result = false;
        $this->injectMessageProperty($producerMessage, $routingKey, $exchange);

        $properties = $producerMessage->getProperties();
        $delay = $producerMessage->getTtl();
        if ($delay > 0) {
            // Delayed messages additionally carry a per-message expiration (milliseconds).
            $properties = array_merge($properties, [
                'expiration' => $delay * 1000,
            ]);
        }
        $message = new AMQPMessage($producerMessage->payload(), $properties);

        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var Connection $connection */
        $connection = $pool->get();
        $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });

        try {
            // Make sure exchange and queue exist; this may re-target the message
            // at the deferred queue, so exchange/routing key are read afterwards.
            $this->checkExchange($channel, $producerMessage);
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (\Throwable $exception) {
            // Fully qualified on purpose: an unqualified Throwable would resolve
            // inside the current namespace and never match.
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy the routing key and exchange onto the message.
     *
     * @param ProducerMessageInterface $producerMessage
     * @param mixed $routingKey
     * @param mixed $exchange
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage, $routingKey, $exchange)
    {
        $producerMessage->setRoutingKey($routingKey);
        $producerMessage->setExchange($exchange);
    }
}
- 接管ProducerMessage.php,继续创建ProducerMessage.php,复制以下代码进去
declare(strict_types=1);

namespace App\Job;

use Hyperf\Amqp\Constants;
use Hyperf\Amqp\Message\Message;
use Hyperf\Amqp\Message\ProducerMessageInterface;
/**
 * Base producer message: holds a payload that is published as JSON, plus a
 * delay (TTL) and a target queue name.
 *
 * @package App\Job
 */
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
    /**
     * @var mixed Data to publish; JSON-encoded by serialize().
     */
    protected $payload = '';

    /**
     * @var string
     */
    protected $routingKey = '';

    /**
     * @var array Message properties returned by getProperties().
     */
    protected $properties = [
        'content_type' => 'text/plain',
        'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
    ];

    /**
     * @var int Delay in seconds; 0 means no delay.
     */
    protected $ttl = 0;

    /**
     * @var string Name of the target queue.
     */
    protected $queue = 'default';

    /**
     * @return array The message properties.
     */
    public function getProperties(): array
    {
        return $this->properties;
    }

    /**
     * Replace the payload.
     *
     * @param mixed $data Any value json_encode() can handle.
     * @return $this
     */
    public function setPayload($data): self
    {
        $this->payload = $data;

        return $this;
    }

    /**
     * @return string The payload in its serialized (JSON) form.
     * @throws \RuntimeException When the payload cannot be JSON-encoded.
     */
    public function payload(): string
    {
        return $this->serialize();
    }

    /**
     * JSON-encode the payload.
     *
     * @return string
     * @throws \RuntimeException When json_encode() fails (it returns false,
     *                           which would otherwise violate the string
     *                           return type with an unhelpful TypeError).
     */
    public function serialize(): string
    {
        $encoded = json_encode($this->payload);
        if ($encoded === false) {
            throw new \RuntimeException('Unable to JSON-encode message payload: ' . json_last_error_msg());
        }

        return $encoded;
    }

    /**
     * Set the delay.
     *
     * @param int $ttl Delay in seconds.
     * @return $this
     */
    public function setTtl($ttl)
    {
        $this->ttl = $ttl;

        return $this;
    }

    /**
     * @return int Delay in seconds.
     */
    public function getTtl()
    {
        return $this->ttl;
    }

    /**
     * Set the target queue name.
     *
     * @param string $name
     * @return $this
     */
    public function setQueue($name)
    {
        $this->queue = $name;

        return $this;
    }

    /**
     * @return string The target queue name.
     */
    public function getQueue()
    {
        return $this->queue;
    }
}
- 序列化数据,创建SerializeJobData.php,复制以下代码进去
declare(strict_types=1);

namespace App\Job;

use Hyperf\Utils\Str;
/**
 * Producer message that wraps a job object in a queue payload.
 *
 * @package App\Job
 */
class SerializeJobData extends ProducerMessage
{
    /**
     * Build the payload for the given job.
     *
     * The job is cloned and PHP-serialized into data.command; its class name
     * is used as displayName and data.commandName. The optional public
     * properties $tries and $timeout are copied when set, otherwise null.
     *
     * @param object $job Job instance to publish.
     */
    public function __construct($job)
    {
        // Pool name; change it here to publish through a different pool.
        $this->poolName = 'default';

        $jobClass = get_class($job);
        $payload = [
            'displayName' => $jobClass,
            // Namespace separators are required for the handler class to be resolvable.
            'job' => 'Illuminate\\Queue\\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'timeout' => $job->timeout ?? null,
            'data' => [
                'commandName' => $jobClass,
                'command' => serialize(clone $job),
            ],
        ];

        // Any driver other than rabbitmq (e.g. redis) additionally gets a
        // random id and an attempts counter in the payload.
        if (env('QUEUE_DRIVER', 'rabbitmq') !== 'rabbitmq') {
            $payload['id'] = Str::random(32);
            $payload['attempts'] = 0;
        }

        $this->payload = $payload;
    }
}
- 创建助手函数 注意我的内容哦 按需修改
if (!function_exists('producerPushData')) {
    /**
     * Publish a message through the Producer.
     *
     * NOTE(review): `Producer` is referenced unqualified, so the file holding
     * this helper presumably imports the Producer class; confirm.
     *
     * @param ProducerMessageInterface $message Message to publish.
     * @param string $routingKey Routing key, 'default' when omitted.
     * @param string $exchange   Exchange to publish to; an empty value falls back to the
     *                           RABBITMQ_EXCHANGE_NAME environment variable ('sweetheart'
     *                           when that is unset).
     * @param bool $confirm      Whether publisher confirmation is required.
     * @param int $timeout       Timeout handed to the producer.
     * @return bool
     * @throws Throwable
     */
    function producerPushData($message, $routingKey = 'default', $exchange = '', bool $confirm = false, int $timeout = 5)
    {
        if (empty($exchange)) {
            $exchange = env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');
        }

        $producer = make(Producer::class);

        return $producer->produce($message, $routingKey, $exchange, $confirm, $timeout);
    }
}
- 使用方式 注意需要和laravel/lumen 保持同样的命名空间哦
创建的 Job 需要继承 `App\Job\Job`
declare(strict_types=1);

use App\Job\Job;
/**
 * Example job that carries an arbitrary data value.
 */
class TestJob extends Job
{
    /**
     * @var mixed Value handed to the constructor.
     */
    protected $data;

    /**
     * Store the data the job carries.
     *
     * @param mixed $data
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Placeholder for the processing logic; intentionally empty here.
     *
     * NOTE(review): queue consumers commonly invoke a method named handle();
     * confirm that the consuming side really expects __handle.
     */
    public function __handle()
    {
    }
}
use App\Job\SerializeJobData;
use TestJob;

// Build the job, wrap it in a SerializeJobData message and publish it with
// the default routing key and exchange.
$data = [];
$job = new TestJob($data);
producerPushData(new SerializeJobData($job));