webman中Redis队列实现分析


webman中Redis队列实现分析


正文

webman中Redis队列实现用的包是 webman/redis-queue ,版本是 v1.1.0

Composer 安装 webman/redis-queue

Redis配置文件自动生成在 config/plugin/webman/redis-queue/redis.php,内容类似如下:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // 密码,可选参数
            'db' => 0,            // 数据库
            'max_attempts'  => 5, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒
        ]
    ],
];

消费进程配置文件自动生成在 config/plugin/webman/redis-queue/process.php,内容类似如下:

<?php
return [
    'consumer'  => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis'
        ]
    ]
];

以及自动生成的该插件是否可用配置文件 config/plugin/webman/redis-queue/app.php

<?php
return [
    'enable' => true,
];

配置并启动

app_path() . '/queue/redis' 目录下写消费者文件,如 ProductEdit.php 文件:

<?php
namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

/**
 * Class ProductEdit
 * @package app\queue\redis\yq
 */
class ProductEdit implements Consumer
{
    public $queue = 'product_edit';

    public $connection = 'default';

    public function consume($data)
    {
        if (empty($data["product_id"])) {
            return;
        }

        /* 处理逻辑 */
    }
}

启动进程:

php start.php start -d

进程配置文件加载分析

在 项目根目录下的 start.php 文件中有:

#!/usr/bin/env php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Protocols\Http;
use Workerman\Connection\TcpConnection;
use Webman\App;
use Webman\Config;
use Webman\Route;
use Webman\Middleware;
use Dotenv\Dotenv;
use support\Request;
use support\Log;
use support\Container;

ini_set('display_errors', 'on');
error_reporting(E_ALL);

// 加载环境配置文件
if (class_exists('Dotenv\Dotenv') && file_exists(base_path() . '/.env')) {
    if (method_exists('Dotenv\Dotenv', 'createUnsafeImmutable')) {
        Dotenv::createUnsafeImmutable(base_path())->load();
    } else {
        Dotenv::createMutable(base_path())->load();
    }
}

// 加载应用配置文件,包括 自动加载、容器、数据库、插件进程 等
Config::load(config_path(), ['route', 'container']);

if ($timezone = config('app.default_timezone')) {
    date_default_timezone_set($timezone);
}

$runtime_logs_path = runtime_path() . DIRECTORY_SEPARATOR . 'logs';
if ( !file_exists($runtime_logs_path) || !is_dir($runtime_logs_path) ) {
    if (!mkdir($runtime_logs_path,0777,true)) {
        throw new \RuntimeException("Failed to create runtime logs directory. Please check the permission.");
    }
}

$runtime_views_path = runtime_path() . DIRECTORY_SEPARATOR . 'views';
if ( !file_exists($runtime_views_path) || !is_dir($runtime_views_path) ) {
    if (!mkdir($runtime_views_path,0777,true)) {
        throw new \RuntimeException("Failed to create runtime views directory. Please check the permission.");
    }
}

Worker::$onMasterReload = function () {
    if (function_exists('opcache_get_status')) {
        if ($status = opcache_get_status()) {
            if (isset($status['scripts']) && $scripts = $status['scripts']) {
                foreach (array_keys($scripts) as $file) {
                    opcache_invalidate($file, true);
                }
            }
        }
    }
};

$config = config('server');
Worker::$pidFile = $config['pid_file'];
Worker::$stdoutFile = $config['stdout_file'];
Worker::$logFile = $config['log_file'];
Worker::$eventLoopClass = $config['event_loop'] ?? '';
TcpConnection::$defaultMaxPackageSize = $config['max_package_size'] ?? 10 * 1024 * 1024;
if (property_exists(Worker::class, 'statusFile')) {
    Worker::$statusFile = $config['status_file'] ?? '';
}

if ($config['listen']) {
    $worker = new Worker($config['listen'], $config['context']);
    $property_map = [
        'name',
        'count',
        'user',
        'group',
        'reusePort',
        'transport',
    ];
    foreach ($property_map as $property) {
        if (isset($config[$property])) {
            $worker->$property = $config[$property];
        }
    }

    $worker->onWorkerStart = function ($worker) {
        require_once base_path() . '/support/bootstrap.php';
        $app = new App($worker, Container::instance(), Log::channel('default'), app_path(), public_path());
        Http::requestClass(config('app.request_class', config('server.request_class', Request::class)));
        $worker->onMessage = [$app, 'onMessage'];
    };
}

// 自定义进程
// Windows does not support custom processes.
if (\DIRECTORY_SEPARATOR === '/') {
    // 一级进程
    foreach (config('process', []) as $process_name => $config) {
        worker_start($process_name, $config);
    }
    // 插件进程
    foreach (config('plugin', []) as $firm => $projects) {
        foreach ($projects as $name => $project) {
            foreach ($project['process'] ?? [] as $process_name => $config) {
                worker_start("plugin.$firm.$name.$process_name", $config);
            }
        }
    }
}

Worker::runAll();

加载应用配置文件:Config::load(config_path(), ['route', 'container']);

config_path() 获取配置文件所在目录地址,在 support/helpers.php 文件中。

Config::load()Webman\Config 类文件中。

start.php 中加载自定义进程:

if (\DIRECTORY_SEPARATOR === '/') {
    foreach (config('process', []) as $process_name => $config) {
        worker_start($process_name, $config);
    }
    foreach (config('plugin', []) as $firm => $projects) {
        foreach ($projects as $name => $project) {
            foreach ($project['process'] ?? [] as $process_name => $config) {
                worker_start("plugin.$firm.$name.$process_name", $config);
            }
        }
    }
}

config('plugin', []) 获取插件配置,返回的结果:

array(1) {
  ["webman"]=>
  array(3) {
    ["console"]=>
    array(1) {
      ["app"]=>
      array(7) {
        ["enable"]=>
        bool(true)
        ["phar_file_output_dir"]=>
        string(34) "/var/www/html/webman/build"
        ["phar_filename"]=>
        string(11) "webman.phar"
        ["signature_algorithm"]=>
        int(3)
        ["private_key_file"]=>
        string(0) ""
        ["exclude_pattern"]=>
        string(228) "#^(?!.*(config/plugin/webman/console/app.php|webman/console/src/Commands/(PharPackCommand.php|ReloadCommand.php)|LICENSE|composer.json|.github|.idea|doc|docs|.git|.setting|runtime|test|test_old|tests|Tests|vendor-bin|.md))(.*)$#"
        ["exclude_files"]=>
        array(5) {
          [0]=>
          string(4) ".env"
          [1]=>
          string(7) "LICENSE"
          [2]=>
          string(13) "composer.json"
          [3]=>
          string(13) "composer.lock"
          [4]=>
          string(9) "start.php"
        }
      }
    }
    ["push"]=>
    array(2) {
      ["app"]=>
      array(7) {
        ["enable"]=>
        bool(true)
        ["websocket"]=>
        string(24) "websocket://0.0.0.0:2121"
        ["api"]=>
        string(19) "http://0.0.0.0:2222"
        ["app_key"]=>
        string(32) "dfa7def75955c62f88e02526691ee155"
        ["app_secret"]=>
        string(32) "cd88c5033d8e591dd248408734be233f"
        ["channel_hook"]=>
        string(45) "http://127.0.0.1:8787/plugin/webman/push/hook"
        ["auth"]=>
        string(24) "/plugin/webman/push/auth"
      }
      ["process"]=>
      array(1) {
        ["server"]=>
        array(5) {
          ["handler"]=>
          string(18) "Webman\Push\Server"
          ["listen"]=>
          string(24) "websocket://0.0.0.0:2121"
          ["count"]=>
          int(1)
          ["reloadable"]=>
          bool(false)
          ["constructor"]=>
          array(2) {
            ["api_listen"]=>
            string(19) "http://0.0.0.0:2222"
            ["app_info"]=>
            array(1) {
              ["dfa7def75955c62f88e02526691ee155"]=>
              array(2) {
                ["channel_hook"]=>
                string(45) "http://127.0.0.1:8787/plugin/webman/push/hook"
                ["app_secret"]=>
                string(32) "cd88c5033d8e591dd248408734be233f"
              }
            }
          }
        }
      }
    }
    ["redis-queue"]=>
    array(3) {
      ["app"]=>
      array(1) {
        ["enable"]=>
        bool(true)
      }
      ["process"]=>
      array(1) {
        ["consumer"]=>
        array(3) {
          ["handler"]=>
          string(34) "Webman\RedisQueue\Process\Consumer"
          ["count"]=>
          int(6)
          ["constructor"]=>
          array(1) {
            ["consumer_dir"]=>
            string(44) "/var/www/html/webman/app/queue/redis"
          }
        }
      }
      ["redis"]=>
      array(1) {
        ["default"]=>
        array(2) {
          ["host"]=>
          string(25) "redis://192.168.56.108:6379"
          ["options"]=>
          array(4) {
            ["auth"]=>
            string(12) "eszEDCrdx341"
            ["db"]=>
            string(1) "3"
            ["max_attempts"]=>
            int(5)
            ["retry_seconds"]=>
            int(5)
          }
        }
      }
    }
  }
}

