


webman 中 Redis 队列使用的包是 webman/redis-queue,版本是 v1.1.0。

使用 Composer 安装 webman/redis-queue:composer require webman/redis-queue

Redis配置文件自动生成在 config/plugin/webman/redis-queue/redis.php,内容类似如下:

return [
    'default' => [
        'host' => 'redis://',
        'options' => [
            'auth' => '',         // 密码,可选参数
            'db' => 0,            // 数据库
            'max_attempts'  => 5, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒

消费进程配置文件自动生成在 config/plugin/webman/redis-queue/process.php,内容类似如下:

return [
    'consumer'  => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis'

此外,还会自动生成控制该插件是否启用的配置文件 config/plugin/webman/redis-queue/app.php,内容类似如下:

return [
    'enable' => true,


在 app_path() . '/queue/redis' 目录下编写消费者文件,例如 ProductEdit.php:

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

 * Class ProductEdit
 * @package app\queue\redis\yq
class ProductEdit implements Consumer
    public $queue = 'product_edit';

    public $connection = 'default';

    public function consume($data)
        if (empty($data["product_id"])) {

        /* 处理逻辑 */


编写好消费者后,以守护进程(daemon)模式启动 webman:
php start.php start -d


项目根目录下的 start.php 文件内容如下:

#!/usr/bin/env php
require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Protocols\Http;
use Workerman\Connection\TcpConnection;
use Webman\App;
use Webman\Config;
use Webman\Route;
use Webman\Middleware;
use Dotenv\Dotenv;
use support\Request;
use support\Log;
use support\Container;

ini_set('display_errors', 'on');

// 加载环境配置文件
if (class_exists('Dotenv\Dotenv') && file_exists(base_path() . '/.env')) {
    if (method_exists('Dotenv\Dotenv', 'createUnsafeImmutable')) {
    } else {

// 加载应用配置文件,包括 自动加载、容器、数据库、插件进程 等
Config::load(config_path(), ['route', 'container']);

if ($timezone = config('app.default_timezone')) {

$runtime_logs_path = runtime_path() . DIRECTORY_SEPARATOR . 'logs';
if ( !file_exists($runtime_logs_path) || !is_dir($runtime_logs_path) ) {
    if (!mkdir($runtime_logs_path,0777,true)) {
        throw new \RuntimeException("Failed to create runtime logs directory. Please check the permission.");

$runtime_views_path = runtime_path() . DIRECTORY_SEPARATOR . 'views';
if ( !file_exists($runtime_views_path) || !is_dir($runtime_views_path) ) {
    if (!mkdir($runtime_views_path,0777,true)) {
        throw new \RuntimeException("Failed to create runtime views directory. Please check the permission.");

Worker::$onMasterReload = function () {
    if (function_exists('opcache_get_status')) {
        if ($status = opcache_get_status()) {
            if (isset($status['scripts']) && $scripts = $status['scripts']) {
                foreach (array_keys($scripts) as $file) {
                    opcache_invalidate($file, true);

$config = config('server');
Worker::$pidFile = $config['pid_file'];
Worker::$stdoutFile = $config['stdout_file'];
Worker::$logFile = $config['log_file'];
Worker::$eventLoopClass = $config['event_loop'] ?? '';
TcpConnection::$defaultMaxPackageSize = $config['max_package_size'] ?? 10 * 1024 * 1024;
if (property_exists(Worker::class, 'statusFile')) {
    Worker::$statusFile = $config['status_file'] ?? '';

if ($config['listen']) {
    $worker = new Worker($config['listen'], $config['context']);
    $property_map = [
    foreach ($property_map as $property) {
        if (isset($config[$property])) {
            $worker->$property = $config[$property];

    $worker->onWorkerStart = function ($worker) {
        require_once base_path() . '/support/bootstrap.php';
        $app = new App($worker, Container::instance(), Log::channel('default'), app_path(), public_path());
        Http::requestClass(config('app.request_class', config('server.request_class', Request::class)));
        $worker->onMessage = [$app, 'onMessage'];

// 自定义进程
// Windows does not support custom processes.
    // 一级进程
    foreach (config('process', []) as $process_name => $config) {
        worker_start($process_name, $config);
    // 插件进程
    foreach (config('plugin', []) as $firm => $projects) {
        foreach ($projects as $name => $project) {
            foreach ($project['process'] ?? [] as $process_name => $config) {
                worker_start("plugin.$firm.$name.$process_name", $config);


加载应用配置文件:Config::load(config_path(), ['route', 'container']);

config_path() 用于获取配置文件所在目录的路径,定义在 support/helpers.php 文件中。

Config::load() 定义在 Webman\Config 类文件中。

start.php 中加载自定义进程:

    foreach (config('process', []) as $process_name => $config) {
        worker_start($process_name, $config);
    foreach (config('plugin', []) as $firm => $projects) {
        foreach ($projects as $name => $project) {
            foreach ($project['process'] ?? [] as $process_name => $config) {
                worker_start("plugin.$firm.$name.$process_name", $config);

config('plugin', []) 获取插件配置,返回的结果:

array(1) {
  array(3) {
    array(1) {
      array(7) {
        string(34) "/var/www/html/webman/build"
        string(11) "webman.phar"
        string(0) ""
        string(228) "#^(?!.*(config/plugin/webman/console/app.php|webman/console/src/Commands/(PharPackCommand.php|ReloadCommand.php)|LICENSE|composer.json|.github|.idea|doc|docs|.git|.setting|runtime|test|test_old|tests|Tests|vendor-bin|.md))(.*)$#"
        array(5) {
          string(4) ".env"
          string(7) "LICENSE"
          string(13) "composer.json"
          string(13) "composer.lock"
          string(9) "start.php"
    array(2) {
      array(7) {
        string(24) "websocket://"
        string(19) ""
        string(32) "dfa7def75955c62f88e02526691ee155"
        string(32) "cd88c5033d8e591dd248408734be233f"
        string(45) ""
        string(24) "/plugin/webman/push/auth"
      array(1) {
        array(5) {
          string(18) "Webman\Push\Server"
          string(24) "websocket://"
          array(2) {
            string(19) ""
            array(1) {
              array(2) {
                string(45) ""
                string(32) "cd88c5033d8e591dd248408734be233f"
    array(3) {
      array(1) {
      array(1) {
        array(3) {
          string(34) "Webman\RedisQueue\Process\Consumer"
          array(1) {
            string(44) "/var/www/html/webman/app/queue/redis"
      array(1) {
        array(2) {
          string(25) "redis://"
          array(4) {
            string(12) "eszEDCrdx341"
            string(1) "3"

worker_start() 用于启动进程,内部调用 Workerman\Worker 类实现。

执行 php start.php start 启动时,可以看到已加载的进程名:

root@514444b84e17:/var/www/html/webman# php start.php start
Workerman[start.php] start in DEBUG mode
------------------------------------------------------ WORKERMAN -------------------------------------------------------
Workerman version:4.0.30          PHP version:7.4.28
------------------------------------------------------- WORKERS --------------------------------------------------------
proto   user            worker                                listen                      processes    status
tcp     root            webman                               8             [OK]
tcp     root            monitor                               none                        1             [OK]
tcp     root            plugin.webman.push.server             websocket://    1             [OK]
tcp     root            plugin.webman.redis-queue.consumer    none                        6             [OK]
Press Ctrl+C to stop. Start success.


Webman\RedisQueue\Process\Consumer 类文件内容:

namespace Webman\RedisQueue\Process;

use support\Container;
use Webman\RedisQueue\Client;

 Class Consumer
 * @package process
class Consumer
     * @var string
    protected $_consumerDir = '';

     * StompConsumer constructor.
     * @param string $consumer_dir
    public function __construct($consumer_dir = '')
        $this->_consumerDir = $consumer_dir;

     * onWorkerStart.
    public function onWorkerStart()
        if (!is_dir($this->_consumerDir)) {
            echo "Consumer directory {$this->_consumerDir} not exists\r\n";
        $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir);
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            if (is_dir($file)) {
            $fileinfo = new \SplFileInfo($file);
            $ext = $fileinfo->getExtension();
            if ($ext === 'php') {
                $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4));
                if (is_a($class, 'Webman\RedisQueue\Consumer', true)) {
                    $consumer = Container::get($class);
                    $connection_name = $consumer->connection ?? 'default';
                    $queue = $consumer->queue;
                    $connection = Client::connection($connection_name);
                    $connection->subscribe($queue, [$consumer, 'consume']);


Webman\RedisQueue\Client 类文件内容:

namespace Webman\RedisQueue;

use Workerman\RedisQueue\Client as RedisClient;

 Class RedisQueue
 * @package support
 * Strings methods
 * @method static void send($queue, $data, $delay=0)
class Client
     * @var Client[]
    protected static $_connections = null;

     * 用redis队列插件中的配置文件中的配置信息连接redis服务
     * @param string $name
     * @return RedisClient
    public static function connection($name = 'default') {
        if (!isset(static::$_connections[$name])) {
            $config = config('redis_queue', config('plugin.webman.redis-queue.redis', []));
            if (!isset($config[$name])) {
                throw new \RuntimeException("RedisQueue connection $name not found");
            $host = $config[$name]['host'];
            $options = $config[$name]['options'];
            $client = new RedisClient($host, $options);
            static::$_connections[$name] = $client;
        return static::$_connections[$name];

     * @param $name
     * @param $arguments
     * @return mixed
    public static function __callStatic($name, $arguments)
        return static::connection('default')->{$name}(... $arguments);

Workerman\RedisQueue\Client 类文件内容:

namespace Workerman\RedisQueue;

use RuntimeException;
use Workerman\Lib\Timer;
use Workerman\Redis\Client as Redis;

 Class Client
 * @package Workerman\RedisQueue
class Client
     * Queue waiting for consumption
    const QUEUE_WAITING = '{redis-queue}-waiting';

     * Queue with delayed consumption
    const QUEUE_DELAYED = '{redis-queue}-delayed';

     * Queue with consumption failure
    const QUEUE_FAILD = '{redis-queue}-failed';

     * @var Redis
    protected $_redisSubscribe;

     * @var Redis
    protected $_redisSend;

     * @var array
    protected $_subscribeQueues = [];

     * @var array
    protected $_options = [
        'retry_seconds' => 5,
        'max_attempts'  => 5,
        'auth'          => '',
        'db'            => 0,

     * Client constructor.
     * @param $address
     * @param array $options
    public function __construct($address, $options = [])
        $this->_redisSubscribe = new Redis($address, $options);
        $this->_redisSubscribe->brPoping = 0;
        $this->_redisSend = new Redis($address, $options);
        if (isset($options['auth'])) {
        if (isset($options['db'])) {
        $this->_options = array_merge($this->_options, $options);

     * Send.
     * @param $queue
     * @param $data
     * @param int $delay
     * @param callable $cb
    public function send($queue, $data, $delay = 0, $cb = null)
        static $_id = 0;
        $id = \microtime(true) . '.' . (++$_id);
        $now = time();
        $package_str = \json_encode([
            'id'       => $id,
            'time'     => $now,
            'delay'    => $delay,
            'attempts' => 0,
            'queue'    => $queue,
            'data'     => $data
        if (\is_callable($delay)) {
            $cb = $delay;
            $delay = 0;
        if ($cb) {
            $cb = function ($ret) use ($cb) {
            if ($delay == 0) {
                $this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str, $cb);
            } else {
                $this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str, $cb);
        if ($delay == 0) {
            $this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str);
        } else {
            $this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str);

     * Subscribe.
     * @param string|array $queue
     * @param callable $callback
    public function subscribe($queue, callable $callback)
        $queue = (array)$queue;
        foreach ($queue as $q) {
            $redis_key = static::QUEUE_WAITING . $q;
            $this->_subscribeQueues[$redis_key] = $callback;

     * Unsubscribe.
     * @param string|array $queue
     * @return void
    public function unsubscribe($queue)
        $queue = (array)$queue;
        foreach($queue as $q) {
            $redis_key = static::QUEUE_WAITING . $q;

     * tryToPullDelayQueue.
    protected function tryToPullDelayQueue()
        static $retry_timer = 0;
        if ($retry_timer) {
        $retry_timer = Timer::add(1, function () {
            $now = time();
            $options = ['LIMIT', 0, 128];
            $this->_redisSend->zrevrangebyscore(static::QUEUE_DELAYED, $now, '-inf', $options, function($items){
                if ($items === false) {
                    throw new RuntimeException($this->_redisSend->error());
                foreach ($items as $package_str) {
                    $this->_redisSend->zRem(static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) {
                        if ($result !== 1) {
                        $package = \json_decode($package_str, true);
                        if (!$package) {
                            $this->_redisSend->lPush(static::QUEUE_FAILD , $package_str);
                        $this->_redisSend->lPush(static::QUEUE_WAITING . $package['queue'], $package_str);

     * pull.
    public function pull()
        if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) {
        $cb = function($data) use (&$cb) {
            if ($data) {
                $this->_redisSubscribe->brPoping = 0;
                $redis_key = $data[0];
                $package_str = $data[1];
                $package = json_decode($package_str, true);
                if (!$package) {
                    $this->_redisSend->lPush(static::QUEUE_FAILD, $package_str);
                } else {
                    if (!isset($this->_subscribeQueues[$redis_key])) {
                        // 取消订阅,放回队列
                        $this->_redisSend->rPush($redis_key, $package_str);
                    } else {
                        $callback = $this->_subscribeQueues[$redis_key];
                        try {
                            \call_user_func($callback, $package['data']);
                        } catch (\Exception $e) {
                            if (++$package['attempts'] > $this->_options['max_attempts']) {
                                $package['error'] = (string) $e;
                            } else {
                        } catch (\Error $e) {
                            if (++$package['attempts'] > $this->_options['max_attempts']) {
                                $package['error'] = (string) $e;
                            } else {
            if ($this->_subscribeQueues) {
                $this->_redisSubscribe->brPoping = 1;
                Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb] ,false);
        $this->_redisSubscribe->brPoping = 1;
        $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb);

     * @param $package
    protected function retry($package)
        $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']);
        $this->_redisSend->zAdd(static::QUEUE_DELAYED, $delay, \json_encode($package));

     * @param $package
    protected function fail($package)
        $this->_redisSend->lPush(static::QUEUE_FAILD , \json_encode($package));


Workerman\Redis\Client 类文件内容:

namespace Workerman\Redis;

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;

 Class Client
 * @package Workerman\Redis
class Client
     * @var AsyncTcpConnection
    protected $_connection = null;

     * @var array
    protected $_options = [];

     * @var string
    protected $_address = '';

     * @var array
    protected $_queue = [];

     * @var int
    protected $_db = 0;

     * @var string|array
    protected $_auth = null;

     * @var bool
    protected $_waiting = true;

     * @var Timer
    protected $_connectTimeoutTimer = null;

     * @var Timer
    protected $_reconnectTimer = null;

     * @var callable
    protected $_connectionCallback = null;

     * @var Timer
    protected $_waitTimeoutTimer = null;

     * @var string
    protected $_error = '';

     * @var bool
    protected $_subscribe = false;

     * @var bool
    protected $_firstConnect = true;

     * Client constructor.
     * @param $address
     * @param array $options
     * @param null $callback
    public function __construct($address, $options = [], $callback = null)
        if (!\class_exists('Protocols\Redis')) {
            \class_alias('Workerman\Redis\Protocols\Redis', 'Protocols\Redis');
        $this->_address = $address;
        $this->_options = $options;
        $this->_connectionCallback = $callback;
        $timer = Timer::add(1, function () use (&$timer) {
            if (empty($this->_queue)) {
            if ($this->_subscribe) {
            $current_queue = current($this->_queue);
            $current_command = $current_queue[0][0];
            $ignore_first_queue = in_array($current_command, ['BLPOP', 'BRPOP']);
            $time = time();
            $timeout = isset($this->_options['wait_timeout']) ? $this->_options['wait_timeout'] : 600;
            $has_timeout = false;
            $first_queue = true;
            foreach ($this->_queue as $key => $queue) {
                if ($first_queue && $ignore_first_queue) {
                    $first_queue = false;
                if ($time - $queue[1] > $timeout) {
                    $has_timeout = true;
                    $msg = "Workerman Redis Wait Timeout ($timeout seconds)";
                    if ($queue[2]) {
                        $this->_error = $msg;
                        \call_user_func($queue[2], false, $this);
                    } else {
                        echo new Exception($msg);
            if ($has_timeout && !$ignore_first_queue) {

     * connect
    public function connect()
        if ($this->_connection) {

        $timeout = isset($this->_options['connect_timeout']) ? $this->_options['connect_timeout'] : 5;
        $context = isset($this->_options['context']) ? $this->_options['context'] : [];
        $this->_connection = new AsyncTcpConnection($this->_address, $context);

        $this->_connection->onConnect = function () {
            $this->_waiting = false;
            if ($this->_reconnectTimer) {
                $this->_reconnectTimer = null;

            if ($this->_db) {
                $this->_queue = \array_merge([[['SELECT', $this->_db], time(), null]], $this->_queue);

            if ($this->_auth) {
                $this->_queue = \array_merge([[['AUTH', $this->_auth], time(), null]],  $this->_queue);

            $this->_connection->onError = function ($connection, $code, $msg) {
                echo new \Exception("Workerman Redis Connection Error $code $msg");
            $this->_firstConnect && $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this);
            $this->_firstConnect = false;

        $time_start = microtime(true);
        $this->_connection->onError = function ($connection) use ($time_start) {
            $time = microtime(true) - $time_start;
            $msg = "Workerman Redis Connection Failed ($time seconds)";
            $this->_error = $msg;
            $exception = new \Exception($msg);
            if (!$this->_connectionCallback) {
                echo $exception;
            $this->_firstConnect && \call_user_func($this->_connectionCallback, false, $this);

        $this->_connection->onClose = function () use ($time_start) {
            $this->_subscribe = false;
            if ($this->_connectTimeoutTimer) {
            if ($this->_reconnectTimer) {
                $this->_reconnectTimer = null;
            if (microtime(true) - $time_start > 5) {
            } else {
                $this->_reconnectTimer = Timer::add(5, function () {
                }, null, false);

        $this->_connection->onMessage = function ($connection, $data) {
            $this->_error = '';
            $this->_waiting = false;
            $queue = current($this->_queue);
            $cb = $queue[2];
            $type = $data[0];
            if (!$this->_subscribe) {
            $success = $type === '-' || $type === '!' ? false : true;
            $exception = false;
            $result = false;
            if ($success) {
                $result = $data[1];
                if ($type === '+' && $result === 'OK') {
                    $result = true;
            } else {
                $this->_error = $data[1];
            if (!$cb) {
            // format.
            if (!empty($queue[3])) {
                $result = \call_user_func($queue[3], $result);
            try {
                \call_user_func($cb, $result, $this);
            } catch (\Exception $exception) {

            if ($type === '!') {
            } else {
            if ($exception) {
                throw $exception;

        $this->_connectTimeoutTimer = Timer::add($timeout, function () use ($timeout) {
            $this->_connectTimeoutTimer = null;
            if ($this->_connection && $this->_connection->getStatus(false) === 'ESTABLISHED') {
            $this->_error = "Workerman Redis Connection to {$this->_address} timeout ({$timeout} seconds)";
            if ($this->_firstConnect && $this->_connectionCallback) {
                \call_user_func($this->_connectionCallback, false, $this);
            } else {
                echo $this->_error . "\n";


     * process
    public function process()
        if (!$this->_connection || $this->_waiting || empty($this->_queue) || $this->_subscribe) {
        $queue = \current($this->_queue);
        if ($queue[0][0] === 'SUBSCRIBE' || $queue[0][0] === 'PSUBSCRIBE') {
            $this->_subscribe = true;
        $this->_waiting = true;
        $this->_error = '';

     * subscribe
     * @param $channels
     * @param $cb
    public function subscribe($channels, $cb)
        $new_cb = function ($result) use ($cb) {
            if (!$result) {
                echo $this->error();
            $response_type = $result[0];
            switch ($response_type) {
                case 'subscribe':
                case 'message':
                    \call_user_func($cb, $result[1], $result[2], $this);
                    echo 'unknow response type for subscribe. buffer:' . serialize($result) . "\n";
        $this->_queue[] = [['SUBSCRIBE', $channels], time(), $new_cb];

     * psubscribe
     * @param $patterns
     * @param $cb
    public function pSubscribe($patterns, $cb)
        $new_cb = function ($result) use ($cb) {
            if (!$result) {
                echo $this->error();
            $response_type = $result[0];
            switch ($response_type) {
                case 'psubscribe':
                case 'pmessage':
                    \call_user_func($cb, $result[1], $result[2], $result[3], $this);
                    echo 'unknow response type for psubscribe. buffer:' . serialize($result) . "\n";
        $this->_queue[] = [['PSUBSCRIBE', $patterns], time(), $new_cb];

     * select
     * @param $db
     * @param null $cb
    public function select($db, $cb = null)
        $format = function ($result) use ($db) {
            $this->_db = $db;
            return $result;
        $cb = $cb ? $cb : function(){};
        $this->_queue[] = [['SELECT', $db], time(), $cb, $format];

     * auth
     * @param string|array $auth
     * @param null $cb
    public function auth($auth, $cb = null)
        $format = function ($result) use ($auth) {
            $this->_auth = $auth;
            return $result;
        $cb = $cb ? $cb : function(){};
        $this->_queue[] = [['AUTH', $auth], time(), $cb, $format];

     * set
     * @param $key
     * @param $value
     * @param null $cb
     * @return null
    public function set($key, $value, $cb = null)
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $timeout = $cb;
            $cb = null;
            if (\count($args) > 3) {
                $cb = $args[3];
            $this->_queue[] = [['SETEX', $key, $timeout, $value], time(), $cb];
            return null;
        $this->_queue[] = [['SET', $key, $value], time(), $cb];

     * incr
     * @param $key
     * @param null $cb
     * @return null
    public function incr($key, $cb = null)
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $num = $cb;
            $cb = null;
            if (\count($args) > 2) {
                $cb = $args[2];
            $this->_queue[] = [['INCRBY', $key, $num], time(), $cb];
            return null;
        $this->_queue[] = [['INCR', $key], time(), $cb];

     * decr
     * @param $key
     * @param null $cb
     * @return null
    public function decr($key, $cb = null)
        $args = func_get_args();
        if ($cb !== null && !\is_callable($cb)) {
            $num = $cb;
            $cb = null;
            if (\count($args) > 2) {
                $cb = $args[2];
            $this->_queue[] = [['DECRBY', $key, $num], time(), $cb];
            return null;
        $this->_queue[] = [['DECR', $key], time(), $cb];

     * sort
     * @param $key
     * @param $options
     * @param null $cb
    function sort($key, $options, $cb = null)
        $args = [];
        if (isset($options['sort'])) {
            $args[] = $options['sort'];

        foreach ($options as $op => $value) {
            $args[] = $op;
            if (!is_array($value)) {
                $args[] = $value;
            foreach ($value as $sub_value) {
                $args[] = $sub_value;
        \array_unshift($args, 'SORT', $key);
        $this->_queue[] = [$args, time(), $cb];

     * mSet
     * @param array $array
     * @param null $cb
    public function mSet(array $array, $cb = null)
        $this->mapCb('MSET', $array, $cb);

     * mSetNx
     * @param array $array
     * @param null $cb
    public function mSetNx(array $array, $cb = null)
        $this->mapCb('MSETNX', $array, $cb);

     * mapCb
     * @param $command
     * @param array $array
     * @param $cb
    protected function mapCb($command, array $array, $cb)
        $args = [$command];
        foreach ($array as $key => $value) {
            $args[] = $key;
            $args[] = $value;
        $this->_queue[] = [$args, time(), $cb];

     * hMSet
     * @param $key
     * @param array $array
     * @param null $cb
    public function hMSet($key, array $array, $cb = null)
        $this->keyMapCb('HMSET', $key, $array, $cb);

     * hMGet
     * @param $key
     * @param array $array
     * @param null $cb
    public function hMGet($key, array $array, $cb = null)
        $format = function ($result) use ($array) {
            if (!is_array($result)) {
                return $result;
            return \array_combine($array, $result);
        $this->_queue[] = [['HMGET', $key, $array], time(), $cb, $format];

     * hGetAll
     * @param $key
     * @param null $cb
    public function hGetAll($key, $cb = null)
        $format = function ($result) {
            if (!\is_array($result)) {
                return $result;
            $return = [];
            $key = '';
            foreach ($result as $index => $item) {
                if ($index % 2 == 0) {
                    $key = $item;
                $return[$key] = $item;
            return $return;
        $this->_queue[] = [['HGETALL', $key], time(), $cb, $format];

     * keyMapCb
     * @param $command
     * @param $key
     * @param array $array
     * @param $cb
    protected function keyMapCb($command, $key, array $array, $cb)
        $args = [$command, $key];
        foreach ($array as $key => $value) {
            $args[] = $key;
            $args[] = $value;
        $this->_queue[] = [$args, time(), $cb];

     * __call
     * @param $method
     * @param $args
    public function __call($method, $args)
        $cb = null;
        if (\is_callable(end($args))) {
            $cb = array_pop($args);

        \array_unshift($args, \strtoupper($method));
        $this->_queue[] = [$args, time(), $cb];

     * closeConnection
    public function closeConnection()
        if (!$this->_connection) {
        $this->_subscribe = false;
        $this->_connection->onConnect = $this->_connection->onError = $this->_connection->onClose =
        $this->_connection->onMessge = null;
        $this->_connection = null;

     * error
     * @return string
    function error()
        return $this->_error;

     * close
    public function close()
        $this->_queue = [];

     * scan
     * @throws Exception
    public function scan()
        throw new Exception('Not implemented');

     * hScan
     * @throws Exception
    public function hScan()
        throw new Exception('Not implemented');

     * hScan
     * @throws Exception
    public function sScan()
        throw new Exception('Not implemented');

     * hScan
     * @throws Exception
    public function zScan()
        throw new Exception('Not implemented');

failed 队列数据写入数据库

从上面的代码可以看到,处理不了的队列数据(例如 JSON 解析失败的数据)都被放入了 {redis-queue}-failed 队列。下面我们想办法把这个队列中的数据写入数据库。


新建 lib/webman/redisQueue/failed/Consumer.php 文件,内容如下:


namespace lib\webman\redisQueue\failed;

use support\Container;
use Webman\RedisQueue\Process\Consumer as webmanConsumer;

 * Class Consumer
 * @package process
class Consumer extends webmanConsumer
     * onWorkerStart.
    public function onWorkerStart()
        if (!is_dir($this->_consumerDir)) {
            echo "Consumer directory {$this->_consumerDir} not exists\r\n";
        $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir);
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            if (is_dir($file)) {
            $fileinfo = new \SplFileInfo($file);
            $ext = $fileinfo->getExtension();
            if ($ext === 'php') {
                $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4));
                if (is_a($class, 'Webman\RedisQueue\Consumer', true)) {
                    $consumer = Container::get($class);
                    $connection_name = $consumer->connection ?? 'default';
                    $queue = $consumer->queue;
                    $connection = Client::connection($connection_name);
                    $connection->subscribe($queue, [$consumer, 'consume']);

新建 lib/webman/redisQueue/failed/Client.php 文件,内容如下:


namespace lib\webman\redisQueue\failed;

use Webman\RedisQueue\Client as webmanClient;

 * Class Client
 * @package lib\webman\redisQueue\failed
class Client extends webmanClient
     * @param string $name
     * @return RedisClient
    public static function connection($name = 'default')
        if (!isset(static::$_connections[$name])) {
            $config = config('redis_queue', config('plugin.webman.redis-queue.redis', []));
            if (!isset($config[$name])) {
                throw new \RuntimeException("RedisQueue connection $name not found");
            $host = $config[$name]['host'];
            $options = $config[$name]['options'];
            $client = new RedisClient($host, $options);
            static::$_connections[$name] = $client;

        return static::$_connections[$name];

新建 lib/webman/redisQueue/failed/RedisClient.php 文件,内容如下:


namespace lib\webman\redisQueue\failed;

use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;
use support\Log;

 * Class RedisClient
 * @package lib\webman\redisQueue\failed
class RedisClient extends Client
     * @param array|string $queue
     * @param callable $callback
    public function subscribe($queue, callable $callback)
        $this->_subscribeQueues[$queue] = $callback;


     * pull
    public function pull()
        if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) {
        $cb = function($data) use (&$cb) {
            if ($data) {
                $this->_redisSubscribe->brPoping = 0;
                $redis_key = $data[0];
                $package_str = $data[1];
                $package = json_decode($package_str, true);
                if (!$package) {
                    $log = Log::channel('default');
                    $log->error('lib\webman\redisQueue\failed\RedisClient::pull', ["package_str" => $package_str]);
                } else {
                    if (!isset($this->_subscribeQueues[$redis_key])) {
                        // 取消订阅,放回队列
                        $this->_redisSend->rPush($redis_key, $package_str);
                    } else {
                        $callback = $this->_subscribeQueues[$redis_key];
                        try {
                            \call_user_func($callback, $package);
                        } catch (\Throwable $e) {
                            // 记录异常日志
                            $package['Throwable'] = (string) $e;

                            $log = Log::channel('default');
                            $log->error('lib\webman\redisQueue\failed\RedisClient::pull', $package);
            if ($this->_subscribeQueues) {
                $this->_redisSubscribe->brPoping = 1;
                Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb] ,false);
        $this->_redisSubscribe->brPoping = 1;
        $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb);



-- Failed redis-queue messages. The `id` primary key is required by the
-- RedisQueueFailedRecord model, which declares $primaryKey = 'id'.
CREATE TABLE `redis_queue_failed_record` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `data` text COLLATE utf8mb4_unicode_ci COMMENT '数据',
  `status` tinyint(4) NOT NULL DEFAULT '10' COMMENT '状态,10:待处理,20:处理成功,30:处理失败',
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='redis队列错误记录';

model模型类也建好,app/model/RedisQueueFailedRecord.php 文件内容:


namespace app\model;

use support\Model;

 * redis队列错误记录
class RedisQueueFailedRecord extends Model
     * 模型的连接名称
     * @var string
    protected $connection = 'mysql';

     * The table associated with the model.
     * @var string
    protected $table = 'redis_queue_failed_record';

     * The primary key associated with the table.
     * @var string
    protected $primaryKey = 'id';

     * Indicates if the model should be timestamped.
     * 建议true;自动管理created_at(开始时间),updated_at(更新时间)
     * @var bool
    public $timestamps = true;
     * 可以使用create方法直接创建的属性
    protected $fillable = [

     * 类型转换
    protected $casts = [
        "data" => "json",

    const STATUS_UNDEAL = 10;  // 待处理
    const STATUS_DEALED_SUCC = 20;  // 处理成功
    const STATUS_DEALED_FAIL = 30;  // 处理失败

新建 app/queue/redisFailed/RedisQueueFailed.php 文件,内容:

namespace app\queue\redisFailed;

use Webman\RedisQueue\Consumer;
use app\model\RedisQueueFailedRecord;

 * redis队列错误
 * Class RedisQueueFailed
 * @package app\queue\redisFailed
class RedisQueueFailed implements Consumer
{
    /** @var string Queue this consumer subscribes to. */
    public $queue = \Workerman\RedisQueue\Client::QUEUE_FAILD;

    /** @var string Name of the queue connection to use. */
    public $connection = 'default';

    /**
     * Stores one message taken from the failed queue as a pending record.
     *
     * @param mixed $data message payload handed over by the queue client
     * @return void
     * @throws \RuntimeException when the record cannot be saved
     */
    public function consume($data)
    {
        $record = new RedisQueueFailedRecord();
        $record->data = $data;
        $record->status = RedisQueueFailedRecord::STATUS_UNDEAL;

        $saved = $record->save();
        if (!$saved) {
            throw new \RuntimeException("redis_queue_failed_record data save failed");
        }
    }
}


config/plugin/webman/redis-queue/process.php 配置文件中,新增 consumerFailed 配置:

return [
    'consumer'  => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis'
    'consumerFailed'  => [
        'handler'     => lib\webman\redisQueue\failed\Consumer::class,
        'count'       => 1, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redisFailed'



support/helpers.php 文件

support/helpers.php 是一个帮助函数库:


use support\Container;
use Workerman\Worker;
use Webman\Config;
// 省略若干...

 * @return bool
function is_phar()
    return class_exists(\Phar::class, false) && Phar::running();

// Phar support.
if (is_phar()) {
    define('BASE_PATH', dirname(__DIR__));
} else {
    define('BASE_PATH', realpath(__DIR__ . '/../'));
define('WEBMAN_VERSION', '1.3.0');

 * @param $return_phar
 * @return false|string
function base_path($return_phar = true)
    static $real_path = '';
    if (!$real_path) {
        $real_path = is_phar() ? dirname(Phar::running(false)) : BASE_PATH;
    return $return_phar ? BASE_PATH : $real_path;

 * @return string
function app_path()

 * @return string
function public_path()
    static $path = '';
    if (!$path) {
        $path = config('app.public_path', BASE_PATH . DIRECTORY_SEPARATOR . 'public');
    return $path;

 * 获取配置目录路径
 * @return string
function config_path()
    return BASE_PATH . DIRECTORY_SEPARATOR . 'config';

 * Phar support.
 * Compatible with the 'realpath' function in the phar file.
 * @return string
function runtime_path()
    static $path = '';
    if (!$path) {
        $path = config('app.runtime_path', BASE_PATH . DIRECTORY_SEPARATOR . 'runtime');
    return $path;

// 省略若干...

 * 获取配置参数值
 * @param $key
 * @param null $default
 * @return mixed
function config($key = null, $default = null)
    return Config::get($key, $default);

// 省略若干...

 * @param $worker
 * @param $class
function worker_bind($worker, $class)
    $callback_map = [
    foreach ($callback_map as $name) {
        if (method_exists($class, $name)) {
            $worker->$name = [$class, $name];
    if (method_exists($class, 'onWorkerStart')) {
        call_user_func([$class, 'onWorkerStart'], $worker);

 * @param $process_name
 * @param $config
 * @return void
function worker_start($process_name, $config)
    $worker = new Worker($config['listen'] ?? null, $config['context'] ?? []);
    $property_map = [
    $worker->name = $process_name;
    foreach ($property_map as $property) {
        if (isset($config[$property])) {
            $worker->$property = $config[$property];

    $worker->onWorkerStart = function ($worker) use ($config) {
        require_once base_path() . '/support/bootstrap.php';

        foreach ($config['services'] ?? [] as $server) {
            if (!class_exists($server['handler'])) {
                echo "process error: class {$server['handler']} not exists\r\n";
            $listen = new Worker($server['listen'] ?? null, $server['context'] ?? []);
            if (isset($server['listen'])) {
                echo "listen: {$server['listen']}\n";
            $instance = Container::make($server['handler'], $server['constructor'] ?? []);
            worker_bind($listen, $instance);

        if (isset($config['handler'])) {
            if (!class_exists($config['handler'])) {
                echo "process error: class {$config['handler']} not exists\r\n";

            $instance = Container::make($config['handler'], $config['constructor'] ?? []);
            worker_bind($worker, $instance);


// 省略若干...

Webman\Config 类

namespace Webman;

class Config

     * @var array
    protected static $_config = [];

     * @var string
    protected static $_configPath = '';

     * @var bool
    protected static $_loaded = false;

     * @param $config_path 配置文件路径
     * @param array $exclude_file 排除的文件
    public static function load($config_path, $exclude_file = [])
        static::$_configPath = $config_path;
        if (!$config_path) {
        // RecursiveDirectoryIterator 提供了一个用于递归地遍历文件系统目录的接口
        $dir_iterator = new \RecursiveDirectoryIterator($config_path, \FilesystemIterator::FOLLOW_SYMLINKS);
        // RecursiveIteratorIterator 可用于遍历递归迭代器
        $iterator = new \RecursiveIteratorIterator($dir_iterator);
        foreach ($iterator as $file) {
            /** @var \SplFileInfo $file */
            if (is_dir($file) || $file->getExtension() != 'php' || \in_array($file->getBaseName('.php'), $exclude_file)) {
                // 是目录、或非.php后缀文件、或在排除的文件中,跳过本次
                // getBaseName('.php') 返回没有路径信息及.php后缀的文件名称
            /* 判断当前配置是否可用 */ 
            $app_config_file = $file->getPath() . '/app.php';
            if (!is_file($app_config_file)) {
            // 如果配置文件的相对深度大于等于2层,则说明是个插件,查看该插件是否可用
            $relative_path = str_replace($config_path . DIRECTORY_SEPARATOR, '', substr($file, 0, -4));
            $explode = array_reverse(explode(DIRECTORY_SEPARATOR, $relative_path));
            if (count($explode) >= 2) {
                $app_config = include $app_config_file;
                if (empty($app_config['enable'])) {
                    // 如果enable属性未配置、或为空、或false,则说明该插件不可用,跳过本次
            $config = include $file;
            foreach ($explode as $section) {
                $tmp = [];
                $tmp[$section] = $config;
                $config = $tmp;
            static::$_config = array_replace_recursive(static::$_config, $config);

        // Merge database config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['database']['connections'] ?? [] as $key => $connection) {
                    static::$_config['database']['connections']["plugin.$firm.$name.$key"] = $connection;
        if (!empty(static::$_config['database']['connections'])) {
            static::$_config['database']['default'] = static::$_config['database']['default'] ?? key(static::$_config['database']['connections']);
        // Merge thinkorm config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['thinkorm']['connections'] ?? [] as $key => $connection) {
                    static::$_config['thinkorm']['connections']["plugin.$firm.$name.$key"] = $connection;
        if (!empty(static::$_config['thinkorm']['connections'])) {
            static::$_config['thinkorm']['default'] = static::$_config['thinkorm']['default'] ?? key(static::$_config['thinkorm']['connections']);
        // Merge redis config
        foreach (static::$_config['plugin'] ?? [] as $firm => $projects) {
            foreach ($projects as $name => $project) {
                foreach ($project['redis'] ?? [] as $key => $connection) {
                    static::$_config['redis']["plugin.$firm.$name.$key"] = $connection;

        static::$_loaded = true;

     * @param null $key
     * @param null $default
     * @return array|mixed|null
    public static function get($key = null, $default = null)
        if ($key === null) {
            return static::$_config;
        $key_array = \explode('.', $key);
        $value = static::$_config;
        $finded = true;
        foreach ($key_array as $index) {
            if (!isset($value[$index])) {
                if (static::$_loaded) {
                    return $default;
                $finded = false;
            $value = $value[$index];
        if ($finded) {
            return $value;
        return static::read($key, $default);

     * @param $key
     * @param $default
     * @return array|mixed|void|null
    protected static function read($key, $default = null)
        $path = static::$_configPath;
        if ($path === '') {
            return $default;
        $keys = $key_array = \explode('.', $key);
        foreach ($key_array as $index => $section) {
            if (is_file($file = "$path/$section.php")) {
                $config = include $file;
                return static::find($keys, $config, $default);
            if (!is_dir($path = "$path/$section")) {
                return $default;
        return $default;

     * @param $key_array
     * @param $stack
     * @param $default
     * @return array|mixed
    protected static function find($key_array, $stack, $default)
        if (!is_array($stack)) {
            return $default;
        $value = $stack;
        foreach ($key_array as $index) {
            if (!isset($value[$index])) {
                return $default;
            $value = $value[$index];
        return $value;

     * @param $config_path
     * @param array $exclude_file
    public static function reload($config_path, $exclude_file = [])
        static::$_config = [];
        static::load($config_path, $exclude_file);




    array|string $search,
    array|string $replace,
    string|array $subject,
    int &$count = null
): string|array
// Provides: <body text='black'>
$bodytag = str_replace("%body%", "black", "<body text='%body%'>");

// Provides: Hll Wrld f PHP
$vowels = array("a", "e", "i", "o", "u", "A", "E", "I", "O", "U");
$onlyconsonants = str_replace($vowels, "", "Hello World of PHP");

// Provides: You should eat pizza, beer, and ice cream every day.
$phrase  = "You should eat fruits, vegetables, and fiber every day.";
$healthy = array("fruits", "vegetables", "fiber");
$yummy   = array("pizza", "beer", "ice cream");

$newphrase = str_replace($healthy, $yummy, $phrase);

// Provides: 2
$str = str_replace("ll", "", "good golly miss molly!", $count);
echo $count;


// 三个参数
 strtr(string $string, string $from, string $to): string
// 两个参数
 strtr(string $string, array $replace_pairs): string
echo strtr("Hilla Warld","ia","eo");  // Hello World

$arr = array("Hello" => "Hi", "world" => "earth");
echo strtr("Hello world",$arr);  // Hi earth


 array_replace_recursive(array $array, array ...$replacements): array
$base = array('citrus' => array( "orange") , 'berries' => array("blackberry", "raspberry"), );
$replacements = array('citrus' => array('pineapple'), 'berries' => array('blueberry'));

$basket = array_replace_recursive($base, $replacements);

$basket = array_replace($base, $replacements);

    [citrus] => Array
            [0] => pineapple

    [berries] => Array
            [0] => blueberry
            [1] => raspberry

    [citrus] => Array
            [0] => pineapple

    [berries] => Array
            [0] => blueberry

$base = array('citrus' => array("orange") , 'berries' => array("blackberry", "raspberry"), 'others' => 'banana' );
$replacements = array('citrus' => 'pineapple', 'berries' => array('blueberry'), 'others' => array('litchis'));
$replacements2 = array('citrus' => array('pineapple'), 'berries' => array('blueberry'), 'others' => 'litchis');

$basket = array_replace_recursive($base, $replacements, $replacements2);

    [citrus] => Array
            [0] => pineapple

    [berries] => Array
            [0] => blueberry
            [1] => raspberry

    [others] => litchis


$config = include $file;
foreach ($explode as $section) {
    $tmp = [];
    $tmp[$section] = $config;
    $config = $tmp;


$config = ["a" => 1];
$relative_path = "plugin/webman/redis-queue/process";
$explode = array_reverse(explode(DIRECTORY_SEPARATOR, $relative_path));

foreach ($explode as $section) {
    $tmp = [];
    $tmp[$section] = $config;
    $config = $tmp;

    [0] => process
    [1] => redis-queue
    [2] => webman
    [3] => plugin
    [plugin] => Array
            [webman] => Array
                    [redis-queue] => Array
                            [process] => Array
                                    [a] => 1




在webman项目的 app/candy/controller 目录下新建文件 Inqueue.php


namespace app\candy\controller;

use support\Request;
use Webman\RedisQueue\Client;

 * 入队
 * Class Inqueue
 * @package app\candy\controller
class Inqueue
{
    /**
     * Pushes a message onto the queue named in the request.
     *
     * Reads `queue_name` (required), `queue_data` (defaults to an empty array) and
     * `queue_delaytime` (defaults to 0) from `$request->data`.
     *
     * @param Request $request
     * @return \support\Response
     */
    public function send(Request $request)
    {
        if (empty($request->data['queue_name'])) {
            return json(failResult("queue_name为空"));
        }

        // Optional payload: start from the default and override when supplied.
        $payload = [];
        if (!empty($request->data['queue_data'])) {
            $payload = $request->data['queue_data'];
        }

        // Optional delay, forced to an integer.
        $delay = 0;
        if (!empty($request->data['queue_delaytime'])) {
            $delay = (int)$request->data['queue_delaytime'];
        }

        try {
            Client::send($request->data['queue_name'], $payload, $delay);
        } catch (\Throwable $e) {
            return json(failResult($e->getMessage()));
        }

        return json(successResult());
    }
}


    'data' => ["queue_name" => "order_end", "queue_data" => ["order_id" => 101], "queue_delaytime" => 60], 
    'project_id' => 'web_test', 
    'time' => time(), 
    'sign' => $sign

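$sign 是对上面 data、project_id、time 三项数据的签名,即 hash_hmac('sha256', json_encode(['data' => ..., 'project_id' => ..., 'time' => ...]), 项目密钥),键的顺序必须与下文中间件的验签逻辑保持一致。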

权限使用洋葱模型中间件进行过滤,在项目的 config/middleware.php 文件中写入:

return [
    'candy' => [

这里使用到了多应用,可以参考 https://www.workerman.net/doc/webman/multiapp.html

app/middleware/CandyAuthCheck.php 文件中写入:

namespace app\middleware;

use Webman\MiddlewareInterface;
use Webman\Http\Response;
use Webman\Http\Request;

class CandyAuthCheck implements MiddlewareInterface
{
    /**
     * Verifies the request signature before passing the request to the next layer.
     *
     * Requires the parameters `project_id`, `data`, `time` and `sign`. The expected
     * signature is the HMAC-SHA256 of the JSON-encoded `data` / `project_id` / `time`
     * array, keyed with the `key` entry of the `candy.<project_id>` configuration.
     * When the signature matches, the JSON-decoded `data` is stored on the request
     * as `$request->data`. Any failure, including a thrown error, produces a
     * failure JSON response instead of calling `$next`.
     *
     * @param Request  $request
     * @param callable $next
     * @return Response
     */
    public function process(Request $request, callable $next) : Response
    {
        try {
            // Collect the signed parameters.
            $params = getParams($request, ['project_id', 'data', 'time', 'sign']);

            $project_id = $params['project_id'];
            if (empty($project_id)) {
                return json(failResult('project_id is empty'));
            }

            $data = $params['data'];
            if (empty($data)) {
                return json(failResult('data is empty'));
            }

            $time = (int)$params['time'];
            if (empty($time)) {
                return json(failResult('time is empty'));
            }

            $sign = $params['sign'];
            if (empty($sign)) {
                return json(failResult('sign is empty'));
            }

            // Signature check.
            // Per-project hash configuration of the candy app, e.g. candy.web_test => ['key' => '...'].
            $hash_config = config('candy.' .$project_id);
            if (empty($hash_config)) {
                return json(failResult('project config is not found'));
            }

            $arr = [
                'data' => $data,
                'project_id' => $project_id,
                'time' => $time,
            ];
            $sign_hash = hash_hmac('sha256', json_encode($arr), $hash_config['key']);
            // hash_equals() compares in constant time and byte-for-byte. A loose `!=`
            // treats numeric-looking strings such as "0e123" and "0e456" as equal.
            // The is_string() guard keeps a non-string sign from raising a TypeError.
            if (!is_string($sign) || !hash_equals($sign_hash, $sign)) {
                return json(failResult('BaseService验签失败'));
            }

            // Expose the decoded payload to the controller.
            $request->data = json_decode($data, true);
        } catch (\Throwable $e) {
            return json(failResult($e->getMessage()));
        }

        return $next($request);
    }
}

接下来Client::send() 入队处理,就是队列相关的内容了。


