亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频

Queue 介紹

原理

EasySwoole 封裝實現(xiàn)了一個輕量級的隊列,默認(rèn)使用 Redis 作為隊列驅(qū)動器。

用戶可以自己實現(xiàn)一個隊列驅(qū)動器來實現(xiàn)隊列,用 kafka 作為隊列驅(qū)動器或者 其他驅(qū)動器方式 作為隊列驅(qū)動器,來進(jìn)行存儲。

從上可知,Queue 并不是一個單獨使用的組件,它更像一個對不同驅(qū)動的隊列進(jìn)行統(tǒng)一封裝的門面組件。

Queue 組件當(dāng)前最新穩(wěn)定版本為 3.x。

舊版本 (2.1.x) 的 Queue 組件的使用,請看 Queue 2.1.x

組件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安裝方法

composer require easyswoole/queue 3.x

倉庫地址

easyswoole/queue 3.x

基本使用

默認(rèn)自帶的隊列驅(qū)動為 Redis 隊列。這里簡單列舉 2 種用戶可使用的方式:

  • 在框架的任意位置進(jìn)行生產(chǎn)和消費隊列任務(wù)。
  • 在框架的任意位置進(jìn)行生產(chǎn)隊列任務(wù), 然后在自定義進(jìn)程中進(jìn)行消費任務(wù)。

在框架中進(jìn)行生產(chǎn)和消費任務(wù)

創(chuàng)建隊列

use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

// 配置 Redis 隊列驅(qū)動器
$redisConfig = new RedisConfig([
    'host' => '127.0.0.1', // 服務(wù)端地址 默認(rèn)為 '127.0.0.1'
    'port' => 6379, // 端口 默認(rèn)為 6379
    'auth' => '', // 密碼 默認(rèn)為 不設(shè)置
    'db'   => 0, // 默認(rèn)為 0 號庫
]);

// 創(chuàng)建隊列
$queue = new Queue(new RedisQueue($redisConfig));

普通生產(chǎn)任務(wù)

$queue 為上述創(chuàng)建隊列中得到的隊列對象。

// 創(chuàng)建任務(wù)
$job = new Job();

// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 生產(chǎn)普通任務(wù)
$queue->producer()->push($job);

普通消費任務(wù)

$queue 為上述創(chuàng)建隊列中得到的隊列對象。

// 消費任務(wù)
$job = $queue->consumer()->pop();

// 或者是自定義進(jìn)程中消費任務(wù)(具體使用請看下文自定義進(jìn)程消費任務(wù)完整使用示例)
$queue->consumer()->listen(function (Job $job){
    var_dump($job);
});

生產(chǎn)延遲任務(wù)

$queue 為上述創(chuàng)建隊列中得到的隊列對象。

// 創(chuàng)建任務(wù)
$job = new Job();

// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設(shè)置任務(wù)延后執(zhí)行時間
$job->setDelayTime(5);

// 生產(chǎn)延遲任務(wù)
$queue->producer()->push($job);

生產(chǎn)可信任務(wù)

// 創(chuàng)建任務(wù)
$job = new Job();

// 設(shè)置任務(wù)數(shù)據(jù)
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));

// 設(shè)置任務(wù)重試次數(shù)為 3 次。任務(wù)如果沒有確認(rèn),則會執(zhí)行三次
$job->setRetryTimes(3);

// 如果5秒內(nèi)沒確認(rèn)任務(wù),會重新回到隊列。默認(rèn)為3秒
$job->setWaitConfirmTime(5);

// 投遞任務(wù)
$queue->producer()->push($job);

// 確認(rèn)一個任務(wù)
$queue->consumer()->confirm($job);

在框架中生產(chǎn)任務(wù)和自定義進(jìn)程中消費任務(wù)

  • 注冊隊列驅(qū)動器
  • 設(shè)置消費進(jìn)程
  • 生產(chǎn)者投遞任務(wù)

定義一個隊列

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue extends Queue
{
    use Singleton;
}

定義消費進(jìn)程

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function (){
            MyQueue::getInstance()->consumer()->listen(function (Job $job){
                var_dump($job->getJobData());
            });
        });
    }
}

支持多進(jìn)程、多協(xié)程消費

注冊隊列驅(qū)動器、消費進(jìn)程及設(shè)置生產(chǎn)者投遞任務(wù)

<?php

namespace EasySwoole\EasySwoole;

use App\Utility\MyQueue;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis pool 使用請看 redis 章節(jié)文檔
        $redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1', // 服務(wù)端地址 默認(rèn)為 '127.0.0.1'
                'port' => 6379, // 端口 默認(rèn)為 6379
                'auth' => '', // 密碼 默認(rèn)為 不設(shè)置
                'db'   => 0, // 默認(rèn)為 0 號庫
            ]
        );
        // 配置 隊列驅(qū)動器
        $driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
        MyQueue::getInstance($driver);
        // 注冊一個消費進(jìn)程
        $processConfig = new \EasySwoole\Component\Process\Config([
            'processName' => 'QueueProcess', // 設(shè)置 自定義進(jìn)程名稱
            'processGroup' => 'Queue', // 設(shè)置 自定義進(jìn)程組名稱
            'enableCoroutine' => true, // 設(shè)置 自定義進(jìn)程自動開啟協(xié)程
        ]);
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
        // 模擬生產(chǎn)者,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    $job->setJobData(['time' => \time()]);
                    MyQueue::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