worker_start() 启动进程,调用的 Workerman\Worker 类实现。

php start.php start启动中可以看到加载的进程名:

root@514444b84e17:/var/www/html/webman# php start.php start
Workerman[start.php] start in DEBUG mode
------------------------------------------------------ WORKERMAN -------------------------------------------------------
Workerman version:4.0.30          PHP version:7.4.28
------------------------------------------------------- WORKERS --------------------------------------------------------
proto   user            worker                                listen                      processes    status
tcp     root            webman                                http://0.0.0.0:8787         8             [OK]
tcp     root            monitor                               none                        1             [OK]
tcp     root            plugin.webman.push.server             websocket://0.0.0.0:2121    1             [OK]
tcp     root            plugin.webman.redis-queue.consumer    none                        6             [OK]
------------------------------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.

Redis队列实现分析

Webman\RedisQueue\Process\Consumer 类文件内容:

<?php
/**
 * This file is part of webman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */

namespace Webman\RedisQueue\Process;

use support\Container;
use Webman\RedisQueue\Client;

/**
 * Class Consumer
 * @package process
 */
class Consumer
{
    /**
     * @var string
     */
    protected $_consumerDir = '';

    /**
     * StompConsumer constructor.
     * @param string $consumer_dir
     */
    public function __construct($consumer_dir = '')
    {
        $this->_consumerDir = $consumer_dir;
    }

    /**
     * onWorkerStart.
     */
    public function onWorkerStart()
    {
        if (!is_dir($this->_consumerDir)) {
            echo "Consumer directory {$this->_consumerDir} not exists\r\n";
            return;
        }
        $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir);
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            if (is_dir($file)) {
                continue;
            }
            $fileinfo = new \SplFileInfo($file);
            $ext = $fileinfo->getExtension();
            if ($ext === 'php') {
                $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4));
                if (is_a($class, 'Webman\RedisQueue\Consumer', true)) {
                    $consumer = Container::get($class);
                    $connection_name = $consumer->connection ?? 'default';
                    $queue = $consumer->queue;
                    $connection = Client::connection($connection_name);
                    $connection->subscribe($queue, [$consumer, 'consume']);
                }
            }
        }

    }
}

Webman\RedisQueue\Client 类文件内容:

<?php
/**
 * This file is part of webman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Webman\RedisQueue;

use Workerman\RedisQueue\Client as RedisClient;

/**
 * Class RedisQueue
 * @package support
 *
 * Strings methods
 * @method static void send($queue, $data, $delay=0)
 */
class Client
{
    /**
     * @var Client[]
     */
    protected static $_connections = null;
    

    /**
     * 用redis队列插件中的配置文件中的配置信息连接redis服务
     * @param string $name
     * @return RedisClient
     */
    public static function connection($name = 'default') {
        if (!isset(static::$_connections[$name])) {
            $config = config('redis_queue', config('plugin.webman.redis-queue.redis', []));
            if (!isset($config[$name])) {
                throw new \RuntimeException("RedisQueue connection $name not found");
            }
            $host = $config[$name]['host'];
            $options = $config[$name]['options'];
            $client = new RedisClient($host, $options);
            static::$_connections[$name] = $client;
        }
        return static::$_connections[$name];
    }

    /**
     * @param $name
     * @param $arguments
     * @return mixed
     */
    public static function __callStatic($name, $arguments)
    {
        return static::connection('default')->{$name}(... $arguments);
    }
}

Workerman\RedisQueue\Client 类文件内容:

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\RedisQueue;

use RuntimeException;
use Workerman\Lib\Timer;
use Workerman\Redis\Client as Redis;

/**
 * Class Client
 * @package Workerman\RedisQueue
 */
class Client
{
    /**
     * Queue waiting for consumption
     */
    const QUEUE_WAITING = '{redis-queue}-waiting';

    /**
     * Queue with delayed consumption
     */
    const QUEUE_DELAYED = '{redis-queue}-delayed';

    /**
     * Queue with consumption failure
     */
    const QUEUE_FAILD = '{redis-queue}-failed';

    /**
     * @var Redis
     */
    protected $_redisSubscribe;

    /**
     * @var Redis
     */
    protected $_redisSend;

    /**
     * @var array
     */
    protected $_subscribeQueues = [];

    /**
     * @var array
     */
    protected $_options = [
        'retry_seconds' => 5,
        'max_attempts'  => 5,
        'auth'          => '',
        'db'            => 0,
    ];

    /**
     * Client constructor.
     * @param $address
     * @param array $options
     */
    public function __construct($address, $options = [])
    {
        $this->_redisSubscribe = new Redis($address, $options);
        $this->_redisSubscribe->brPoping = 0;
        $this->_redisSend = new Redis($address, $options);
        if (isset($options['auth'])) {
            $this->_redisSubscribe->auth($options['auth']);
            $this->_redisSend->auth($options['auth']);
        }
        if (isset($options['db'])) {
            $this->_redisSubscribe->select($options['db']);
            $this->_redisSend->select($options['db']);
        }
        $this->_options = array_merge($this->_options, $options);
    }

    /**
     * Send.
     *
     * @param $queue
     * @param $data
     * @param int $delay
     * @param callable $cb
     */
    public function send($queue, $data, $delay = 0, $cb = null)
    {
        static $_id = 0;
        $id = \microtime(true) . '.' . (++$_id);
        $now = time();
        $package_str = \json_encode([
            'id'       => $id,
            'time'     => $now,
            'delay'    => $delay,
            'attempts' => 0,
            'queue'    => $queue,
            'data'     => $data
        ]);
        if (\is_callable($delay)) {
            $cb = $delay;
            $delay = 0;
        }
        if ($cb) {
            $cb = function ($ret) use ($cb) {
               $cb((bool)$ret);
            };
            if ($delay == 0) {
                $this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str, $cb);
            } else {
                $this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str, $cb);
            }
            return;
        }
        if ($delay == 0) {
            $this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str);
        } else {
            $this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str);
        }
    }

    /**
     * Subscribe.
     *
     * @param string|array $queue
     * @param callable $callback
     */
    public function subscribe($queue, callable $callback)
    {
        $queue = (array)$queue;
        foreach ($queue as $q) {
            $redis_key = static::QUEUE_WAITING . $q;
            $this->_subscribeQueues[$redis_key] = $callback;
        }
        $this->pull();
    }

    /**
     * Unsubscribe.
     *
     * @param string|array $queue
     * @return void
     */
    public function unsubscribe($queue)
    {
        $queue = (array)$queue;
        foreach($queue as $q) {
            $redis_key = static::QUEUE_WAITING . $q;
            unset($this->_subscribeQueues[$redis_key]);
        }
    }

    /**
     * tryToPullDelayQueue.
     */
    protected function tryToPullDelayQueue()
    {
        static $retry_timer = 0;
        if ($retry_timer) {
            return;
        }
        $retry_timer = Timer::add(1, function () {
            $now = time();
            $options = ['LIMIT', 0, 128];
            $this->_redisSend->zrevrangebyscore(static::QUEUE_DELAYED, $now, '-inf', $options, function($items){
                if ($items === false) {
                    throw new RuntimeException($this->_redisSend->error());
                }
                foreach ($items as $package_str) {
                    $this->_redisSend->zRem(static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) {
                        if ($result !== 1) {
                            return;
                        }
                        $package = \json_decode($package_str, true);
                        if (!$package) {
                            $this->_redisSend->lPush(static::QUEUE_FAILD , $package_str);
                            return;
                        }
                        $this->_redisSend->lPush(static::QUEUE_WAITING . $package['queue'], $package_str);
                    });
                }
            });
        });
    }

    /**
     * pull.
     */
    public function pull()
    {
        $this->tryToPullDelayQueue();
        if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) {
            return;
        }
        $cb = function($data) use (&$cb) {
            if ($data) {
                $this->_redisSubscribe->brPoping = 0;
                $redis_key = $data[0];
                $package_str = $data[1];
                $package = json_decode($package_str, true);
                if (!$package) {
                    $this->_redisSend->lPush(static::QUEUE_FAILD, $package_str);
                } else {
                    if (!isset($this->_subscribeQueues[$redis_key])) {
                        // 取消订阅,放回队列
                        $this->_redisSend->rPush($redis_key, $package_str);
                    } else {
                        $callback = $this->_subscribeQueues[$redis_key];
                        try {
                            \call_user_func($callback, $package['data']);
                        } catch (\Exception $e) {
                            if (++$package['attempts'] > $this->_options['max_attempts']) {
                                $package['error'] = (string) $e;
                                $this->fail($package);
                            } else {
                                $this->retry($package);
                            }
                        } catch (\Error $e) {
                            if (++$package['attempts'] > $this->_options['max_attempts']) {
                                $package['error'] = (string) $e;
                                $this->fail($package);
                            } else {
                                $this->retry($package);
                            }
                        }
                    }
                }
            }
            if ($this->_subscribeQueues) {
                $this->_redisSubscribe->brPoping = 1;
                Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb] ,false);
            }
        };
        $this->_redisSubscribe->brPoping = 1;
        $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb);
    }

    /**
     * @param $package
     */
    protected function retry($package)
    {
        $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']);
        $this->_redisSend->zAdd(static::QUEUE_DELAYED, $delay, \json_encode($package));
    }

    /**
     * @param $package
     */
    protected function fail($package)
    {
        $this->_redisSend->lPush(static::QUEUE_FAILD , \json_encode($package));
    }

}

