亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频

Queue 介紹

原理

EasySwoole 封裝實現了一個輕量級的隊列,默認使用 Redis 作為隊列驅動器。

用戶可以自己實現一個隊列驅動器來實現隊列,用 kafka 作為隊列驅動器或者 其他驅動器方式 作為隊列驅動器,來進行存儲。

從上可知,Queue 并不是一個單獨使用的組件,它更像一個對不同驅動的隊列進行統一封裝的門面組件。

Queue 組件當前最新穩定版本為 3.x。

舊版本 (2.1.x) 的 Queue 組件的使用,請看 Queue 2.1.x

組件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安裝方法

composer require easyswoole/queue 3.x

倉庫地址

easyswoole/queue 3.x

基本使用

默認自帶的隊列驅動為 Redis 隊列。這里簡單列舉 2 種用戶可使用的方式:

  • 在框架的任意位置進行生產和消費隊列任務。
  • 在框架的任意位置進行生產隊列任務, 然后在自定義進程中進行消費任務。

在框架中進行生產和消費任務

創建隊列

use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

// 配置 Redis 隊列驅動器
$redisConfig = new RedisConfig([
    'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
    'port' => 6379, // 端口 默認為 6379
    'auth' => '', // 密碼 默認為 不設置
    'db'   => 0, // 默認為 0 號庫
]);

// 創建隊列
$queue = new Queue(new RedisQueue($redisConfig));

普通生產任務

$queue 為上述創建隊列中得到的隊列對象。

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 生產普通任務
$queue->producer()->push($job);

普通消費任務

$queue 為上述創建隊列中得到的隊列對象。

// 消費任務
$job = $queue->consumer()->pop();

// 或者是自定義進程中消費任務(具體使用請看下文自定義進程消費任務完整使用示例)
$queue->consumer()->listen(function (Job $job){
    var_dump($job);
});

生產延遲任務

$queue 為上述創建隊列中得到的隊列對象。

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設置任務延后執行時間
$job->setDelayTime(5);

// 生產延遲任務
$queue->producer()->push($job);

生產可信任務

// 創建任務
$job = new Job();

// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
$job->setRetryTimes(3);

// 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
$job->setWaitConfirmTime(5);

// 投遞任務
$queue->producer()->push($job);

// 確認一個任務
$queue->consumer()->confirm($job);

在框架中生產任務和自定義進程中消費任務

  • 注冊隊列驅動器
  • 設置消費進程
  • 生產者投遞任務

定義一個隊列

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue extends Queue
{
    use Singleton;
}

定義消費進程

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function (){
            MyQueue::getInstance()->consumer()->listen(function (Job $job){
                var_dump($job->getJobData());
            });
        });
    }
}

支持多進程、多協程消費

注冊隊列驅動器、消費進程及設置生產者投遞任務

<?php

namespace EasySwoole\EasySwoole;

use App\Utility\MyQueue;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis pool 使用請看 redis 章節文檔
        $redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
                'port' => 6379, // 端口 默認為 6379
                'auth' => '', // 密碼 默認為 不設置
                'db'   => 0, // 默認為 0 號庫
            ]
        );
        // 配置 隊列驅動器
        $driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
        MyQueue::getInstance($driver);
        // 注冊一個消費進程
        $processConfig = new \EasySwoole\Component\Process\Config([
            'processName' => 'QueueProcess', // 設置 自定義進程名稱
            'processGroup' => 'Queue', // 設置 自定義進程組名稱
            'enableCoroutine' => true, // 設置 自定義進程自動開啟協程
        ]);
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
        // 模擬生產者,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    $job->setJobData(['time' => \time()]);
                    MyQueue::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

進程安全退出問題請看 自定義進程 章節

控制器使用

以在 http 服務中為例,使用示例代碼如下:

<?php

namespace App\HttpController;

use App\Utility\MyQueue;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Http\Message\Status;
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

class Index extends Controller
{
    // 生產普通任務
    public function producer1()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer1 => ');
        var_dump($job->getJobData());

