亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频

Queue 介紹

原理

EasySwoole 封裝實現了一個輕量級的隊列,默認以 Redis 作為隊列驅動器。

可以自己實現一個隊列驅動來實現隊列,用 kafka 作為隊列驅動器或者 其他驅動器方式 作為隊列驅動器,來進行存儲。

從上可知,Queue 并不是一個單獨使用的組件,它更像一個對不同驅動的隊列進行統一封裝的門面組件。

組件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安裝方法

composer require easyswoole/queue=2.1.x

倉庫地址

easyswoole/queue

基本使用

  • 注冊隊列驅動器
  • 設置消費進程
  • 生產者投遞任務

定義一個隊列

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue extends Queue
{
    use Singleton;
}

定義消費進程

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function () {
            MyQueue::getInstance()->consumer()->listen(function (Job $job) {
                // 打印消費數據
                var_dump($job->getJobData());
            });
        });
    }
}

可以多進程,多協程消費

驅動注冊

<?php

namespace EasySwoole\EasySwoole;

use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis-pool 的使用請看 redis 章節文檔(http://www.b3f21.cn/Components/Redis/pool.html)
        // 注冊一個名為 queue 的 Redis 連接池
        \EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1',
                'port' => '6379',
                // [可選參數] 密碼
                // 'auth' => ''
            ]
        ), 'queue');

        // 獲取 Redis 連接池中的一個 Redis 連接對象
        $redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');

        // 配置隊列驅動器,底層使用 Redis 驅動,并設置隊列名為 'queue'
        $driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');

        // 注冊自定義隊列
        \App\Utility\MyQueue::getInstance($driver);

        // 注冊一個消費進程
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());

        // 模擬生產者,投遞任務到隊列中,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    // 設置投遞的隊列任務數據
                    $job->setJobData(['time' => \time()]);
                    \App\Utility\MyQueue::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

關于進程安全退出問題請看 進程章節

進階使用

我們可以自定義驅動,實現 RabbitMQKafka 等消費隊列軟件的封裝。

用戶需要定義類,并實現 \EasySwoole\Queue\QueueDriverInterface 接口的幾個方法即可。該接口的詳細實現請看下文。

QueueDriverInterface 接口類實現

<?php

namespace EasySwoole\Queue;

interface QueueDriverInterface
{
    public function push(Job $job):bool ;

    public function pop(float $timeout = 3.0):?Job;

    public function size():?int ;
}

組件自帶的 Redis 隊列驅動器實現

<?php

namespace EasySwoole\Queue\Driver;

use EasySwoole\Queue\Job;
use EasySwoole\Queue\QueueDriverInterface;
use EasySwoole\Redis\Redis as Connection;
use EasySwoole\RedisPool\Pool;

class Redis implements QueueDriverInterface
{

    protected $pool;
    protected $queueName;
    public function __construct(Pool $pool,string $queueName = 'easy_queue')
    {
        $this->pool = $pool;
        $this->queueName = $queueName;
    }

    public function push(Job $job): bool
    {
        $data = serialize($job);
        return $this->pool->invoke(function (Connection $connection)use($data){
            return $connection->lPush($this->queueName,$data);
        });
    }

    public function pop(float $timeout = 3.0): ?Job
    {
        return $this->pool->invoke(function (Connection $connection){
            $data =  $connection->rPop($this->queueName);
            if($data){
                return unserialize($data);
            }
            return null;
        });
    }

    public function size(): ?int
    {
        return $this->pool->invoke(function (Connection $connection){
            return $connection->lLen($this->queueName);
        });
    }
}

Queue 多節點使用

定義第一個隊列(自定義 nodeId)

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
use EasySwoole\Queue\QueueDriverInterface;

class MyQueue1 extends Queue
{
    use Singleton;

    public function __construct(QueueDriverInterface $driver)
    {
        parent::__construct($driver);
        // 自定義 nodeId
        $this->setNodeId('xxxxx1');
    }
}

定義第二個隊列(自動生成 nodeId)

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue2 extends Queue
{
    use Singleton;
}