Workerman\Redis\Client 类文件内容:

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the LICENSE
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Redis;

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;

/**
 * Class Client
 * @package Workerman\Redis
 *
 * Strings methods
 * @method static int append($key, $value, $cb = null)
 * @method static int bitCount($key, $cb = null)
 * @method static int decrBy($key, $value, $cb = null)
 * @method static string|bool get($key, $cb = null)
 * @method static int getBit($key, $offset, $cb = null)
 * @method static string getRange($key, $start, $end, $cb = null)
 * @method static string getSet($key, $value, $cb = null)
 * @method static int incrBy($key, $value, $cb = null)
 * @method static float incrByFloat($key, $value, $cb = null)
 * @method static array mGet(array $keys, $cb = null)
 * @method static array getMultiple(array $keys, $cb = null)
 * @method static bool setBit($key, $offset, $value, $cb = null)
 * @method static bool setEx($key, $ttl, $value, $cb = null)
 * @method static bool pSetEx($key, $ttl, $value, $cb = null)
 * @method static bool setNx($key, $value, $cb = null)
 * @method static string setRange($key, $offset, $value, $cb = null)
 * @method static int strLen($key, $cb = null)
 * Keys methods
 * @method static int del(...$keys, $cb = null)
 * @method static int unlink(...$keys, $cb = null)
 * @method static false|string dump($key, $cb = null)
 * @method static int exists(...$keys, $cb = null)
 * @method static bool expire($key, $ttl, $cb = null)
 * @method static bool pexpire($key, $ttl, $cb = null)
 * @method static bool expireAt($key, $timestamp, $cb = null)
 * @method static bool pexpireAt($key, $timestamp, $cb = null)
 * @method static array keys($pattern, $cb = null)
 * @method static bool|array scan($it, $cb = null)
 * @method static void migrate($host, $port, $keys, $dbIndex, $timeout, $copy = false, $replace = false, $cb = null)
 * @method static bool move($key, $dbIndex, $cb = null)
 * @method static string|int|bool object($information, $key, $cb = null)
 * @method static bool persist($key, $cb = null)
 * @method static string randomKey(, $cb = null)
 * @method static bool rename($srcKey, $dstKey, $cb = null)
 * @method static bool renameNx($srcKey, $dstKey, $cb = null)
 * @method static string type($key, $cb = null)
 * @method static int ttl($key, $cb = null)
 * @method static int pttl($key, $cb = null)
 * @method static void restore($key, $ttl, $value, $cb = null)
 * Hashes methods
 * @method static false|int hSet($key, $hashKey, $value, $cb = null)
 * @method static bool hSetNx($key, $hashKey, $value, $cb = null)
 * @method static false|string hGet($key, $hashKey, $cb = null)
 * @method static false|int hLen($key, $cb = null)
 * @method static false|int hDel($key, ...$hashKeys, $cb = null)
 * @method static array hKeys($key, $cb = null)
 * @method static array hVals($key, $cb = null)
 * @method static bool hExists($key, $hashKey, $cb = null)
 * @method static int hIncrBy($key, $hashKey, $value, $cb = null)
 * @method static float hIncrByFloat($key, $hashKey, $value, $cb = null)
 * @method static array hScan($key, $iterator, $pattern = '', $count = 0, $cb = null)
 * @method static int hStrLen($key, $hashKey, $cb = null)
 * Lists methods
 * @method static array blPop($keys, $timeout, $cb = null)
 * @method static array brPop($keys, $timeout, $cb = null)
 * @method static false|string bRPopLPush($srcKey, $dstKey, $timeout, $cb = null)
 * @method static false|string lIndex($key, $index, $cb = null)
 * @method static int lInsert($key, $position, $pivot, $value, $cb = null)
 * @method static false|string lPop($key, $cb = null)
 * @method static false|int lPush($key, ...$entries, $cb = null)
 * @method static false|int lPushx($key, $value, $cb = null)
 * @method static array lRange($key, $start, $end, $cb = null)
 * @method static false|int lRem($key, $value, $count, $cb = null)
 * @method static bool lSet($key, $index, $value, $cb = null)
 * @method static false|array lTrim($key, $start, $end, $cb = null)
 * @method static false|string rPop($key, $cb = null)
 * @method static false|string rPopLPush($srcKey, $dstKey, $cb = null)
 * @method static false|int rPush($key, ...$entries, $cb = null)
 * @method static false|int rPushX($key, $value, $cb = null)
 * @method static false|int lLen($key, $cb = null)
 * Sets methods
 * @method static int sAdd($key, $value, $cb = null)
 * @method static int sCard($key, $cb = null)
 * @method static array sDiff($keys, $cb = null)
 * @method static false|int sDiffStore($dst, $keys, $cb = null)
 * @method static false|array sInter($keys, $cb = null)
 * @method static false|int sInterStore($dst, $keys, $cb = null)
 * @method static bool sIsMember($key, $member, $cb = null)
 * @method static array sMembers($key, $cb = null)
 * @method static bool sMove($src, $dst, $member, $cb = null)
 * @method static false|string|array sPop($key, $count = 0, $cb = null)
 * @method static false|string|array sRandMember($key, $count = 0, $cb = null)
 * @method static int sRem($key, ...$members, $cb = null)
 * @method static array sUnion(...$keys, $cb = null)
 * @method static false|int sUnionStore($dst, ...$keys, $cb = null)
 * @method static false|array sScan($key, $iterator, $pattern = '', $count = 0, $cb = null)
 * Sorted sets methods
 * @method static array bzPopMin($keys, $timeout, $cb = null)
 * @method static array bzPopMax($keys, $timeout, $cb = null)
 * @method static int zAdd($key, $score, $value, $cb = null)
 * @method static int zCard($key, $cb = null)
 * @method static int zCount($key, $start, $end, $cb = null)
 * @method static double zIncrBy($key, $value, $member, $cb = null)
 * @method static int zinterstore($keyOutput, $arrayZSetKeys, $arrayWeights = [], $aggregateFunction = '', $cb = null)
 * @method static array zPopMin($key, $count, $cb = null)
 * @method static array zPopMax($key, $count, $cb = null)
 * @method static array zRange($key, $start, $end, $withScores = false, $cb = null)
 * @method static array zRangeByScore($key, $start, $end, $options = [], $cb = null)
 * @method static array zRevRangeByScore($key, $start, $end, $options = [], $cb = null)
 * @method static array zRangeByLex($key, $min, $max, $offset = 0, $limit = 0, $cb = null)
 * @method static int zRank($key, $member, $cb = null)
 * @method static int zRevRank($key, $member, $cb = null)
 * @method static int zRem($key, ...$members, $cb = null)
 * @method static int zRemRangeByRank($key, $start, $end, $cb = null)
 * @method static int zRemRangeByScore($key, $start, $end, $cb = null)
 * @method static array zRevRange($key, $start, $end, $withScores = false, $cb = null)
 * @method static double zScore($key, $member, $cb = null)
 * @method static int zunionstore($keyOutput, $arrayZSetKeys, $arrayWeights = [], $aggregateFunction = '', $cb = null)
 * @method static false|array zScan($key, $iterator, $pattern = '', $count = 0, $cb = null)
 * HyperLogLogs methods
 * @method static int pfAdd($key, $values, $cb = null)
 * @method static int pfCount($keys, $cb = null)
 * @method static bool pfMerge($dstKey, $srcKeys, $cb = null)
 * Geocoding methods
 * @method static int geoAdd($key, $longitude, $latitude, $member, ...$items, $cb = null)
 * @method static array geoHash($key, ...$members, $cb = null)
 * @method static array geoPos($key, ...$members, $cb = null)
 * @method static double geoDist($key, $members, $unit = '', $cb = null)
 * @method static int|array geoRadius($key, $longitude, $latitude, $radius, $unit, $options = [], $cb = null)
 * @method static array geoRadiusByMember($key, $member, $radius, $units, $options = [], $cb = null)
 * Streams methods
 * @method static int xAck($stream, $group, $arrMessages, $cb = null)
 * @method static string xAdd($strKey, $strId, $arrMessage, $iMaxLen = 0, $booApproximate = false, $cb = null)
 * @method static array xClaim($strKey, $strGroup, $strConsumer, $minIdleTime, $arrIds, $arrOptions = [], $cb = null)
 * @method static int xDel($strKey, $arrIds, $cb = null)
 * @method static mixed xGroup($command, $strKey, $strGroup, $strMsgId, $booMKStream = null, $cb = null)
 * @method static mixed xInfo($command, $strStream, $strGroup = null, $cb = null)
 * @method static int xLen($stream, $cb = null)
 * @method static array xPending($strStream, $strGroup, $strStart = 0, $strEnd = 0, $iCount = 0, $strConsumer = null, $cb = null)
 * @method static array xRange($strStream, $strStart, $strEnd, $iCount = 0, $cb = null)
 * @method static array xRead($arrStreams, $iCount = 0, $iBlock = null, $cb = null)
 * @method static array xReadGroup($strGroup, $strConsumer, $arrStreams, $iCount = 0, $iBlock = null, $cb = null)
 * @method static array xRevRange($strStream, $strEnd, $strStart, $iCount = 0, $cb = null)
 * @method static int xTrim($strStream, $iMaxLen, $booApproximate = null, $cb = null)
 * Pub/sub methods
 * @method static mixed publish($channel, $message, $cb = null)
 * @method static mixed pubSub($keyword, $argument = null, $cb = null)
 * Generic methods
 * @method static mixed rawCommand(...$commandAndArgs, $cb = null)
 * Transactions methods
 * @method static \Redis multi($cb = null)
 * @method static mixed exec($cb = null)
 * @method static mixed discard($cb = null)
 * @method static mixed watch($keys, $cb = null)
 * @method static mixed unwatch($keys, $cb = null)
 * Scripting methods
 * @method static mixed eval($script, $args = [], $numKeys = 0, $cb = null)
 * @method static mixed evalSha($sha, $args = [], $numKeys = 0, $cb = null)
 * @method static mixed script($command, ...$scripts, $cb = null)
 * @method static mixed client(...$args, $cb = null)
 * @method static null|string getLastError($cb = null)
 * @method static bool clearLastError($cb = null)
 * @method static mixed _prefix($value, $cb = null)
 * @method static mixed _serialize($value, $cb = null)
 * @method static mixed _unserialize($value, $cb = null)
 * Introspection methods
 * @method static bool isConnected($cb = null)
 * @method static mixed getHost($cb = null)
 * @method static mixed getPort($cb = null)
 * @method static false|int getDbNum($cb = null)
 * @method static false|double getTimeout($cb = null)
 * @method static mixed getReadTimeout($cb = null)
 * @method static mixed getPersistentID($cb = null)
 * @method static mixed getAuth($cb = null)
 */
