Queue 介紹
原理
EasySwoole
封裝實現(xiàn)了一個輕量級的隊列,默認以 Redis
作為隊列驅(qū)動器。
可以自己實現(xiàn)一個隊列驅(qū)動來實現(xiàn)隊列,用 kafka
作為隊列驅(qū)動器或者 其他驅(qū)動器方式
作為隊列驅(qū)動器,來進行存儲。
從上可知,Queue
并不是一個單獨使用的組件,它更像一個對不同驅(qū)動的隊列進行統(tǒng)一封裝的門面組件。
組件要求
- ext-swoole: >=4.4.0
- easyswoole/component: ^2.0
- easyswoole/redis-pool: ~2.2.0
安裝方法
composer require easyswoole/queue=2.1.x
倉庫地址
基本使用
- 注冊隊列驅(qū)動器
- 設(shè)置消費進程
- 生產(chǎn)者投遞任務(wù)
定義一個隊列
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue extends Queue
{
use Singleton;
}
定義消費進程
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
go(function () {
MyQueue::getInstance()->consumer()->listen(function (Job $job) {
// 打印消費數(shù)據(jù)
var_dump($job->getJobData());
});
});
}
}
可以多進程,多協(xié)程消費
驅(qū)動注冊
<?php
namespace EasySwoole\EasySwoole;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis-pool 的使用請看 redis 章節(jié)文檔(http://www.b3f21.cn/Components/Redis/pool.html)
// 注冊一個名為 queue 的 Redis 連接池
\EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1',
'port' => '6379',
// [可選參數(shù)] 密碼
// 'auth' => ''
]
), 'queue');
// 獲取 Redis 連接池中的一個 Redis 連接對象
$redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');
// 配置隊列驅(qū)動器,底層使用 Redis 驅(qū)動,并設(shè)置隊列名為 'queue'
$driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');
// 注冊自定義隊列
\App\Utility\MyQueue::getInstance($driver);
// 注冊一個消費進程
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());
// 模擬生產(chǎn)者,投遞任務(wù)到隊列中,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
// 設(shè)置投遞的隊列任務(wù)數(shù)據(jù)
$job->setJobData(['time' => \time()]);
\App\Utility\MyQueue::getInstance()->producer()->push($job);
});
}
});
}
}
關(guān)于進程安全退出問題請看 進程章節(jié)。
進階使用
我們可以自定義驅(qū)動,實現(xiàn) RabbitMQ
、Kafka
等消費隊列軟件的封裝。
用戶需要定義類,并實現(xiàn) \EasySwoole\Queue\QueueDriverInterface
接口的幾個方法即可。該接口的詳細實現(xiàn)請看下文。
QueueDriverInterface 接口類實現(xiàn)
<?php
namespace EasySwoole\Queue;
interface QueueDriverInterface
{
public function push(Job $job):bool ;
public function pop(float $timeout = 3.0):?Job;
public function size():?int ;
}
組件自帶的 Redis
隊列驅(qū)動器實現(xiàn)
<?php
namespace EasySwoole\Queue\Driver;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\QueueDriverInterface;
use EasySwoole\Redis\Redis as Connection;
use EasySwoole\RedisPool\Pool;
class Redis implements QueueDriverInterface
{
protected $pool;
protected $queueName;
public function __construct(Pool $pool,string $queueName = 'easy_queue')
{
$this->pool = $pool;
$this->queueName = $queueName;
}
public function push(Job $job): bool
{
$data = serialize($job);
return $this->pool->invoke(function (Connection $connection)use($data){
return $connection->lPush($this->queueName,$data);
});
}
public function pop(float $timeout = 3.0): ?Job
{
return $this->pool->invoke(function (Connection $connection){
$data = $connection->rPop($this->queueName);
if($data){
return unserialize($data);
}
return null;
});
}
public function size(): ?int
{
return $this->pool->invoke(function (Connection $connection){
return $connection->lLen($this->queueName);
});
}
}
Queue 多節(jié)點使用
定義第一個隊列(自定義 nodeId)
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
use EasySwoole\Queue\QueueDriverInterface;
class MyQueue1 extends Queue
{
use Singleton;
public function __construct(QueueDriverInterface $driver)
{
parent::__construct($driver);
// 自定義 nodeId
$this->setNodeId('xxxxx1');
}
}
定義第二個隊列(自動生成 nodeId)
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue2 extends Queue
{
use Singleton;
}
獲取節(jié)點id
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
// 消費隊列
go(function () {
MyQueue1::getInstance()->consumer()->listen(function (Job $job) {
// 打印 節(jié)點Id
var_dump($job->getNodeId());
// 打印 任務(wù)Id
var_dump($job->getJobId());
});
MyQueue2::getInstance()->consumer()->listen(function (Job $job) {
// 打印 節(jié)點Id
var_dump($job->getNodeId());
// 打印 任務(wù)Id
var_dump($job->getJobId());
});
});
}
}
可以多進程,多協(xié)程消費
驅(qū)動注冊
<?php
namespace EasySwoole\EasySwoole;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis-pool 的使用請看 redis 章節(jié)文檔(http://www.b3f21.cn/Components/Redis/pool.html)
// 注冊一個名為 queue 的 Redis 連接池
\EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1',
'port' => '6379',
// [可選參數(shù)] 密碼
// 'auth' => ''
]
), 'queue');
// 獲取 Redis 連接池中的一個 Redis 連接對象
$redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');
// 配置隊列驅(qū)動器,底層使用 Redis 驅(qū)動,并設(shè)置隊列名為 'queue'
$driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');
// 【這里是重點】
// 注冊自定義隊列
\App\Utility\MyQueue1::getInstance($driver);
\App\Utility\MyQueue2::getInstance($driver);
// 注冊一個消費進程
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());
// 模擬生產(chǎn)者,投遞任務(wù)到隊列中,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
// 設(shè)置投遞的隊列任務(wù)數(shù)據(jù)
$job->setJobData(['time' => \time()]);
// 這里是重點
\App\Utility\MyQueue1::getInstance()->producer()->push($job);
\App\Utility\MyQueue2::getInstance()->producer()->push($job);
});
}
});
}
}