PHP延迟消息队列如何实现,具体方法是什么
Admin 2022-08-02 群英技术资讯 903 次浏览
需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进行库存归还
本篇是用PHP+Laravel+RabbitMQ来实现异步延迟消息队列
在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列
公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户
安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人
用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时
…
很多场景下我们都需要延迟队列。
本文以 RabbitMQ 为例来和大家聊一聊延迟队列的玩法。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
官网插件下载地址
我这里直接下载了最新版本,你们根据自己的rabbitmq版本号进行下载
把下载好的文件移动到rabbitmq的插件plugins下,以我自己的Mac为例子,放到了如下路径
然后执行安装插件指令,如下
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后重启rabbitmq服务,并刷新查看exchanges交换机有没有该插件
如上图则延迟消息队列插件安装完成
新建rabbitmq服务类,包含延迟消息队列生产消息,和消费消息,如下
代码如下:
<?php namespace App\Http\Controllers\Service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class RabbitmqServer { private $host = "127.0.0.1"; private $port = 5672; private $user = "guest"; private $password = "guest"; private $msg; private $channel; private $connection; // 过期时间 const TIMEOUT_5_S = 5; // 5s const TIMEOUT_10_S = 10; // 10s private $exchange_logs = "logs"; private $exchange_direct = "direct"; private $exchange_delayed = "delayed"; private $queue_delayed = "delayedQueue"; const EXCHANGETYPE_FANOUT = "fanout"; const EXCHANGETYPE_DIRECT = "direct"; const EXCHANGETYPE_DELAYED = "x-delayed-message"; public function __construct($type = false) { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); // 声明Exchange $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT])); $this->channel->queue_declare($this->queue_delayed, false, true, false, false); $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed); } /** * delay creat message */ public function createMessageDelay($msg, $time) { $delayConfig = [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $time * 1000]) ]; $msg = new AMQPMessage($msg, $delayConfig); return $msg; } /** * delay send message */ public function sendDelay($msg, $time = self::TIMEOUT_10_S) { $msg = $this->createMessageDelay($msg, $time);; $this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed); $this->channel->close(); $this->connection->close(); } /** * delay consum */ public function consumDelay() { $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; $this->channel->basic_ack($msg->delivery_info['delivery_tag'], false); }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); } }
比如新建QueueController控制器,进行测试生产消息放到延迟消息队列中
代码如下:
<?php namespace App\Http\Controllers\Api\v1; use App\Http\Controllers\Controller; use App\Http\Controllers\Service\RabbitmqServer; use App\Jobs\Queue; use Illuminate\Http\Request; class QueueController extends Controller { // public function index(Request $request) { //比如说现在是下订单操作 //需求:如果用户10分钟之内不支付订单就要取消订单,并且库存归还 $msg = $request->post(); $Rabbit = new RabbitmqServer("x-delayed-message"); //第一个参数发送的消息,第二个参数延迟多少秒 $Rabbit->sendDelay(json_encode($msg),5); } }
至此通过接口调试工具进行模拟生产消息即可
消息生产完毕要进行消费,这里使用的是Laravel的任务调度,代码如下
<?php namespace App\Console\Commands; use App\Http\Controllers\Service\RabbitmqServer; use Illuminate\Console\Command; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqConsumerCommand extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'rabbitmq_consumer';//给消费者起个command名称 /** * The console command description. * * @var string */ protected $description = 'Command description'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * @return int */ public function handle() { $Rabbit = new RabbitmqServer("x-delayed-message"); $Rabbit->consumDelay(); } }
用postman模拟生产消息,效果如下:
然后消费消息,用一下命令,如果延迟5秒执行消费则成功
至此,就完成了rabbitmq异步延迟消息队列
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
php设置端口号的方法:首先找到PHP配置文件所在路径;然后通过命令“vim /usr/local/php7/etc/php-fpm.d/www.conf listen = 0.0.0.0:9000”修改端口号即可。
使用phpstorm打开File ==> Settings ==> Code Style ==> PHP;找到 Wraping and Braces 下的 Array initializer;设置,以上,点击 Apply 应用即可。
php超详细讲解命名管道 目录 进程间为什么要通信 进程如何实现通信 常见进程通信方式 管道概念 命名管道实现 posix_mkfifo函数 无血缘进程间通信 进程间为什么要通信 进程间通信的目的: 数据传输:一个 进程需要将它的数据 发送给另一个进程. 通知事件:一个进程需要向另一个或一组进程 发送消息,通知它(它们)发生了 某种事件(如进程终止时要通知父进程). 资源共享:多个进程之间 共享同样的资源 .为了做到这一点,需要内核提供互斥和同步机制. 进程控制:有些进程 希望完全控制另一个进程的执行 (如 Debug
今天小编就为大家分享一篇laravel框架 laravel-admin上传图片到oss的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
图像的旋转要根据要求去设定图像,然而旋转是要根据方向进行改变的,今天这篇文章是小编带来的PHP开发中实现图像处理旋转和翻转的方法,需要的朋友参考下面的介绍吧!
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008