class Client
{
    /**
     * @var AsyncTcpConnection
     */
    protected $_connection = null;

    /**
     * @var array
     */
    protected $_options = [];

    /**
     * @var string
     */
    protected $_address = '';

    /**
     * @var array
     */
    protected $_queue = [];

    /**
     * @var int
     */
    protected $_db = 0;

    /**
     * @var string|array
     */
    protected $_auth = null;

    /**
     * @var bool
     */
    protected $_waiting = true;

    /**
     * @var Timer
     */
    protected $_connectTimeoutTimer = null;

    /**
     * @var Timer
     */
    protected $_reconnectTimer = null;

    /**
     * @var callable
     */
    protected $_connectionCallback = null;

    /**
     * @var Timer
     */
    protected $_waitTimeoutTimer = null;

    /**
     * @var string
     */
    protected $_error = '';

    /**
     * @var bool
     */
    protected $_subscribe = false;

    /**
     * @var bool
     */
    protected $_firstConnect = true;

    /**
     * Client constructor.
     * @param $address
     * @param array $options
     * @param null $callback
     */
    public function __construct($address, $options = [], $callback = null)
    {
        if (!\class_exists('Protocols\Redis')) {
            \class_alias('Workerman\Redis\Protocols\Redis', 'Protocols\Redis');
        }
        $this->_address = $address;
        $this->_options = $options;
        $this->_connectionCallback = $callback;
        $this->connect();
        $timer = Timer::add(1, function () use (&$timer) {
            if (empty($this->_queue)) {
                return;
            }
            if ($this->_subscribe) {
                Timer::del($timer);
                return;
            }
            reset($this->_queue);
            $current_queue = current($this->_queue);
            $current_command = $current_queue[0][0];
            $ignore_first_queue = in_array($current_command, ['BLPOP', 'BRPOP']);
            $time = time();
            $timeout = isset($this->_options['wait_timeout']) ? $this->_options['wait_timeout'] : 600;
            $has_timeout = false;
            $first_queue = true;
            foreach ($this->_queue as $key => $queue) {
                if ($first_queue && $ignore_first_queue) {
                    $first_queue = false;
                    continue;
                }
                if ($time - $queue[1] > $timeout) {
                    $has_timeout = true;
                    unset($this->_queue[$key]);
                    $msg = "Workerman Redis Wait Timeout ($timeout seconds)";
                    if ($queue[2]) {
                        $this->_error = $msg;
                        \call_user_func($queue[2], false, $this);
                    } else {
                        echo new Exception($msg);
                    }
                }
            }
            if ($has_timeout && !$ignore_first_queue) {
                $this->closeConnection();
                $this->connect();
            }
        });
    }

    /**
     * connect
     */
    public function connect()
    {
        if ($this->_connection) {
            return;
        }

        $timeout = isset($this->_options['connect_timeout']) ? $this->_options['connect_timeout'] : 5;
        $context = isset($this->_options['context']) ? $this->_options['context'] : [];
        $this->_connection = new AsyncTcpConnection($this->_address, $context);

        $this->_connection->onConnect = function () {
            $this->_waiting = false;
            Timer::del($this->_connectTimeoutTimer);
            if ($this->_reconnectTimer) {
                Timer::del($this->_reconnectTimer);
                $this->_reconnectTimer = null;
            }

            if ($this->_db) {
                $this->_queue = \array_merge([[['SELECT', $this->_db], time(), null]], $this->_queue);
            }

            if ($this->_auth) {
                $this->_queue = \array_merge([[['AUTH', $this->_auth], time(), null]],  $this->_queue);
            }

            $this->_connection->onError = function ($connection, $code, $msg) {
                echo new \Exception("Workerman Redis Connection Error $code $msg");
            };
            $this->process();
            $this->_firstConnect && $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this);
            $this->_firstConnect = false;
        };

