PHP延迟消息队列如何实现,具体方法是什么
Admin 2022-08-02 群英技术资讯 1848 次浏览
很多朋友都对“PHP延迟消息队列如何实现,具体方法是什么”的内容比较感兴趣,对此小编整理了相关的知识分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获,那么感兴趣的朋友就继续往下看吧!需求:电商秒杀场景中,如果用户下单10分钟未支付,需要进行库存归还
本篇是用PHP+Laravel+RabbitMQ来实现异步延迟消息队列
在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列
公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户
安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人
用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时
…
很多场景下我们都需要延迟队列。
本文以 RabbitMQ 为例来和大家聊一聊延迟队列的玩法。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
官网插件下载地址

我这里直接下载了最新版本,你们根据自己的rabbitmq版本号进行下载

把下载好的文件移动到rabbitmq的插件plugins下,以我自己的Mac为例子,放到了如下路径

然后执行安装插件指令,如下
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后重启rabbitmq服务,并刷新查看exchanges交换机有没有该插件

如上图则延迟消息队列插件安装完成
新建rabbitmq服务类,包含延迟消息队列生产消息,和消费消息,如下

代码如下:
<?php
namespace App\Http\Controllers\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitmqServer
{
private $host = "127.0.0.1";
private $port = 5672;
private $user = "guest";
private $password = "guest";
private $msg;
private $channel;
private $connection;
// 过期时间
const TIMEOUT_5_S = 5; // 5s
const TIMEOUT_10_S = 10; // 10s
private $exchange_logs = "logs";
private $exchange_direct = "direct";
private $exchange_delayed = "delayed";
private $queue_delayed = "delayedQueue";
const EXCHANGETYPE_FANOUT = "fanout";
const EXCHANGETYPE_DIRECT = "direct";
const EXCHANGETYPE_DELAYED = "x-delayed-message";
public function __construct($type = false)
{
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
$this->channel = $this->connection->channel();
// 声明Exchange
$this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
$this->channel->queue_declare($this->queue_delayed, false, true, false, false);
$this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);
}
/**
* delay creat message
*/
public function createMessageDelay($msg, $time)
{
$delayConfig = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
];
$msg = new AMQPMessage($msg, $delayConfig);
return $msg;
}
/**
* delay send message
*/
public function sendDelay($msg, $time = self::TIMEOUT_10_S)
{
$msg = $this->createMessageDelay($msg, $time);;
$this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);
$this->channel->close();
$this->connection->close();
}
/**
* delay consum
*/
public function consumDelay()
{
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
$this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
$this->channel->close();
$this->connection->close();
}
}
比如新建QueueController控制器,进行测试生产消息放到延迟消息队列中

代码如下:
<?php
namespace App\Http\Controllers\Api\v1;
use App\Http\Controllers\Controller;
use App\Http\Controllers\Service\RabbitmqServer;
use App\Jobs\Queue;
use Illuminate\Http\Request;
class QueueController extends Controller
{
//
public function index(Request $request)
{
//比如说现在是下订单操作
//需求:如果用户10分钟之内不支付订单就要取消订单,并且库存归还
$msg = $request->post();
$Rabbit = new RabbitmqServer("x-delayed-message");
//第一个参数发送的消息,第二个参数延迟多少秒
$Rabbit->sendDelay(json_encode($msg),5);
}
}
至此通过接口调试工具进行模拟生产消息即可
消息生产完毕要进行消费,这里使用的是Laravel的任务调度,代码如下

<?php
namespace App\Console\Commands;
use App\Http\Controllers\Service\RabbitmqServer;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* @return int
*/
public function handle()
{
$Rabbit = new RabbitmqServer("x-delayed-message");
$Rabbit->consumDelay();
}
}
用postman模拟生产消息,效果如下:

然后消费消息,用一下命令,如果延迟5秒执行消费则成功

至此,就完成了rabbitmq异步延迟消息队列
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
本篇文章带大家了解一下PHP中接收外部参数的几种方式,希望对大家有所帮助!
在php中有两个函数,能够实现数字补零,str_pad()sprintf() 函数1: str_pad顾名思义这个函数是针对字符串来说的这个可以对指定的字符串填补任何其它的字符串例如:str_pad(带填补的字符串,填补后的长度,填补字符串,填补位置)其中填补后的长度必须是个正整数,填补位置有三个选项,左边:STR_PAD_LEFT,右边:STR_PAD_RIGHT,两
这篇文章主要给大家分享CentOS7系统搭建LAMP的操作和如何更新PHP版本,对于新手来说了解很清楚LAMP环境的搭建是需要掌握的,希望大家阅读完这篇文章能有所收获,下面我们一起来学习一下吧。
我所采用的服务器是Apache,实际上对所有服务器的操作是相同的。服务器安装在本地电脑上,我们项目的源文件位于服务器的跟文件目录下,也即 /htdocs 目录下。
命令模式:命令模式(CommandPattern):将一个请求封装为一个对象,从而使我们可用不同的请求对客户进行参数化;对请求排队或者记录请求日志,以及支持可撤销的操作。命令模式是一种对象行为型模式,其别名为动作(Action)模式或事务(Transaction)模式。模式动机:在软件设计中,我们经常需要向某些对象发送请求,但是并不知道请求的接收者是谁,也不知道被请求的操作是哪个,
成为群英会员,开启智能安全云计算之旅
立即注册关注或联系群英网络
7x24小时售前:400-678-4567
7x24小时售后:0668-2555666
24小时QQ客服
群英微信公众号
CNNIC域名投诉举报处理平台
服务电话:010-58813000
服务邮箱:service@cnnic.cn
投诉与建议:0668-2555555
Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 ICP核准(ICP备案)粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008