详解Laravel中Kafka的使用实例是什么样的
Admin 2022-09-22 群英技术资讯 817 次浏览
这篇主要是介绍“详解Laravel中Kafka的使用实例是什么样的”的内容了,下文有实例供大家参考,对大家了解操作过程或相关知识有一定的帮助,而且实用性强,希望这篇文章能帮助大家解决详解Laravel中Kafka的使用实例是什么样的的问题,下面我们一起来了解看看吧。
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
<?php
namespace App\Tools;
use Illuminate\Config\Repository;
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
use Illuminate\Http\Request;
class Kafka
{
public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
public $topic = 'test';//管道名称
public $partition = 0;
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new InvalidConfigException("broker not config");
}
$rk = new \RdKafka\Producer();
if (empty($rk)) {
throw new InvalidConfigException("producer error");
}
$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new InvalidConfigException("producer error");
}
$this->producer = $rk;
}
/**
* 生产者
* @param array $messages
* @return mixed
*/
public function send($messages = [],$topic)
{
$topic = $this->producer->newTopic($topic);
return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
}
/**
* 消费者
*/
public function consumer($object, $callback){
$conf = new \RdKafka\Conf();
$conf->set('group.id', 0);
$conf->set('metadata.broker.list', $this->broker_list);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$this->topic]);
echo "waiting for messages.....\n";
while(true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "message payload....";
$object->$callback($message->payload);
break;
}
sleep(1);
}
}
}
?>
在控制器中如何使用:
首先再头部导入这个类:use App\Tools\Kafka;
下面是使用生产者实例:
public function test(){
$topic = 'tool';//输入使用管道名称
$data['shop_id'] = 58;
$data['bar_code']=586;
$data['goods_num'] = 1;
$data['goods_unit'] = '个';
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
var_dump($Error_Msg);
}
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("tool", $topicConf);//读取的管道
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
//没有错误打印信息
$message = json_decode(json_encode($message),true);
$data = json_decode($message['payload'],true);
var_dump($data);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "等待接收信息\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "超时\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
sleep(1);
}
?>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
在laravel中,distinct()方法用于强制让查找返回不重复的结果,语法为“$users=DB::table('users')->distinct()->get();”;若使用想要查找多个字段,可指定select字段并添加字段名称。
这篇文章主要介绍了php 多个变量指向同一个引用($b = &$a)用法,结合实例形式分析了PHP变量引用原理、优缺点及相关操作技巧,需要的朋友可以参考下
PHP中php.ini参数优化是怎样,如何理解?下文有实例供大家参考,对大家了解操作过程或相关知识有一定的帮助,而且实用性强,希望这篇文章能帮助大家,下面我们一起来了解看看吧。
这篇文章主要介绍了Laravel框架视图和模型操作方法,结合实例形式详细分析了laravel框架视图和模型的概念、原理、创建及使用方法,需要的朋友可以参考下
我们在浏览一些网站的时候,经常能看到签到功能,如果连续签到还会有奖励之类的。那么连续签到功能是怎么实现的?如何判断连续签到呢?下面分享基于PHP实现的连续签到方法,感兴趣的朋友可以参考。
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008