        $time_start = microtime(true);
        $this->_connection->onError = function ($connection) use ($time_start) {
            $time = microtime(true) - $time_start;
            $msg = "Workerman Redis Connection Failed ($time seconds)";
            $this->_error = $msg;
            $exception = new \Exception($msg);
            if (!$this->_connectionCallback) {
                echo $exception;
                return;
            }
            $this->_firstConnect && \call_user_func($this->_connectionCallback, false, $this);
        };

        $this->_connection->onClose = function () use ($time_start) {
            $this->_subscribe = false;
            if ($this->_connectTimeoutTimer) {
                Timer::del($this->_connectTimeoutTimer);
            }
            if ($this->_reconnectTimer) {
                Timer::del($this->_reconnectTimer);
                $this->_reconnectTimer = null;
            }
            $this->closeConnection();
            if (microtime(true) - $time_start > 5) {
                $this->connect();
            } else {
                $this->_reconnectTimer = Timer::add(5, function () {
                    $this->connect();
                }, null, false);
            }
        };

        $this->_connection->onMessage = function ($connection, $data) {
            $this->_error = '';
            $this->_waiting = false;
            reset($this->_queue);
            $queue = current($this->_queue);
            $cb = $queue[2];
            $type = $data[0];
            if (!$this->_subscribe) {
                unset($this->_queue[key($this->_queue)]);
            }
            $success = $type === '-' || $type === '!' ? false : true;
            $exception = false;
            $result = false;
            if ($success) {
                $result = $data[1];
                if ($type === '+' && $result === 'OK') {
                    $result = true;
                }
            } else {
                $this->_error = $data[1];
            }
            if (!$cb) {
                $this->process();
                return;
            }
            // format.
            if (!empty($queue[3])) {
                $result = \call_user_func($queue[3], $result);
            }
            try {
                \call_user_func($cb, $result, $this);
            } catch (\Exception $exception) {
            }

            if ($type === '!') {
                $this->closeConnection();
                $this->connect();
            } else {
                $this->process();
            }
            if ($exception) {
                throw $exception;
            }
        };

        $this->_connectTimeoutTimer = Timer::add($timeout, function () use ($timeout) {
            $this->_connectTimeoutTimer = null;
            if ($this->_connection && $this->_connection->getStatus(false) === 'ESTABLISHED') {
                return;
            }
            $this->closeConnection();
            $this->_error = "Workerman Redis Connection to {$this->_address} timeout ({$timeout} seconds)";
            if ($this->_firstConnect && $this->_connectionCallback) {
                \call_user_func($this->_connectionCallback, false, $this);
            } else {
                echo $this->_error . "\n";
            }

        });
        $this->_connection->connect();
    }

    /**
     * process
     */
    public function process()
    {
        if (!$this->_connection || $this->_waiting || empty($this->_queue) || $this->_subscribe) {
            return;
        }
        \reset($this->_queue);
        $queue = \current($this->_queue);
        if ($queue[0][0] === 'SUBSCRIBE' || $queue[0][0] === 'PSUBSCRIBE') {
            $this->_subscribe = true;
        }
        $this->_waiting = true;
        $this->_connection->send($queue[0]);
        $this->_error = '';
    }

    /**
     * subscribe
     *
     * @param $channels
     * @param $cb
     */
    public function subscribe($channels, $cb)
    {
        $new_cb = function ($result) use ($cb) {
            if (!$result) {
                echo $this->error();
                return;
            }
            $response_type = $result[0];
            switch ($response_type) {
                case 'subscribe':
                    return;
                case 'message':
                    \call_user_func($cb, $result[1], $result[2], $this);
                    return;
                default:
                    echo 'unknow response type for subscribe. buffer:' . serialize($result) . "\n";
            }
        };
        $this->_queue[] = [['SUBSCRIBE', $channels], time(), $new_cb];
        $this->process();
    }

    /**
     * psubscribe
     *
     * @param $patterns
     * @param $cb
     */
    public function pSubscribe($patterns, $cb)
    {
        $new_cb = function ($result) use ($cb) {
            if (!$result) {
                echo $this->error();
                return;
            }
            $response_type = $result[0];
            switch ($response_type) {
                case 'psubscribe':
                    return;
                case 'pmessage':
                    \call_user_func($cb, $result[1], $result[2], $result[3], $this);
                    return;
                default:
                    echo 'unknow response type for psubscribe. buffer:' . serialize($result) . "\n";
            }
        };
        $this->_queue[] = [['PSUBSCRIBE', $patterns], time(), $new_cb];
        $this->process();
    }

    /**
     * select
     *
     * @param $db
     * @param null $cb
     */
    public function select($db, $cb = null)
    {
        $format = function ($result) use ($db) {
            $this->_db = $db;
            return $result;
        };
        $cb = $cb ? $cb : function(){};
        $this->_queue[] = [['SELECT', $db], time(), $cb, $format];
        $this->process();
    }

    /**
     * auth
     *
     * @param string|array $auth
     * @param null $cb
     */
    public function auth($auth, $cb = null)
    {
        $format = function ($result) use ($auth) {
            $this->_auth = $auth;
            return $result;
        };
        $cb = $cb ? $cb : function(){};
        $this->_queue[] = [['AUTH', $auth], time(), $cb, $format];
        $this->process();
    }

    /**
     * set
     *
     * @param $key
     * @param $value
     * @param null $cb
     * @return null
     */
    public function set($key, $value, $cb = null)
    {
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $timeout = $cb;
            $cb = null;
            if (\count($args) > 3) {
                $cb = $args[3];
            }
            $this->_queue[] = [['SETEX', $key, $timeout, $value], time(), $cb];
            $this->process();
            return null;
        }
        $this->_queue[] = [['SET', $key, $value], time(), $cb];
        $this->process();
    }

    /**
     * incr
     *
     * @param $key
     * @param null $cb
     * @return null
     */
    public function incr($key, $cb = null)
    {
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $num = $cb;
            $cb = null;
            if (\count($args) > 2) {
                $cb = $args[2];
            }
            $this->_queue[] = [['INCRBY', $key, $num], time(), $cb];
            $this->process();
            return null;
        }
        $this->_queue[] = [['INCR', $key], time(), $cb];
        $this->process();
    }


    /**
     * decr
     *
     * @param $key
     * @param null $cb
     * @return null
     */
    public function decr($key, $cb = null)
    {
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $num = $cb;
            $cb = null;
            if (\count($args) > 2) {
                $cb = $args[2];
            }
            $this->_queue[] = [['DECRBY', $key, $num], time(), $cb];
            $this->process();
            return null;
        }
        $this->_queue[] = [['DECR', $key], time(), $cb];
        $this->process();
    }

    /**
     * sort
     *
     * @param $key
     * @param $options
     * @param null $cb
     */
    function sort($key, $options, $cb = null)
    {
        $args = [];
        if (isset($options['sort'])) {
            $args[] = $options['sort'];
            unset($options['sort']);
        }

        foreach ($options as $op => $value) {
            $args[] = $op;
            if (!is_array($value)) {
                $args[] = $value;
                continue;
            }
            foreach ($value as $sub_value) {
                $args[] = $sub_value;
            }
        }
        \array_unshift($args, 'SORT', $key);
        $this->_queue[] = [$args, time(), $cb];
        $this->process();
    }

    /**
     * mSet
     *
     * @param array $array
     * @param null $cb
     */
    public function mSet(array $array, $cb = null)
    {
        $this->mapCb('MSET', $array, $cb);
    }

    /**
     * mSetNx
     *
     * @param array $array
     * @param null $cb
     */
    public function mSetNx(array $array, $cb = null)
    {
        $this->mapCb('MSETNX', $array, $cb);
    }

    /**
     * mapCb
     *
     * @param $command
     * @param array $array
     * @param $cb
     */
    protected function mapCb($command, array $array, $cb)
    {
        $args = [$command];
        foreach ($array as $key => $value) {
            $args[] = $key;
            $args[] = $value;
        }
        $this->_queue[] = [$args, time(), $cb];
        $this->process();
    }

    /**
     * hMSet
     *
     * @param $key
     * @param array $array
     * @param null $cb
     */
    public function hMSet($key, array $array, $cb = null)
    {
        $this->keyMapCb('HMSET', $key, $array, $cb);
    }

    /**
     * hMGet
     *
     * @param $key
     * @param array $array
     * @param null $cb
     */
    public function hMGet($key, array $array, $cb = null)
    {
        $format = function ($result) use ($array) {
            if (!is_array($result)) {
                return $result;
            }
            return \array_combine($array, $result);
        };
        $this->_queue[] = [['HMGET', $key, $array], time(), $cb, $format];
        $this->process();
    }

    /**
     * hGetAll
     *
     * @param $key
     * @param null $cb
     */
    public function hGetAll($key, $cb = null)
    {
        $format = function ($result) {
            if (!\is_array($result)) {
                return $result;
            }
            $return = [];
            $key = '';
            foreach ($result as $index => $item) {
                if ($index % 2 == 0) {
                    $key = $item;
                    continue;
                }
                $return[$key] = $item;
            }
            return $return;
        };
        $this->_queue[] = [['HGETALL', $key], time(), $cb, $format];
        $this->process();
    }

    /**
     * keyMapCb
     *
     * @param $command
     * @param $key
     * @param array $array
     * @param $cb
     */
    protected function keyMapCb($command, $key, array $array, $cb)
    {
        $args = [$command, $key];
        foreach ($array as $key => $value) {
            $args[] = $key;
            $args[] = $value;
        }
        $this->_queue[] = [$args, time(), $cb];
        $this->process();
    }

    /**
     * __call
     *
     * @param $method
     * @param $args
     */
    public function __call($method, $args)
    {
        $cb = null;
        if (\is_callable(end($args))) {
            $cb = array_pop($args);
        }

        \array_unshift($args, \strtoupper($method));
        $this->_queue[] = [$args, time(), $cb];
        $this->process();
    }

    /**
     * closeConnection
     */
    public function closeConnection()
    {
        if (!$this->_connection) {
            return;
        }
        $this->_subscribe = false;
        $this->_connection->onConnect = $this->_connection->onError = $this->_connection->onClose =
        $this->_connection->onMessge = null;
        $this->_connection->close();
        $this->_connection = null;
    }

    /**
     * error
     *
     * @return string
     */
    function error()
    {
        return $this->_error;
    }

    /**
     * close
     */
    public function close()
    {
        $this->closeConnection();
        $this->_queue = [];
    }

    /**
     * scan
     *
     * @throws Exception
     */
    public function scan()
    {
        throw new Exception('Not implemented');
    }

    /**
     * hScan
     *
     * @throws Exception
     */
    public function hScan()
    {
        throw new Exception('Not implemented');
    }

    /**
     * hScan
     *
     * @throws Exception
     */
    public function sScan()
    {
        throw new Exception('Not implemented');
    }

    /**
     * hScan
     *
     * @throws Exception
     */
    public function zScan()
    {
        throw new Exception('Not implemented');
    }
}

