亚洲精品成人_精品成人一区_999视频在线播放_免费黄色在线_亚洲成人久久久_久久www免费视频

kafka

Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),有如下特性: 通過(guò)O(1)的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能。 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數(shù)百萬(wàn)的消息。 支持通過(guò)Kafka服務(wù)器和消費(fèi)機(jī)集群來(lái)分區(qū)消息。 支持Hadoop并行數(shù)據(jù)加載。

本項(xiàng)目代碼參考自 https://github.com/weiboad/kafka-php

組件要求

  • php: >=7.1.0
  • ext-swoole: ^4.4.5
  • easyswoole/component: ^2.0
  • easyswoole/spl: ^1.1

安裝方法

composer require easyswoole/kafka

倉(cāng)庫(kù)地址

easyswoole/kafka

基本使用

注冊(cè)kafka服務(wù)

namespace EasySwoole\EasySwoole;

use App\Producer\Process as ProducerProcess;
use App\Consumer\Process as ConsumerProcess;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // TODO: Implement mainServerCreate() method.
        // 生產(chǎn)者
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ProducerProcess());
        // 消費(fèi)者
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess());
    }
}

生產(chǎn)者

namespace App\Producer;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ProducerConfig;
use EasySwoole\Kafka\Kafka;

class Process extends AbstractProcess
{
    protected function run($arg)
    {
        go(function () {
            $config = new ProducerConfig();
            $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
            $config->setBrokerVersion('0.9.0');
            $config->setRequiredAck(1);

            $kafka = new Kafka($config);
            $result = $kafka->producer()->send([
                [
                    'topic' => 'test',
                    'value' => 'message--',
                    'key'   => 'key--',
                ],
            ]);

            var_dump($result);
            var_dump('ok');
        });
    }
}

消費(fèi)者

namespace App\Consumer;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ConsumerConfig;
use EasySwoole\Kafka\Kafka;

class Process extends AbstractProcess
{
    protected function run($arg)
    {
        go(function () {
            $config = new ConsumerConfig();
            $config->setRefreshIntervalMs(1000);
            $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
            $config->setBrokerVersion('0.9.0');
            $config->setGroupId('test');

            $config->setTopics(['test']);
            $config->setOffsetReset('earliest');

            $kafka = new Kafka($config);
            // 設(shè)置消費(fèi)回調(diào)
            $func = function ($topic, $partition, $message) {
                var_dump($topic);
                var_dump($partition);
                var_dump($message);
            };
            $kafka->consumer()->subscribe($func);
        });
    }
}

附贈(zèng)

  1. Kafka 集群部署 docker-compose.yml 一份,使用方式如下
    1. 保證2181,9092,9093,9000端口未被占用(占用后可以修改compose文件中的端口號(hào))
    2. 根目錄下,docker-compose up -d
    3. 訪問(wèn)localhost:9000,可以查看kafka集群狀態(tài)。

https://github.com/easy-swoole/kafka/blob/master/docker-compose.yml

主站蜘蛛池模板: 亚洲成人生活片 | 国产最好的av国产大片 | 国产又粗又猛又爽又黄91 | 人人看人人艹 | 性视频免费的视频大全2015年 | 亚洲日本va中文字幕 | 蜜桃视频在线观看视频 | 中文字幕亚洲精品日韩 | 成人在线小视频 | 登山的目的3电影完型 | 成人福利视频 | 日韩av女优在线播放 | 久久久av片 | xxxx在线视频 | 三级日韩| 欧美三级欧美一级 | 亚洲www在线 | 明星换脸av一区二区三区网站 | 交换一区二区三区va在线 | 少妇做爰免费视频网站色黄 | 欧美一级免费片 | 日韩av高清在线 | 成人午夜视频免费在线观看 | 在线免费观看91 | 天堂影院在线观看 | 国产精品久久久久久久久久免 | 亚洲高清免费视频 | 一区二区三区四区视频 | 天堂av一区二区三区 | 免费1000部激情免费视频 | 亚洲欧美成人影院 | 久久美女视频 | 在线中文字幕亚洲 | 日韩成人在线免费视频 | 91成人精品爽啪在线观看 | 香蕉久久av| 欧美日本高清 | 成人亚洲视频 | 国产综合久久久 | 天堂网资源 | 在线视频日本 |