PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
延时队列
DelayProducer.php
AmqpBuilder.php
AmqpBuilder.php
- <?php
- declare(strict_types = 1);
- namespace App\Components\Amqp;
- use Hyperf\Amqp\Builder\Builder;
- use Hyperf\Amqp\Builder\QueueBuilder;
- /**
-  * Queue builder that can accumulate queue arguments and configure a
-  * TTL + dead-letter ("delayed") queue in one call.
-  */
- class AmqpBuilder extends QueueBuilder
- {
-     /**
-      * Add queue arguments to the ones already configured on this builder.
-      *
-      * When both the current and the new arguments are plain arrays they are
-      * merged with array_merge(), so keys in $arguments override existing keys.
-      * An AMQPTable cannot be passed to array_merge(), so in that case (or when
-      * the current arguments are not an array) the new value replaces the
-      * current arguments wholesale instead of raising a TypeError.
-      *
-      * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
-      *
-      * @return \Hyperf\Amqp\Builder\Builder
-      */
-     public function setArguments($arguments) : Builder
-     {
-         if (is_array($arguments) && is_array($this->arguments)) {
-             $this->arguments = array_merge($this->arguments, $arguments);
-         } else {
-             $this->arguments = $arguments;
-         }
-         return $this;
-     }
-     /**
-      * Configure this builder as a delayed queue.
-      *
-      * Sets the message TTL and the dead-letter exchange / routing key as queue
-      * arguments, then sets the queue name.
-      *
-      * @param string $queueName             name of the queue to declare
-      * @param int    $xMessageTtl           message time-to-live in seconds (converted to milliseconds here)
-      * @param string $xDeadLetterExchange   value for the x-dead-letter-exchange argument
-      * @param string $xDeadLetterRoutingKey value for the x-dead-letter-routing-key argument
-      *
-      * @return $this
-      */
-     public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
-     {
-         $this->setArguments([
-             'x-message-ttl' => ['I', $xMessageTtl * 1000], // milliseconds
-             'x-dead-letter-exchange' => ['S', $xDeadLetterExchange],
-             'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
-         ]);
-         $this->setQueue($queueName);
-         return $this;
-     }
- }
DelayProducer.php
- <?php
- declare(strict_types = 1);
- namespace App\Components\Amqp;
- use Hyperf\Amqp\Annotation\Producer;
- use Hyperf\Amqp\Builder;
- use Hyperf\Amqp\Message\ProducerMessageInterface;
- use Hyperf\Di\Annotation\AnnotationCollector;
- use PhpAmqpLib\Message\AMQPMessage;
- use Throwable;
- /**
-  * Producer that declares and binds a caller-supplied queue before publishing,
-  * so that a message can be routed into a delay (TTL + dead-letter) queue.
-  */
- class DelayProducer extends Builder
- {
-     /**
-      * Publish a message, declaring the given queue first.
-      *
-      * The actual work is done by produceMessage(), invoked through retry(1, ...).
-      *
-      * @param ProducerMessageInterface $producerMessage message to publish
-      * @param AmqpBuilder              $queueBuilder    definition of the queue to declare and bind
-      * @param bool                     $confirm         use a confirm channel and report whether the ack handler fired
-      * @param int                      $timeout         passed to wait_for_pending_acks_returns() (presumably seconds - confirm)
-      *
-      * @return bool
-      * @throws \Throwable
-      */
-     public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
-     {
-         $attempt = function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
-             return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
-         };
-         return retry(1, $attempt);
-     }
-     /**
-      * Declare queue and exchange, bind them, publish the message and wait for acks.
-      *
-      * @param ProducerMessageInterface $producerMessage message to publish
-      * @param AmqpBuilder              $queueBuilder    definition of the queue to declare and bind
-      * @param bool                     $confirm         use a confirm channel instead of a plain channel
-      * @param int                      $timeout         passed to wait_for_pending_acks_returns()
-      *
-      * @return bool true when $confirm is false; otherwise whether the ack handler was invoked
-      * @throws \Throwable rethrown after the connection has been reconnected
-      */
-     private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
-     {
-         // Flipped to true by the ack handler registered below.
-         $acked = false;
-         $this->injectMessageProperty($producerMessage);
-         $amqpMessage = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
-         /** @var \Hyperf\Amqp\Connection $connection */
-         $connection = $this->getConnectionPool($producerMessage->getPoolName())->get();
-         $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();
-         $channel->set_ack_handler(function () use (&$acked) {
-             $acked = true;
-         });
-         try {
-             // Delay-queue handling: the queue and exchange are declared and bound before publishing.
-             $exchangeBuilder = $producerMessage->getExchangeBuilder();
-             $queueName = $queueBuilder->getQueue();
-             // Queue declaration.
-             $channel->queue_declare(
-                 $queueName,
-                 $queueBuilder->isPassive(),
-                 $queueBuilder->isDurable(),
-                 $queueBuilder->isExclusive(),
-                 $queueBuilder->isAutoDelete(),
-                 $queueBuilder->isNowait(),
-                 $queueBuilder->getArguments(),
-                 $queueBuilder->getTicket()
-             );
-             // Exchange declaration.
-             $channel->exchange_declare(
-                 $exchangeBuilder->getExchange(),
-                 $exchangeBuilder->getType(),
-                 $exchangeBuilder->isPassive(),
-                 $exchangeBuilder->isDurable(),
-                 $exchangeBuilder->isAutoDelete(),
-                 $exchangeBuilder->isInternal(),
-                 $exchangeBuilder->isNowait(),
-                 $exchangeBuilder->getArguments(),
-                 $exchangeBuilder->getTicket()
-             );
-             $exchange = $producerMessage->getExchange();
-             $routingKey = $producerMessage->getRoutingKey();
-             // Queue binding.
-             $channel->queue_bind($queueName, $exchange, $routingKey);
-             // Message publishing.
-             $channel->basic_publish($amqpMessage, $exchange, $routingKey);
-             $channel->wait_for_pending_acks_returns($timeout);
-         } catch (Throwable $exception) {
-             // Reconnect the connection before it is released in the finally block.
-             $connection->reconnect();
-             throw $exception;
-         } finally {
-             $connection->release();
-         }
-         if (! $confirm) {
-             return true;
-         }
-         return $acked;
-     }
-     /**
-      * Copy routing key and exchange from the message class's Producer annotation, if any.
-      *
-      * Does nothing when AnnotationCollector is unavailable or the message class
-      * carries no Producer annotation; empty annotation values are skipped.
-      *
-      * @param ProducerMessageInterface $producerMessage
-      */
-     private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
-     {
-         if (! class_exists(AnnotationCollector::class)) {
-             return;
-         }
-         /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
-         $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
-         if (! $annotation) {
-             return;
-         }
-         if ($annotation->routingKey) {
-             $producerMessage->setRoutingKey($annotation->routingKey);
-         }
-         if ($annotation->exchange) {
-             $producerMessage->setExchange($annotation->exchange);
-         }
-     }
- }
处理超时订单
OrderQueueConsumer.php
OrderQueueProducer.php
OrderQueueProducer.php
- <?php
- declare(strict_types = 1);
- namespace App\Amqp\Producer;
- use Hyperf\Amqp\Annotation\Producer;
- use Hyperf\Amqp\Builder\ExchangeBuilder;
- use Hyperf\Amqp\Message\ProducerMessage;
- /**
-  * Producer message for order events; exchange and routing key come from the
-  * annotation below.
-  *
-  * @Producer(exchange="order_exchange", routingKey="order_exchange")
-  */
- class OrderQueueProducer extends ProducerMessage
- {
-     /**
-      * @param mixed $data message payload, stored as-is
-      */
-     public function __construct($data)
-     {
-         $this->payload = $data;
-     }
-     /**
-      * Returns the parent's exchange builder unchanged; override point for
-      * customising the exchange definition.
-      *
-      * @return ExchangeBuilder
-      */
-     public function getExchangeBuilder() : ExchangeBuilder
-     {
-         $exchangeBuilder = parent::getExchangeBuilder();
-         return $exchangeBuilder;
-     }
- }
OrderQueueConsumer.php
- <?php
- declare(strict_types = 1);
- namespace App\Amqp\Consumer;
- use App\Service\CityTransport\OrderService;
- use Hyperf\Amqp\Result;
- use Hyperf\Amqp\Annotation\Consumer;
- use Hyperf\Amqp\Message\ConsumerMessage;
- /**
-  * Consumer bound to the exchange, routing key and queue named in the
-  * annotation below.
-  *
-  * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
-  */
- class OrderQueueConsumer extends ConsumerMessage
- {
-     /**
-      * Handle one message.
-      *
-      * @param mixed $data message payload
-      *
-      * @return string a Hyperf\Amqp\Result constant telling the framework how to settle the message
-      */
-     public function consume($data) : string
-     {
-         // Business logic goes here.
-         // A value must be returned: the declared string return type makes PHP throw a
-         // TypeError if the method falls off the end without returning.
-         return Result::ACK;
-     }
-     /**
-      * @return bool always true, i.e. this consumer is enabled
-      */
-     public function isEnable() : bool
-     {
-         return true;
-     }
- }
Demo
- // Configure queue "order_exchange" with a TTL of 1 (seconds) and dead-lettering to
- // exchange "delay_exchange" with routing key "delay_route".
- $builder = new AmqpBuilder();
- $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
- // Resolve the producer from the container and publish a message with a random order number.
- $que = ApplicationContext::getContainer()->get(DelayProducer::class);
- var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));