failed 队列数据写入数据库

发现上面把处理不了的队列数据都扔到了 {redis-queue}-failed 队列中,我们这里想办法把这个队列中的数据写入到数据库中。

消费者相关类

新建lib/webman/redisQueue/failed/Consumer.php 文件,内容:

<?php

namespace lib\webman\redisQueue\failed;

use support\Container;
use Webman\RedisQueue\Process\Consumer as webmanConsumer;

/**
 * Class Consumer
 * @package process
 */
class Consumer extends webmanConsumer
{
    /**
     * onWorkerStart.
     */
    public function onWorkerStart()
    {
        if (!is_dir($this->_consumerDir)) {
            echo "Consumer directory {$this->_consumerDir} not exists\r\n";
            return;
        }
        $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir);
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            if (is_dir($file)) {
                continue;
            }
            $fileinfo = new \SplFileInfo($file);
            $ext = $fileinfo->getExtension();
            if ($ext === 'php') {
                $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4));
                if (is_a($class, 'Webman\RedisQueue\Consumer', true)) {
                    $consumer = Container::get($class);
                    $connection_name = $consumer->connection ?? 'default';
                    $queue = $consumer->queue;
                    $connection = Client::connection($connection_name);
                    $connection->subscribe($queue, [$consumer, 'consume']);
                }
            }
        }
    }
}

新建lib/webman/redisQueue/failed/Client.php 文件,内容:

<?php

namespace lib\webman\redisQueue\failed;

use Webman\RedisQueue\Client as webmanClient;

/**
 * Class Client
 * @package lib\webman\redisQueue\failed
 */
class Client extends webmanClient
{
    /**
     * @param string $name
     * @return RedisClient
     */
    public static function connection($name = 'default')
    {
        if (!isset(static::$_connections[$name])) {
            $config = config('redis_queue', config('plugin.webman.redis-queue.redis', []));
            if (!isset($config[$name])) {
                throw new \RuntimeException("RedisQueue connection $name not found");
            }
            $host = $config[$name]['host'];
            $options = $config[$name]['options'];
            $client = new RedisClient($host, $options);
            static::$_connections[$name] = $client;
        }

        return static::$_connections[$name];
    }
}

新建lib/webman/redisQueue/failed/RedisClient.php 文件,内容:

<?php

namespace lib\webman\redisQueue\failed;

use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;
use support\Log;

/**
 * Class RedisClient
 * @package lib\webman\redisQueue\failed
 */
class RedisClient extends Client
{
    /**
     * @param array|string $queue
     * @param callable $callback
     */
    public function subscribe($queue, callable $callback)
    {
        $this->_subscribeQueues[$queue] = $callback;

        $this->pull();
    }

    /**
     * pull
     */
    public function pull()
    {
        if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) {
            return;
        }
        $cb = function($data) use (&$cb) {
            if ($data) {
                $this->_redisSubscribe->brPoping = 0;
                $redis_key = $data[0];
                $package_str = $data[1];
                $package = json_decode($package_str, true);
                if (!$package) {
                    $log = Log::channel('default');
                    $log->error('lib\webman\redisQueue\failed\RedisClient::pull', ["package_str" => $package_str]);
                } else {
                    if (!isset($this->_subscribeQueues[$redis_key])) {
                        // 取消订阅,放回队列
                        $this->_redisSend->rPush($redis_key, $package_str);
                    } else {
                        $callback = $this->_subscribeQueues[$redis_key];
                        try {
                            \call_user_func($callback, $package);
                        } catch (\Throwable $e) {
                            // 记录异常日志
                            $package['Throwable'] = (string) $e;

                            $log = Log::channel('default');
                            $log->error('lib\webman\redisQueue\failed\RedisClient::pull', $package);
                        }
                    }
                }
            }
            if ($this->_subscribeQueues) {
                $this->_redisSubscribe->brPoping = 1;
                Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb] ,false);
            }
        };
        $this->_redisSubscribe->brPoping = 1;
        $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb);
    }
}

指定消费者

把数据表建好:

CREATE TABLE `redis_queue_failed_record` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` text COLLATE utf8mb4_unicode_ci COMMENT '数据',
  `status` tinyint(4) NOT NULL DEFAULT '10' COMMENT '状态,10:待处理,20:处理成功,30:处理失败',
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='redis队列错误记录';

model模型类也建好,app/model/RedisQueueFailedRecord.php 文件内容:

<?php

namespace app\model;

use support\Model;

/**
 * redis队列错误记录
 */
class RedisQueueFailedRecord extends Model
{
    /**
     * 模型的连接名称
     *
     * @var string
     */
    protected $connection = 'mysql';

    /**
     * The table associated with the model.
     *
     * @var string
     */
    protected $table = 'redis_queue_failed_record';

    /**
     * The primary key associated with the table.
     *
     * @var string
     */
    protected $primaryKey = 'id';

    /**
     * Indicates if the model should be timestamped.
     * 建议true;自动管理created_at(开始时间),updated_at(更新时间)
     *
     * @var bool
     */
    public $timestamps = true;
    
    /**
     * 可以适用create方法直接创建的属性 
     */
    protected $fillable = [
        "data",
        "status",
    ];

    /**
     * 类型转换
     */
    protected $casts = [
        "data" => "json",
    ];

    const STATUS_UNDEAL = 10;  // 待处理
    const STATUS_DEALED_SUCC = 20;  // 处理成功
    const STATUS_DEALED_FAIL = 30;  // 处理失败
}

新建 app/queue/redisFailed/RedisQueueFailed.php 文件,内容:

<?php
namespace app\queue\redisFailed;

use Webman\RedisQueue\Consumer;
use app\model\RedisQueueFailedRecord;

/**
 * redis队列错误
 * Class fddContractFiling
 * @package app\queue\redis
 */
class RedisQueueFailed implements Consumer
{
    public $queue = \Workerman\RedisQueue\Client::QUEUE_FAILD;

    public $connection = 'default';

    public function consume($data)
    {
        $model = new RedisQueueFailedRecord();
        $model->data = $data;
        $model->status = RedisQueueFailedRecord::STATUS_UNDEAL;

        if (!$model->save()) {
            throw new \RuntimeException("redis_queue_failed_record data save failed");
        }
    }
}

消费者配置

config/plugin/webman/redis-queue/process.php 配置文件中,新增 consumerFailed 配置:

<?php
return [
    'consumer'  => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis'
        ]
    ],
    'consumerFailed'  => [
        'handler'     => lib\webman\redisQueue\failed\Consumer::class,
        'count'       => 1, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redisFailed'
        ]
    ]
];

服务停止,然后重启,查看效果。

源码

support/helpers.php 文件

support/helpers.php 是一个帮助函数库:

<?php

use support\Container;
use Workerman\Worker;
use Webman\Config;
// 省略若干...

/**
 * @return bool
 */
function is_phar()
{
    return class_exists(\Phar::class, false) && Phar::running();
}

// Phar support.
if (is_phar()) {
    define('BASE_PATH', dirname(__DIR__));
} else {
    define('BASE_PATH', realpath(__DIR__ . '/../'));
}
define('WEBMAN_VERSION', '1.3.0');

/**
 * @param $return_phar
 * @return false|string
 */
function base_path($return_phar = true)
{
    static $real_path = '';
    if (!$real_path) {
        $real_path = is_phar() ? dirname(Phar::running(false)) : BASE_PATH;
    }
    return $return_phar ? BASE_PATH : $real_path;
}

/**
 * @return string
 */
function app_path()
{
    return BASE_PATH . DIRECTORY_SEPARATOR . 'app';
}

/**
 * @return string
 */
function public_path()
{
    static $path = '';
    if (!$path) {
        $path = config('app.public_path', BASE_PATH . DIRECTORY_SEPARATOR . 'public');
    }
    return $path;
}

/**
 * 获取配置目录路径
 * @return string
 */
function config_path()
{
    return BASE_PATH . DIRECTORY_SEPARATOR . 'config';
}

/**
 * Phar support.
 * Compatible with the 'realpath' function in the phar file.
 *
 * @return string
 */
function runtime_path()
{
    static $path = '';
    if (!$path) {
        $path = config('app.runtime_path', BASE_PATH . DIRECTORY_SEPARATOR . 'runtime');
    }
    return $path;
}

// 省略若干...

/**
 * 获取配置参数值
 * @param $key
 * @param null $default
 * @return mixed
 */
function config($key = null, $default = null)
{
    return Config::get($key, $default);
}

// 省略若干...

/**
 * @param $worker
 * @param $class
 */
function worker_bind($worker, $class)
{
    $callback_map = [
        'onConnect',
        'onMessage',
        'onClose',
        'onError',
        'onBufferFull',
        'onBufferDrain',
        'onWorkerStop',
        'onWebSocketConnect'
    ];
    foreach ($callback_map as $name) {
        if (method_exists($class, $name)) {
            $worker->$name = [$class, $name];
        }
    }
    if (method_exists($class, 'onWorkerStart')) {
        call_user_func([$class, 'onWorkerStart'], $worker);
    }
}

/**
 * @param $process_name
 * @param $config
 * @return void
 */
function worker_start($process_name, $config)
{
    $worker = new Worker($config['listen'] ?? null, $config['context'] ?? []);
    $property_map = [
        'count',
        'user',
        'group',
        'reloadable',
        'reusePort',
        'transport',
        'protocol',
    ];
    $worker->name = $process_name;
    foreach ($property_map as $property) {
        if (isset($config[$property])) {
            $worker->$property = $config[$property];
        }
    }

    $worker->onWorkerStart = function ($worker) use ($config) {
        require_once base_path() . '/support/bootstrap.php';

        foreach ($config['services'] ?? [] as $server) {
            if (!class_exists($server['handler'])) {
                echo "process error: class {$server['handler']} not exists\r\n";
                continue;
            }
            $listen = new Worker($server['listen'] ?? null, $server['context'] ?? []);
            if (isset($server['listen'])) {
                echo "listen: {$server['listen']}\n";
            }
            $instance = Container::make($server['handler'], $server['constructor'] ?? []);
            worker_bind($listen, $instance);
            $listen->listen();
        }

        if (isset($config['handler'])) {
            if (!class_exists($config['handler'])) {
                echo "process error: class {$config['handler']} not exists\r\n";
                return;
            }

            $instance = Container::make($config['handler'], $config['constructor'] ?? []);
            worker_bind($worker, $instance);
        }

    };
}

// 省略若干...

Webman\Config 类

<?php
/**
 * This file is part of webman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */

namespace Webman;

class Config
{

    /**
     * @var array
     */
    protected static $_config = [];

    /**
     * @var string
     */
    protected static $_configPath = '';

    /**
     * @var bool
     */
    protected static $_loaded = false;

    /**
     * @param $config_path 配置文件路径
     * @param array $exclude_file 排除的文件
     */
    public static function load($config_path, $exclude_file = [])
    {
        static::$_configPath = $config_path;
        if (!$config_path) {
            return;
        }
        // RecursiveDirectoryIterator 提供了一个用于递归地遍历文件系统目录的接口
        $dir_iterator = new \RecursiveDirectoryIterator($config_path, \FilesystemIterator::FOLLOW_SYMLINKS);
        // RecursiveIteratorIterator 可用于遍历递归迭代器
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            /** var SplFileInfo $file */
            if (is_dir($file) || $file->getExtension() != 'php' || \in_array($file->getBaseName('.php'), $exclude_file)) {
                // 是目录、或非.php后缀文件、或在排除的文件中,跳过本次
                // getBaseName('.php') 返回没有路径信息及.php后缀的文件名称
                continue;
            }
            
            /* 判断当前配置是否可用 */ 
            $app_config_file = $file->getPath() . '/app.php';
            if (!is_file($app_config_file)) {
                continue;
            }
            // 如果配置文件的相对深度大于等于2层,则说明是个插件,查看该插件是否可用
            $relative_path = str_replace($config_path . DIRECTORY_SEPARATOR, '', substr($file, 0, -4));
            $explode = array_reverse(explode(DIRECTORY_SEPARATOR, $relative_path));
            if (count($explode) >= 2) {
                $app_config = include $app_config_file;
                if (empty($app_config['enable'])) {
                    // 如果enable属性未配置、或为空、或false,则说明该插件不可用,跳过本次
                    continue;
                }
            }
            
            $config = include $file;
            foreach ($explode as $section) {
                $tmp = [];
                $tmp[$section] = $config;
                $config = $tmp;
            }
            static::$_config = array_replace_recursive(static::$_config, $config);
        }

        // Merge database config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['database']['connections'] ?? [] as $key => $connection) {
                    static::$_config['database']['connections']["plugin.$firm.$name.$key"] = $connection;
                }
            }
        }
        if (!empty(static::$_config['database']['connections'])) {
            static::$_config['database']['default'] = static::$_config['database']['default'] ?? key(static::$_config['database']['connections']);
        }
        
        // Merge thinkorm config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['thinkorm']['connections'] ?? [] as $key => $connection) {
                    static::$_config['thinkorm']['connections']["plugin.$firm.$name.$key"] = $connection;
                }
            }
        }
        if (!empty(static::$_config['thinkorm']['connections'])) {
            static::$_config['thinkorm']['default'] = static::$_config['thinkorm']['default'] ?? key(static::$_config['thinkorm']['connections']);
        }
        
        // Merge redis config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['redis'] ?? [] as $key => $connection) {
                    static::$_config['redis']["plugin.$firm.$name.$key"] = $connection;
                }
            }
        }

        static::$_loaded = true;
    }

    /**
     * @param null $key
     * @param null $default
     * @return array|mixed|null
     */
    public static function get($key = null, $default = null)
    {
        if ($key === null) {
            return static::$_config;
        }
        $key_array = \explode('.', $key);
        $value = static::$_config;
        $finded = true;
        foreach ($key_array as $index) {
            if (!isset($value[$index])) {
                if (static::$_loaded) {
                    return $default;
                }
                $finded = false;
                break;
            }
            $value = $value[$index];
        }
        if ($finded) {
            return $value;
        }
        return static::read($key, $default);
    }

    /**
     * @param $key
     * @param $default
     * @return array|mixed|void|null
     */
    protected static function read($key, $default = null)
    {
        $path = static::$_configPath;
        if ($path === '') {
            return $default;
        }
        $keys = $key_array = \explode('.', $key);
        foreach ($key_array as $index => $section) {
            unset($keys[$index]);
            if (is_file($file = "$path/$section.php")) {
                $config = include $file;
                return static::find($keys, $config, $default);
            }
            if (!is_dir($path = "$path/$section")) {
                return $default;
            }
        }
        return $default;
    }

    /**
     * @param $key_array
     * @param $stack
     * @param $default
     * @return array|mixed
     */
    protected static function find($key_array, $stack, $default)
    {
        if (!is_array($stack)) {
            return $default;
        }
        $value = $stack;
        foreach ($key_array as $index) {
            if (!isset($value[$index])) {
                return $default;
            }
            $value = $value[$index];
        }
        return $value;
    }


    /**
     * @param $config_path
     * @param array $exclude_file
     */
    public static function reload($config_path, $exclude_file = [])
    {
        static::$_config = [];
        static::load($config_path, $exclude_file);
    }

}

零碎点

str_replace

 str_replace(
    array|string $search,
    array|string $replace,
    string|array $subject,
    int &$count = null
): string|array
<?php
// Provides: <body text='black'>
$bodytag = str_replace("%body%", "black", "<body text='%body%'>");

// Provides: Hll Wrld f PHP
$vowels = array("a", "e", "i", "o", "u", "A", "E", "I", "O", "U");
$onlyconsonants = str_replace($vowels, "", "Hello World of PHP");

// Provides: You should eat pizza, beer, and ice cream every day
$phrase  = "You should eat fruits, vegetables, and fiber every day.";
$healthy = array("fruits", "vegetables", "fiber");
$yummy   = array("pizza", "beer", "ice cream");

$newphrase = str_replace($healthy, $yummy, $phrase);

// Provides: 2
$str = str_replace("ll", "", "good golly miss molly!", $count);
echo $count;
?>

strtr

// 三个参数
 strtr(string $string, string $from, string $to): string
// 两个参数
 strtr(string $string, array $replace_pairs): string
<?php
echo strtr("Hilla Warld","ia","eo");  // Hello World

$arr = array("Hello" => "Hi", "world" => "earth");
echo strtr("Hello world",$arr);  // Hi earth
?>

array_replace_recursive

 array_replace_recursive(array $array, array ...$replacements): array
<?php
$base = array('citrus' => array( "orange") , 'berries' => array("blackberry", "raspberry"), );
$replacements = array('citrus' => array('pineapple'), 'berries' => array('blueberry'));

$basket = array_replace_recursive($base, $replacements);
print_r($basket);

$basket = array_replace($base, $replacements);
print_r($basket);
?>

Array
(
    [citrus] => Array
        (
            [0] => pineapple
        )

    [berries] => Array
        (
            [0] => blueberry
            [1] => raspberry
        )

)
Array
(
    [citrus] => Array
        (
            [0] => pineapple
        )

    [berries] => Array
        (
            [0] => blueberry
        )

)
<?php
$base = array('citrus' => array("orange") , 'berries' => array("blackberry", "raspberry"), 'others' => 'banana' );
$replacements = array('citrus' => 'pineapple', 'berries' => array('blueberry'), 'others' => array('litchis'));
$replacements2 = array('citrus' => array('pineapple'), 'berries' => array('blueberry'), 'others' => 'litchis');

$basket = array_replace_recursive($base, $replacements, $replacements2);
print_r($basket);
?>

Array
(
    [citrus] => Array
        (
            [0] => pineapple
        )

    [berries] => Array
        (
            [0] => blueberry
            [1] => raspberry
        )

    [others] => litchis
)

include

$config = include $file;
foreach ($explode as $section) {
    $tmp = [];
    $tmp[$section] = $config;
    $config = $tmp;
}

示例:

<?php
$config = ["a" => 1];
$relative_path = "plugin/webman/redis-queue/process";
$explode = array_reverse(explode(DIRECTORY_SEPARATOR, $relative_path));
print_r($explode);

foreach ($explode as $section) {
    $tmp = [];
    $tmp[$section] = $config;
    $config = $tmp;
}
print_r($config);
?>

Array
(
    [0] => process
    [1] => redis-queue
    [2] => webman
    [3] => plugin
)
Array
(
    [plugin] => Array
        (
            [webman] => Array
                (
                    [redis-queue] => Array
                        (
                            [process] => Array
                                (
                                    [a] => 1
                                )
                        )
                )
        )
)

实例

其他项目对接

有一种使用场景,在其他项目中调用webman项目推送队列数据。

在webman项目的 app/candy/controller 目录下新建文件 Inqueue.php

<?php

namespace app\candy\controller;

use support\Request;
use Webman\RedisQueue\Client;

/**
 * 入队
 * Class Inqueue
 * @package app\candy\controller
 */
class Inqueue
{
    /**
     * @param Request $request
     * @return \support\Response
     */
    public function send(Request $request)
    {
        if (empty($request->data['queue_name'])) {
            return json(failResult("queue_name为空"));
        }
        $queue_name = $request->data['queue_name'];

        $queue_data = !empty($request->data['queue_data']) ? $request->data['queue_data'] : [];
        $queue_delaytime = !empty($request->data['queue_delaytime']) ? (int)$request->data['queue_delaytime'] : 0;

        try {
            Client::send($queue_name, $queue_data, $queue_delaytime);
        } catch (\Throwable $e) {
            return json(failResult($e->getMessage()));
        }

        return json(successResult());
    }
}

其他项目请求domain_name/app/inqueue/send发送请求数据,如:

[
    'data' => ["queue_name" => "order_end", "queue_data" => ["order_id" => 101], "queue_delaytime" => 60], 
    'project_id' => 'web_test', 
    'time' => time(), 
    'sign' => $sign
]

$sign 是 上面数据的签名。

权限使用洋葱模型中间件进行过滤,在项目的 config/middleware.php 文件中写入:

<?php
return [
    //全局中间件
    'candy' => [
        app\middleware\CandyAuthCheck::class,
    ],
];

这里使用到了多应用,可以参考 https://www.workerman.net/doc/webman/multiapp.html

app/middleware/CandyAuthCheck.php 文件中写入:

<?php
namespace app\middleware;

use Webman\MiddlewareInterface;
use Webman\Http\Response;
use Webman\Http\Request;

class CandyAuthCheck implements MiddlewareInterface
{
    public function process(Request $request, callable $next) : Response
    {
        try {
            // 数据准备
            $params = getParams($request, ['project_id', 'data', 'time', 'sign']);

            $project_id = $params['project_id'];
            if (empty($project_id)) {
                return json(failResult('project_id is empty'));
            }

            $data = $params['data'];
            if (empty($data)) {
                return json(failResult('data is empty'));
            }

            $time = (int)$params['time'];
            if (empty($time)) {
                return json(failResult('time is empty'));
            }

            $sign = $params['sign'];
            if (empty($sign)) {
                return json(failResult('sign is empty'));
            }

            // 验签
            $hash_config = config('candy.' .$project_id); // 获取candy应用下其他项目的hash配置,如 $config['candy.web_test']['key'] = 'n5b2abe17cd9f24cf62ah66bad623d5a'
            if (empty($hash_config)) {
                return json(failResult('project config is not found'));
            }

            $arr = [
                'data' => $data,
                'project_id' => $project_id,
                'time' => $time,
            ];
            $sign_hash = hash_hmac('sha256', json_encode($arr), $hash_config['key']);
            if ($sign_hash != $sign) {
                return json(failResult('BaseService验签失败'));
            }

            // 数据处理
            $request->data = json_decode($data, true);
        } catch (\Throwable $e) {
            return json(failResult($e->getMessage()));
        }

        return $next($request);
    }
}

接下来Client::send() 入队处理,就是队列相关的内容了。






参考资料

redis队列插件 https://www.workerman.net/plugin/12

workerman/redis-queue https://www.workerman.net/doc/workerman/components/workerman-redis-queue.html

walkor/webman https://github.com/walkor/webman

str_replace https://www.php.net/manual/en/function.str-replace.php

array_replace_recursive https://www.php.net/manual/en/function.array-replace-recursive.php

strtr https://www.php.net/manual/en/function.strtr.php


返回