進(jìn)程安全退出問題請看 自定義進(jìn)程 章節(jié)

控制器使用

以在 http 服務(wù)中為例,使用示例代碼如下:

<?php

namespace App\HttpController;

use App\Utility\MyQueue;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Http\Message\Status;
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;

class Index extends Controller
{
    // 生產(chǎn)普通任務(wù)
    public function producer1()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創(chuàng)建任務(wù)
        $job = new Job();

        // 設(shè)置任務(wù)數(shù)據(jù)
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer1 => ');
        var_dump($job->getJobData());

        // 生產(chǎn)普通任務(wù)
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產(chǎn)普通任務(wù)失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產(chǎn)普通任務(wù)成功!');
        }
    }

    // 生產(chǎn)延遲任務(wù)
    public function producer2()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創(chuàng)建任務(wù)
        $job = new Job();

        // 設(shè)置任務(wù)數(shù)據(jù)
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        // 設(shè)置任務(wù)延后執(zhí)行時間
        $job->setDelayTime(5);

        var_dump('producer2 => ');
        var_dump($job->getJobData());

        // 生產(chǎn)延遲任務(wù)
        $produceRes = $queue->producer()->push($job);
        if (!$produceRes) {
            $this->writeJson(Status::CODE_OK, [], '隊列生產(chǎn)延遲任務(wù)失敗!');
        } else {
            $this->writeJson(Status::CODE_OK, [], '隊列生產(chǎn)延遲任務(wù)成功!');
        }
    }

    // 生產(chǎn)可信任務(wù)
    public function producer3()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        // 創(chuàng)建任務(wù)
        $job = new Job();

        // 設(shè)置任務(wù)數(shù)據(jù)
        $job->setJobData("this is my job data time time " . date('Ymd h:i:s'));

        var_dump('producer3 => ');
        var_dump($job->getJobData());

        // 設(shè)置任務(wù)重試次數(shù)為 3 次。任務(wù)如果沒有確認(rèn),則會執(zhí)行三次
        $job->setRetryTimes(3);

        // 如果5秒內(nèi)沒確認(rèn)任務(wù),會重新回到隊列。默認(rèn)為3秒
        $job->setWaitConfirmTime(5);

        // 投遞任務(wù)
        $queue->producer()->push($job);

        // 確認(rèn)一個任務(wù)
        $queue->consumer()->confirm($job);
    }

    // 消費任務(wù)
    public function consumer()
    {
        // 獲取隊列
        $queue = MyQueue::getInstance();

        ### 消費任務(wù)
        // 獲取到需要消費的任務(wù)
        $job = $queue->consumer()->pop();

        if (!$job) {
            $this->writeJson(Status::CODE_OK, [], '沒有隊列任務(wù)需要消費了!');
            return false;
        }

        // 獲取需要消費的任務(wù)的數(shù)據(jù)
        $jobData = $job->getJobData();
        var_dump($jobData);
    }
}

進(jìn)階使用

我們可以自定義驅(qū)動,實現(xiàn) RabbitMQKafka 等消費隊列軟件的封裝。

用戶需要定義類,并實現(xiàn) \EasySwoole\Queue\QueueDriverInterface 接口的幾個方法即可。該接口的詳細(xì)實現(xiàn)請看下文。

QueueDriverInterface 接口類實現(xiàn)

<?php

namespace EasySwoole\Queue;

interface QueueDriverInterface
{
    public function push(Job $job,float $timeout = 3.0): bool;

    public function pop(float $timeout = 3.0, array $params = []): ?Job;

    public function info(): ?array;

    public function confirm(Job $job,float $timeout = 3.0): bool;
}

相關(guān)倉庫

EasySwoole 中利用 Redis 實現(xiàn)消息隊列

如何利用 EasySwoole 多進(jìn)程多協(xié)程 Redis 隊列實現(xiàn)爬蟲

主站蜘蛛池模板: 国产精品一卡二卡三卡 | 三级在线看 | 成人网av | 91精品国产综合久久香蕉922 | 国外黄色软件 | h片在线看 | 日韩精品一区二区三区电影在线看 | 黄色片在线观看免费 | 色噜噜狠狠一区二区三区狼国成人 | 久久精品福利 | a级片一区二区 | www.亚洲一区二区三区 | 激情五月综合 | 天天草视频 | 激情五月网站 | 精品一区二区三区亚洲 | 欧美成人在线免费观看 | 一级黄色片在线 | 成人免费色视频 | 91精品国产91久久久久久 | 成人一区二区三区在线观看 | 狠狠操狠狠操狠狠操 | 国产丝袜91久久久久久久久久久 | 亚洲成人免费看 | 麻豆精品免费 | h视频亚洲| 亚洲精品香蕉 | 日韩欧美无 | 成年片黄色日本大片网站视频 | 天天操天天干天天干 | 日韩欧美中文字幕在线观看 | 一本色道久久综合狠狠躁的推荐 | 日韩欧美一区二区在线观看视频 | 久久国产91 | 国产免费av电影 | 精品中文字幕在线 | 中文字幕在线观看不卡 | 在线成人www免费观看视频 | 毛片网站视频 | 91爱爱中文字幕 | 成人免费视频网 |