獲取節點id

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {   
        // 消費隊列
        go(function () {
            MyQueue1::getInstance()->consumer()->listen(function (Job $job) {
                // 打印 節點Id
                var_dump($job->getNodeId());
                // 打印 任務Id
                var_dump($job->getJobId());
            });
            MyQueue2::getInstance()->consumer()->listen(function (Job $job) {
                // 打印 節點Id
                var_dump($job->getNodeId());
                // 打印 任務Id
                var_dump($job->getJobId());
            });
        });
    }
}

可以多進程,多協程消費

驅動注冊

<?php

namespace EasySwoole\EasySwoole;

use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis-pool 的使用請看 redis 章節文檔(http://www.b3f21.cn/Components/Redis/pool.html)
        // 注冊一個名為 queue 的 Redis 連接池
        \EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1',
                'port' => '6379',
                // [可選參數] 密碼
                // 'auth' => ''
            ]
        ), 'queue');

        // 獲取 Redis 連接池中的一個 Redis 連接對象
        $redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');

        // 配置隊列驅動器,底層使用 Redis 驅動,并設置隊列名為 'queue'
        $driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');

        // 【這里是重點】
        // 注冊自定義隊列
        \App\Utility\MyQueue1::getInstance($driver);
        \App\Utility\MyQueue2::getInstance($driver);

        // 注冊一個消費進程
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());

        // 模擬生產者,投遞任務到隊列中,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    // 設置投遞的隊列任務數據
                    $job->setJobData(['time' => \time()]);
                    // 這里是重點
                    \App\Utility\MyQueue1::getInstance()->producer()->push($job);
                    \App\Utility\MyQueue2::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

相關倉庫

EasySwoole 中利用 Redis 實現消息隊列

如何利用 EasySwoole 多進程多協程 Redis 隊列實現爬蟲

亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频
  • <kbd id="eqi2k"><code id="eqi2k"></code></kbd><cite id="eqi2k"><tbody id="eqi2k"></tbody></cite>
    视频一区视频二区视频三区视频四区国产| 日韩三级电影网站| 黑人一区二区| 久久99精品国产一区二区三区| 亚洲国产精品第一区二区| 日韩av不卡播放| 精品亚洲欧美日韩| 成人区精品一区二区| 中文有码久久| 亚洲国产精品综合| 国产精品av一区二区| 制服诱惑一区| 亚洲自拍的二区三区| 欧美最大成人综合网| 国产伦理一区二区三区| 久久中文精品| 91超碰在线免费观看| 噜噜噜91成人网| 亚洲永久视频| 久久久99爱| 久久国产主播| 97操在线视频| 国产伦精品一区二区| 国产精品久久波多野结衣| 99三级在线| 国产乱码精品一区二区三区不卡| 91超碰在线免费观看| 99re6热在线精品视频播放速度| 久久精品麻豆| 国产欧美日韩视频一区二区三区| 99视频免费观看蜜桃视频| 国产高清自拍99| 国产日韩欧美精品| 欧洲精品码一区二区三区免费看| 欧美日本亚洲| 宅男一区二区三区| 亚洲高清视频一区二区| 国产亚洲欧美一区二区| 在线综合亚洲| 成人精品一二区| 欧美极品视频一区二区三区| 欧洲国产精品| 国产综合欧美| 久久亚洲免费| 好吊色欧美一区二区三区| 日本精品一区| 午夜精品一区二区三区四区| 亚洲欧洲午夜| 99在线国产| 亚洲成色www久久网站| 好看的亚洲午夜视频在线| 国产精品一区二区三区观看| 国产精品xxx在线观看www| 日韩成人在线资源| 亚洲视频碰碰| 成人在线观看91| 亚洲自拍偷拍二区| 国产精品久久一区二区三区| 国产精品一区二区三区四区五区| 无码免费一区二区三区免费播放| 亚洲无玛一区| 精品国产一区二区三区麻豆小说 | 亚洲永久网站| 欧美系列一区| 一区二区三区三区在线| 国产91精品一区二区绿帽| 视频一区视频二区视频三区视频四区国产 | 国产亚洲欧美一区二区三区| 一区在线电影| 欧美亚洲专区| 亚洲午夜精品一区二区三区| 亚洲欧美日韩视频二区| 日韩国产在线一区| 一道本一区二区| 久久香蕉综合色| 999亚洲国产精| 欧美日本韩国国产| 免费毛片一区二区三区久久久| 日韩妆和欧美的一区二区| 国产精品亚洲综合色区韩国| 日本高清不卡一区二区三| 在线一区欧美| 亚洲一区二区不卡视频| 99re在线| 中文有码久久| 午夜久久福利| 欧美日韩亚洲一区二区三区在线观看| 最新成人av网站| 午夜视频久久久| 国产精品视频免费一区| 亚洲国产欧美国产综合一区| 欧美一区二区视频17c| 久久免费黄色| 91久久综合| 欧美精品一区在线发布| 欧美日韩在线精品| 国产69精品久久久久9999apgf | 中文字幕色一区二区| 精品一卡二卡三卡四卡日本乱码| 一区二区三区四区五区精品视频| 一区二区三区国| 日本在线成人一区二区| 国产欧美欧洲| 99re在线视频上| 久久一区二区精品| 国产深夜精品| 99国产精品99久久久久久粉嫩| 欧美一区二区三区在线免费观看| 欧美精品一区二区三区四区五区| 97av影视网在线观看| 鲁大师影院一区二区三区| 一本久久综合| 国产日韩欧美一区在线| 亚洲作爱视频| 亚洲麻豆一区| 国产精品一区二区三区免费观看| 在线观看视频日韩| 亚洲精选久久| 亚洲欧美精品| 米奇777在线欧美播放| 国产免费成人| 国产精品区二区三区日本| 欧美成ee人免费视频| 国严精品久久久久久亚洲影视| 国产激情美女久久久久久吹潮| 丁香五月网久久综合| 国产欧美日韩在线播放| 九九九久久久| 日本不卡一二三区| 亚洲国产激情一区二区三区| 亚洲一区二区四区| 欧美日韩国产欧| 伊人天天综合| 99热在线精品观看| 美女国产一区| 国产精品久久久久久久久久直播 | 亚洲一区不卡在线| 欧美黄色一区二区| 亚洲人成高清| 久久久蜜桃一区二区人| 成人欧美一区二区三区视频xxx| 国产伦精品一区二区三区在线| 欧美一级爽aaaaa大片| 亚洲福利av在线| 精品91视频| 成人羞羞视频免费| 日韩资源av在线| 国内精品福利| 老牛国产精品一区的观看方式| 精品亚洲欧美日韩| 欧美1区2区视频| 一级日韩一区在线观看| 产国精品偷在线| 视频在线精品一区| 夜夜嗨网站十八久久| 国产在线播放一区二区| 亚洲一区三区电影在线观看| 亚洲国产日韩美| 国产欧美亚洲日本| 最新欧美日韩亚洲| 午夜亚洲伦理| 无码免费一区二区三区免费播放| 亚洲日本无吗高清不卡| 国产激情一区二区三区在线观看| 亚洲精品二区| 久久精品一本| 欧美另类一区| 国产成人精品免费视频大全最热 | 久久久久久久久久码影片| 日本一区二区在线视频观看| 国内一区二区三区在线视频| 久久精品一区| 欧美日韩在线播放一区二区| 91超碰在线电影| 黄色精品免费| 欧美一区二区综合| 亚洲一区在线免费| 在线免费一区| 国外成人在线视频网站| 亚洲久久在线| 亚洲一卡二卡三卡四卡无卡网站在线看| 国产婷婷精品| 欧美精选一区| 久久精品国产精品青草色艺| 中文亚洲字幕| 中文字幕色一区二区 | 午夜影院日韩| 欧美福利精品| 青娱乐一区二区| 久久婷婷丁香| 日韩午夜电影| 欧美区一区二| 日本午夜精品一区二区三区| 久久综合久久久| 国产日韩一区| 在线观看亚洲| 欧美成人dvd在线视频| 欧美一级二级三级| 一区二区三区欧美在线| 日韩久久久久久久久久久久久|