        // 生產普通任務
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產普通任務失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產普通任務成功!');
        }
    }

    // 生產延遲任務
    public function producer2()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        // 設置任務延后執行時間
        $job->setDelayTime(5);

        var_dump('producer2 => ');
        var_dump($job->getJobData());

        // 生產延遲任務
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務成功!');
        }
    }

    // 生產可信任務
    public function producer3()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創建任務
        $job = new Job();

        // 設置任務數據
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer3 => ');
        var_dump($job->getJobData());

        // 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
        $job->setRetryTimes(3);

        // 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
        $job->setWaitConfirmTime(5);

        // 投遞任務
        $queue->producer()->push($job);

        // 確認一個任務
        $queue->consumer()->confirm($job);
    }

    // 消費任務
    public function consumer()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        ### 消費任務
        // 獲取到需要消費的任務
        $job = $queue->consumer()->pop();

        if (!$job) {
            $this->writeJson(Status::CODE_OK, [], '沒有隊列任務需要消費了!');
            return false;
        }

        // 獲取需要消費的任務的數據
        $jobData = $job->getJobData();
        var_dump($jobData);
    }
}

進階使用

我們可以自定義驅動,實現 RabbitMQKafka 等消費隊列軟件的封裝。

用戶需要定義類,并實現 \EasySwoole\Queue\QueueDriverInterface 接口的幾個方法即可。該接口的詳細實現請看下文。

QueueDriverInterface 接口類實現

<?php

namespace EasySwoole\Queue;

interface QueueDriverInterface
{
    public function push(Job $job,float $timeout = 3.0): bool;

    public function pop(float $timeout = 3.0, array $params = []): ?Job;

    public function info(): ?array;

    public function confirm(Job $job,float $timeout = 3.0): bool;
}

相關倉庫

EasySwoole 中利用 Redis 實現消息隊列

如何利用 EasySwoole 多進程多協程 Redis 隊列實現爬蟲

亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频
  • <kbd id="eqi2k"><code id="eqi2k"></code></kbd><cite id="eqi2k"><tbody id="eqi2k"></tbody></cite>
    亚洲激情一区二区三区| 99久热re在线精品996热视频 | 亚洲一区二区三区涩| 亚洲综合视频一区| 韩日午夜在线资源一区二区| 秋霞毛片久久久久久久久| 女人天堂亚洲aⅴ在线观看| 亚洲精品视频啊美女在线直播| 久久婷婷久久| 色姑娘综合网| 亚洲国产日韩欧美一区二区三区| 久久激情视频| 亚洲精品在线视频观看| 99精品视频免费| 美脚丝袜一区二区三区在线观看| 欧美激情综合| 国产激情一区二区三区在线观看| 日韩少妇中文字幕| 亚洲影视在线| 亚洲v国产v在线观看| 亚洲影视综合| 亚洲午夜精品久久久久久浪潮| 日韩一区二区免费看| 九九九九九九精品| 99精品99久久久久久宅男| 欧美日韩亚洲在线| 亚洲欧美日韩另类精品一区二区三区 | 欧美高清视频一区| 亚洲视频1区| 欧洲亚洲一区| 亚洲一区二区网站| 亚洲电影一二三区| av成人在线电影| 韩国在线一区| 欧美久久综合性欧美| 国产精品视频久久一区| 欧美精品在线一区| 久久青草久久| 亚洲无线一线二线三线区别av| 国产另类自拍| 久久都是精品| 伊人久久综合| 亚洲一区二区三区加勒比 | 男人的天堂成人在线| 中文字幕一区二区三区最新| 国产精品视频福利| 欧美一区成人| 久久精品综合一区| 久久久成人网| 一区二区三区四区五区精品| 亚洲欧美电影在线观看| 国产精品永久入口久久久| 亚洲欧美日韩在线综合| 欧美涩涩视频| 亚洲二区自拍| 久久99精品久久久久久久久久| 国产精品美女| 亚洲欧洲日夜超级视频| 最新国产精品久久| 亚洲日本理论电影| 欧美日韩在线播放一区二区| 国产乱码精品一区二区三区卡 | 亚洲一区二区三区免费看| 久久亚洲一区二区| 国产在线一区二区三区播放| 久久久久久色| 久久九九国产| 免费看亚洲片| 亚洲一级在线| 国产精品免费一区二区三区在线观看 | 亚洲国产精品一区制服丝袜| 欧美a级片网站| 亚洲一区影院| 午夜精品偷拍| 国产精品二区二区三区| 欧美日韩国产精品一卡| 欧美一区二区在线| 欧美777四色影| 欧美久久在线| 在线观看一区欧美| 在线视频国内自拍亚洲视频| 影音先锋中文字幕一区| 亚洲国产日韩在线| 中文精品视频| 久久久噜噜噜| 国产伦精品一区二区三区高清版| 国产精品一区二区免费看| 国产精品麻豆免费版| 久久riav| 日日夜夜精品网站| 欧美精品一卡| 国产亚洲二区| 成人久久18免费网站漫画| 国产伦精品一区二区三区四区视频 | 亚洲欧美综合国产精品一区| 欧美破处大片在线视频| 亚洲黄色三级| 久久国产精品久久精品国产 | 国产精品一区二区三区在线| 久久久久九九九| 亚洲欧洲一二三| 一区在线免费观看| 久久免费一区| 热re99久久精品国产99热| 欧美成人免费在线| 国产欧美综合一区二区三区| 国产成人精品免费视频大全最热 | 日韩少妇中文字幕| 韩日视频一区| 99热99热| 中文字幕一区二区三区四区五区| 一区免费视频| 成人免费视频观看视频| 亚洲v国产v| 亚洲一区欧美激情| 日韩av电影免费观看| 亚洲国产高清一区二区三区| 91青青草免费在线看| 色综合久久久久久久久五月| 在线一区视频| 日韩av电影免费播放| 一本色道精品久久一区二区三区| 国产精品二区二区三区| 一区精品视频| 久久这里只有| 午夜精品剧场| 国产精品久久久久免费| 在线观看亚洲视频啊啊啊啊| 香蕉av777xxx色综合一区| 日韩视频在线播放| 久久精品123| 在线视频精品一区| 成人免费视频视频在| 欧美黄色精品| 国产伦精品一区二区三区视频免费| 欧美成人中文| 狠狠色噜噜狠狠狠狠色吗综合| 国语自产精品视频在线看8查询8| 国内不卡一区二区三区| 日韩视频在线观看国产| 日产精品高清视频免费| 久久综合伊人77777麻豆| 欧美 日韩 国产在线| 国产精品一区视频网站| 夜夜爽www精品| 亚洲欧洲中文| 久久国产精品一区二区三区四区| 国产欧美日韩亚洲一区二区三区| 欧美极品一区二区| 久久久久久色| 日韩午夜在线| 国产精品分类| 亚洲欧洲国产日韩精品| 狠狠色噜噜狠狠狠狠色吗综合| 国产亚洲福利| 欧美日韩综合网| 亚洲精品欧美精品| 精品国产乱码久久久久久郑州公司| 国产欧美日本在线| 激情欧美国产欧美| 椎名由奈jux491在线播放| 久草精品电影| 波多野结衣一区二区三区在线观看| 亚洲激情专区| 亚洲天堂偷拍| 午夜精品偷拍| 亚洲人一区二区| 日韩在线电影一区| 久久综合一区| 爱情岛论坛亚洲入口| 欧美亚洲三区| 国产精品日韩一区二区三区| 亚洲视频一二| 国内一区二区在线视频观看| 亚洲精品自在在线观看| 日本一区网站| 午夜一区二区三区| 婷婷精品国产一区二区三区日韩 | 久久亚洲不卡| 91视频免费在线观看| 久久亚洲高清| 超碰97在线播放| 国产伦一区二区三区色一情| 国产精品9999久久久久仙踪林| 91视频免费在线观看| 国产精品v欧美精品v日韩精品| 1卡2卡3卡精品视频| 999国产视频| 国产二区一区| 久久精品第九区免费观看 | 亚洲永久一区二区三区在线| 亚洲综合欧美日韩| 欧美精品成人| 日韩视频一区二区三区在线播放免费观看| 国产精品videosex极品| 亚洲精品1区2区| 亚洲一区二区三区免费在线观看| 欧美综合二区| 国产精品免费一区二区三区观看| 黄色91av|