Yii2 Queue Extension




Using yii2-queue

For the official guide to the Yii2 queue extension (yii2-queue), see https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn

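I ran into a project that uses the Yii2 queue extension to implement a MySQL-backed queue. The push side was easy to follow, but it took me a long time to understand the consuming side, so I am writing it down here.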

The project uses the official Yii extension yiisoft/yii2-queue:

composer require --prefer-dist yiisoft/yii2-queue

Configuration

To use this extension, configure it as follows:

return [
    'bootstrap' => [
        'queue', // The component registers its own console commands
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\<driver>\Queue::class,
            'as log' => \yii\queue\LogBehavior::class,  // log queue events
            // other driver options
        ],
    ],
];

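<driver> is the driver name, such as db or redis.

Each job sent to the queue should be defined as a separate class. For example, if you need to download and save a file, the class might look like this: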

class DownloadJob extends \yii\base\BaseObject implements \yii\queue\JobInterface
{
    public $url;
    public $file;
    
    public function execute($queue)
    {
        file_put_contents($this->file, file_get_contents($this->url));
    }
}

Pushing jobs

Add a job to the queue:

Yii::$app->queue->push(new DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));

Push a job that should run after a 5-minute delay:

Yii::$app->queue->delay(5 * 60)->push(new DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));

yii\queue\Queue contains:

public function delay($value)
{
    $this->pushDelay = $value;
    return $this;
}

public function push($job)
{
    $event = new PushEvent([
        'job' => $job,
        'ttr' => $this->pushTtr ?: (
            $job instanceof RetryableJobInterface
                ? $job->getTtr()
                : $this->ttr
        ),
        'delay' => $this->pushDelay ?: 0,
        'priority' => $this->pushPriority,
    ]);
    $this->pushTtr = null;
    $this->pushDelay = null;
    $this->pushPriority = null;

    $this->trigger(self::EVENT_BEFORE_PUSH, $event);
    if ($event->handled) {
        return null;
    }

    if ($this->strictJobType && !($event->job instanceof JobInterface)) {
        throw new InvalidArgumentException('Job must be instance of JobInterface.');
    }

    if (!is_numeric($event->ttr)) {
        throw new InvalidArgumentException('Job TTR must be integer.');
    }
    $event->ttr = (int) $event->ttr;
    if ($event->ttr <= 0) {
        throw new InvalidArgumentException('Job TTR must be greater that zero.');
    }

    if (!is_numeric($event->delay)) {
        throw new InvalidArgumentException('Job delay must be integer.');
    }
    $event->delay = (int) $event->delay;
    if ($event->delay < 0) {
        throw new InvalidArgumentException('Job delay must be positive.');
    }

    $message = $this->serializer->serialize($event->job);
    $event->id = $this->pushMessage($message, $event->ttr, $event->delay, $event->priority);
    $this->trigger(self::EVENT_AFTER_PUSH, $event);

    return $event->id;
}
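
Note that push() resets pushTtr, pushDelay and pushPriority to null after reading them, so delay() only affects the next push() call. The following minimal sketch imitates that behaviour in plain PHP without Yii. MiniQueue and its $pushed array are hypothetical names used only for illustration, and the default TTR of 300 seconds is an assumption of this sketch.

```php
<?php
// Hypothetical stand-in for yii\queue\Queue: it keeps only the
// one-shot option handling of ttr()/delay()/push(), nothing else.
class MiniQueue
{
    public $ttr = 300;     // assumed default TTR in seconds
    public $pushed = [];   // in-memory "storage" for this sketch
    private $pushTtr;
    private $pushDelay;

    public function ttr($value)
    {
        $this->pushTtr = $value;
        return $this;
    }

    public function delay($value)
    {
        $this->pushDelay = $value;
        return $this;
    }

    public function push($job)
    {
        // A per-push value wins, otherwise fall back to the default.
        $ttr = $this->pushTtr ?: $this->ttr;
        $delay = $this->pushDelay ?: 0;
        // Reset so the next push() starts from the defaults again.
        $this->pushTtr = null;
        $this->pushDelay = null;

        $this->pushed[] = ['job' => $job, 'ttr' => (int) $ttr, 'delay' => (int) $delay];
        return count($this->pushed); // pretend message ID
    }
}

$queue = new MiniQueue();
$queue->delay(5 * 60)->push('first'); // delayed by 300 seconds
$queue->push('second');               // no delay: the option was reset

foreach ($queue->pushed as $row) {
    echo $row['job'], ' delay=', $row['delay'], ' ttr=', $row['ttr'], PHP_EOL;
}
```

The practical consequence is that a delay or TTR has to be set again for every job that needs it.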

$this->pushMessage() calls the concrete driver subclass's implementation, which persists the job. For example, in \yii\queue\db\Queue:

protected function pushMessage($message, $ttr, $delay, $priority)
{
    $this->db->createCommand()->insert($this->tableName, [
        'channel' => $this->channel,
        'job' => $message,
        'pushed_at' => time(),
        'ttr' => $ttr,
        'delay' => $delay,
        'priority' => $priority ?: 1024,
    ])->execute();
    $tableSchema = $this->db->getTableSchema($this->tableName);
    return $this->db->getLastInsertID($tableSchema->sequenceName);
}

Consuming with the DB driver

The DB driver stores queue data in a database.

Configuration example:

return [
    'bootstrap' => [
        'queue', // The component registers its own console commands
    ],
    'components' => [
        'db' => [
            'class' => \yii\db\Connection::class, 
            // ...
        ],
        'queue' => [
            'class' => \yii\queue\db\Queue::class,
            'db' => 'db', // DB connection component or its config
            'tableName' => '{{%queue}}', // Table name
            'channel' => 'default', // Queue channel key
            'mutex' => \yii\mutex\MysqlMutex::class, // Mutex that used to sync queries
            'as log' => \yii\queue\LogBehavior::class,
        ],
    ],
];

A table must first be added to the database. Example statement for MySQL:

CREATE TABLE `queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `channel` varchar(255) NOT NULL,
  `job` blob NOT NULL,
  `pushed_at` int(11) NOT NULL,
  `ttr` int(11) NOT NULL,
  `delay` int(11) NOT NULL DEFAULT 0,
  `priority` int(11) unsigned NOT NULL DEFAULT 1024,
  `reserved_at` int(11) DEFAULT NULL,
  `attempt` int(11) DEFAULT NULL,
  `done_at` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `channel` (`channel`),
  KEY `reserved_at` (`reserved_at`),
  KEY `priority` (`priority`)
) ENGINE=InnoDB;
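
The reserved_at and done_at columns encode the state of each job. As a rough sketch of how a row maps to a state, mirroring the checks in the status() method of \yii\queue\db\Queue quoted later in this post (jobState and the string labels are hypothetical; the library returns its own STATUS_* constants):

```php
<?php
// Hypothetical helper: derive a readable state from one queue table row.
function jobState(array $row): string
{
    if ($row['reserved_at'] === null) {
        return 'waiting';  // not yet picked up by a worker
    }
    if ($row['done_at'] === null) {
        return 'reserved'; // a worker has taken the job
    }
    return 'done';         // finished; the row is only kept when deleteReleased is false
}

echo jobState(['reserved_at' => null, 'done_at' => null]), PHP_EOL;
echo jobState(['reserved_at' => 1700000000, 'done_at' => null]), PHP_EOL;
echo jobState(['reserved_at' => 1700000000, 'done_at' => 1700000005]), PHP_EOL;
```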

Consumer command that listens for and processes queue jobs:

yii queue/listen

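When the queue contains jobs, the worker calls the execute() method of each job.

The listen command launches a daemon that polls the queue indefinitely. New jobs are fetched and executed as soon as they appear. The timeout parameter is the number of seconds to sleep before the next poll of the queue (3 by default). This method is most efficient when the command is properly daemonized via supervisor.

Fetch and execute jobs in a loop until the queue is empty: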

yii queue/run

The run command fetches and executes jobs in a loop until the queue is empty. It is suitable for cron.

Print information about the queue status:
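
The difference between the two commands comes down to what the loop does when it finds the queue empty. A framework-free sketch of that decision, assuming an in-memory array of closures in place of the table (consume, $maxIdle and the injectable $sleep callback are hypothetical and exist only so the sketch terminates and runs without pausing):

```php
<?php
// Hypothetical in-memory loop showing run-style vs listen-style behaviour.
function consume(array &$jobs, bool $repeat, int $timeout, int $maxIdle, callable $sleep): int
{
    $handled = 0;
    $idle = 0;
    while ($idle < $maxIdle) {      // stand-in for the worker's "can continue" check
        $job = array_shift($jobs);  // null when the queue is empty
        if ($job !== null) {
            $job();                 // corresponds to executing the job
            $handled++;
        } elseif (!$repeat) {
            break;                  // run: stop once the queue is empty
        } else {
            $idle++;
            if ($timeout) {
                $sleep($timeout);   // listen: wait, then poll again
            }
        }
    }
    return $handled;
}

$log = [];
$slept = 0;
$fakeSleep = function (int $seconds) use (&$slept) { $slept += $seconds; };

$jobs = [
    function () use (&$log) { $log[] = 'a'; },
    function () use (&$log) { $log[] = 'b'; },
];
$runHandled = consume($jobs, false, 3, 2, $fakeSleep);    // run-style: exits when empty
$sleptAfterRun = $slept;

$more = [function () use (&$log) { $log[] = 'c'; }];
$listenHandled = consume($more, true, 3, 2, $fakeSleep);  // listen-style: keeps polling

echo "run handled $runHandled, listen handled $listenHandled, slept {$slept}s", PHP_EOL;
```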

yii queue/info

How it works

The consumer-side methods live in \vendor\yiisoft\yii2-queue\src\drivers\db\Command.php:

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace yii\queue\db;

use yii\console\Exception;
use yii\queue\cli\Command as CliCommand;

/**
 * Manages application db-queue.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Command extends CliCommand
{
    /**
     * @var Queue
     */
    public $queue;
    /**
     * @var string
     */
    public $defaultAction = 'info';


    /**
     * @inheritdoc
     */
    public function actions()
    {
        return [
            'info' => InfoAction::class,
        ];
    }

    /**
     * @inheritdoc
     */
    protected function isWorkerAction($actionID)
    {
        return in_array($actionID, ['run', 'listen'], true);
    }

    /**
     * Runs all jobs from db-queue.
     * It can be used as cron job.
     *
     * @return null|int exit code.
     */
    public function actionRun()
    {
        return $this->queue->run(false);
    }

    /**
     * Listens db-queue and runs new jobs.
     * It can be used as daemon process.
     *
     * @param int $timeout number of seconds to sleep before next reading of the queue.
     * @throws Exception when params are invalid.
     * @return null|int exit code.
     */
    public function actionListen($timeout = 3)
    {
        if (!is_numeric($timeout)) {
            throw new Exception('Timeout must be numeric.');
        }
        if ($timeout < 1) {
            throw new Exception('Timeout must be greater than zero.');
        }

        return $this->queue->run(true, $timeout);
    }

    /**
     * Clears the queue.
     *
     * @since 2.0.1
     */
    public function actionClear()
    {
        if ($this->confirm('Are you sure?')) {
            $this->queue->clear();
        }
    }

    /**
     * Removes a job by id.
     *
     * @param int $id
     * @throws Exception when the job is not found.
     * @since 2.0.1
     */
    public function actionRemove($id)
    {
        if (!$this->queue->remove($id)) {
            throw new Exception('The job is not found.');
        }
    }
}

The \yii\queue\db\Queue class, which the config sets as the class of the queue component, has the following property:

public $commandClass = Command::class;

Its parent class yii\queue\cli\Queue contains:

public function bootstrap($app)
{
    if ($app instanceof ConsoleApp) {
        $app->controllerMap[$this->getCommandId()] = [
            'class' => $this->commandClass,
            'queue' => $this,  // sets the queue property of the Command class
        ] + $this->commandOptions;
    }
}

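This is where the Command class above comes in: the Queue instance is injected into the Command through its configuration, and the Command then calls the Queue's methods.

Source of \yii\queue\db\Queue: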
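
One detail in bootstrap() is the array union operator (+): when both arrays contain the same key, the value from the left-hand array is kept. A small sketch of what that means for commandOptions (the string values and the OtherCommand class name are placeholders made up for this example):

```php
<?php
// Placeholder values; only the array union semantics matter here.
$fixed = [
    'class' => 'yii\queue\db\Command',
    'queue' => 'the queue component',
];
// Hypothetical commandOptions that try to override 'class'.
$commandOptions = [
    'class' => 'app\commands\OtherCommand',
    'verbose' => true,
];

// With +, keys already present in the left array win.
$config = $fixed + $commandOptions;

echo $config['class'], PHP_EOL;
var_dump($config['verbose']);
```

So commandOptions can add extra properties for the command, while a different command class would have to be set through commandClass.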

<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace yii\queue\db;

use yii\base\Exception;
use yii\base\InvalidArgumentException;
use yii\db\Connection;
use yii\db\Query;
use yii\di\Instance;
use yii\mutex\Mutex;

use yii\queue\cli\Queue as CliQueue;

/**
 * Db Queue.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Queue extends CliQueue
{
    /**
     * @var Connection|array|string
     */
    public $db = 'db';
    /**
     * @var Mutex|array|string
     */
    public $mutex = 'mutex';
    /**
     * @var int timeout
     */
    public $mutexTimeout = 3;
    /**
     * @var string table name
     */
    public $tableName = '{{%queue}}';
    /**
     * @var string
     */
    public $channel = 'queue';
    /**
     * @var bool ability to delete released messages from table
     */
    public $deleteReleased = true;
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;


    /**
     * @inheritdoc
     */
    public function init()
    {
        parent::init();
        $this->db = Instance::ensure($this->db, Connection::class);
        $this->mutex = Instance::ensure($this->mutex, Mutex::class);
    }

    /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to sleep before next iteration.
     * @return null|int exit code.
     * @internal for worker command only
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                if ($payload = $this->reserve()) {
                    if ($this->handleMessage(
                        $payload['id'],
                        $payload['job'],
                        $payload['ttr'],
                        $payload['attempt']
                    )) {
                        $this->release($payload);
                    }
                } elseif (!$repeat) {
                    break;
                } elseif ($timeout) {
                    sleep($timeout);
                }
            }
        });
    }

    /**
     * @inheritdoc
     */
    public function status($id)
    {
        $payload = (new Query())
            ->from($this->tableName)
            ->where(['id' => $id])
            ->one($this->db);

        if (!$payload) {
            if ($this->deleteReleased) {
                return self::STATUS_DONE;
            }

            throw new InvalidArgumentException("Unknown message ID: $id.");
        }

        if (!$payload['reserved_at']) {
            return self::STATUS_WAITING;
        }

        if (!$payload['done_at']) {
            return self::STATUS_RESERVED;
        }

        return self::STATUS_DONE;
    }

    /**
     * Clears the queue.
     *
     * @since 2.0.1
     */
    public function clear()
    {
        $this->db->createCommand()
            ->delete($this->tableName, ['channel' => $this->channel])
            ->execute();
    }

    /**
     * Removes a job by ID.
     *
     * @param int $id of a job
     * @return bool
     * @since 2.0.1
     */
    public function remove($id)
    {
        return (bool) $this->db->createCommand()
            ->delete($this->tableName, ['channel' => $this->channel, 'id' => $id])
            ->execute();
    }

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        $this->db->createCommand()->insert($this->tableName, [
            'channel' => $this->channel,
            'job' => $message,
            'pushed_at' => time(),
            'ttr' => $ttr,
            'delay' => $delay,
            'priority' => $priority ?: 1024,
        ])->execute();
        $tableSchema = $this->db->getTableSchema($this->tableName);
        return $this->db->getLastInsertID($tableSchema->sequenceName);
    }

    /**
     * Takes one message from waiting list and reserves it for handling.
     *
     * @return array|false payload
     * @throws Exception in case it hasn't waited the lock
     */
    protected function reserve()
    {
        return $this->db->useMaster(function () {
            if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
                throw new Exception('Has not waited the lock.');
            }

            try {
                $this->moveExpired();

                // Reserve one message
                $payload = (new Query())
                    ->from($this->tableName)
                    ->andWhere(['channel' => $this->channel, 'reserved_at' => null])
                    ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
                    ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
                    ->limit(1)
                    ->one($this->db);
                if (is_array($payload)) {
                    $payload['reserved_at'] = time();
                    $payload['attempt'] = (int) $payload['attempt'] + 1;
                    $this->db->createCommand()->update($this->tableName, [
                        'reserved_at' => $payload['reserved_at'],
                        'attempt' => $payload['attempt'],
                    ], [
                        'id' => $payload['id'],
                    ])->execute();

                    // pgsql
                    if (is_resource($payload['job'])) {
                        $payload['job'] = stream_get_contents($payload['job']);
                    }
                }
            } finally {
                $this->mutex->release(__CLASS__ . $this->channel);
            }

            return $payload;
        });
    }



    /**
     * @param array $payload
     */
    protected function release($payload)
    {
        if ($this->deleteReleased) {
            $this->db->createCommand()->delete(
                $this->tableName,
                ['id' => $payload['id']]
            )->execute();
        } else {
            $this->db->createCommand()->update(
                $this->tableName,
                ['done_at' => time()],
                ['id' => $payload['id']]
            )->execute();
        }
    }

    /**
     * Moves expired messages into waiting list.
     */
    private function moveExpired()
    {
        if ($this->reserveTime !== time()) {
            $this->reserveTime = time();
            $this->db->createCommand()->update(
                $this->tableName,
                ['reserved_at' => null],
                '[[reserved_at]] < :time - [[ttr]] and [[done_at]] is null',
                [':time' => $this->reserveTime]
            )->execute();
        }
    }

    private $reserveTime;
}
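
The two conditions that decide which job reserve() hands out are easier to follow on plain arrays. The sketch below re-implements moveExpired() and the SELECT from reserve() in memory. It is an illustration under simplifying assumptions (no mutex, no database, a single channel), pickNext is a hypothetical name, and $now is passed in so that the result is deterministic.

```php
<?php
// Hypothetical in-memory versions of moveExpired() and the SELECT in reserve().

function moveExpired(array $rows, int $now): array
{
    foreach ($rows as &$row) {
        // reserved_at < now - ttr and done_at is null: the TTR has run out
        if ($row['reserved_at'] !== null && $row['done_at'] === null
            && $row['reserved_at'] < $now - $row['ttr']) {
            $row['reserved_at'] = null; // back to the waiting list
        }
    }
    unset($row);
    return $rows;
}

function pickNext(array $rows, int $now): ?array
{
    // Waiting rows whose delay has elapsed: pushed_at <= now - delay
    $ready = array_filter($rows, function (array $row) use ($now) {
        return $row['reserved_at'] === null && $row['pushed_at'] <= $now - $row['delay'];
    });
    // ORDER BY priority ASC, id ASC
    usort($ready, function (array $a, array $b) {
        return [$a['priority'], $a['id']] <=> [$b['priority'], $b['id']];
    });
    return $ready[0] ?? null;
}

$now = 1000;
$rows = [
    // Reserved 70 seconds ago with a TTR of 60: expired, will be retried.
    ['id' => 1, 'pushed_at' => 900, 'delay' => 0, 'ttr' => 60, 'priority' => 1024, 'reserved_at' => 930, 'done_at' => null],
    // Best priority, but its 300-second delay has not elapsed yet.
    ['id' => 2, 'pushed_at' => 950, 'delay' => 300, 'ttr' => 60, 'priority' => 1, 'reserved_at' => null, 'done_at' => null],
    // Ready, same priority as job 1 but a higher id.
    ['id' => 3, 'pushed_at' => 960, 'delay' => 0, 'ttr' => 60, 'priority' => 1024, 'reserved_at' => null, 'done_at' => null],
];

$rows = moveExpired($rows, $now);
$next = pickNext($rows, $now);
echo 'next job id: ', $next['id'], PHP_EOL;
```

In this sketch job 1 is handed out again because its TTR elapsed while it was still marked as reserved, which suggests choosing a ttr longer than the worst-case run time of a job.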

The yii\queue\cli\Queue::runWorker method:

protected function runWorker(callable $handler)
{
    $this->_workerPid = getmypid();
    /** @var LoopInterface $loop */
    $loop = Yii::createObject($this->loopConfig, [$this]);

    $event = new WorkerEvent(['loop' => $loop]);
    $this->trigger(self::EVENT_WORKER_START, $event);
    if ($event->exitCode !== null) {
        return $event->exitCode;
    }

    $exitCode = null;
    try {
        call_user_func($handler, function () use ($loop, $event) {
            $this->trigger(self::EVENT_WORKER_LOOP, $event);
            return $event->exitCode === null && $loop->canContinue();
        });
    } finally {
        $this->trigger(self::EVENT_WORKER_STOP, $event);
        $this->_workerPid = null;
    }

    return $event->exitCode;
}
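
The interplay between runWorker() and the closure passed in by run() is an inversion of control: the driver owns the loop, while the base class decides on each iteration whether the loop may continue. A framework-free imitation of that split, assuming a plain stdClass in place of WorkerEvent and a callback in place of the event system (runWorkerSketch and $onLoop are hypothetical; the real method additionally consults $loop->canContinue()):

```php
<?php
// Hypothetical imitation of the runWorker()/run() split without Yii.
// $state plays the role of WorkerEvent: setting exitCode asks the loop to stop.
function runWorkerSketch(callable $handler, callable $onLoop): ?int
{
    $state = new stdClass();
    $state->exitCode = null;

    try {
        $handler(function () use ($state, $onLoop) {
            $onLoop($state);                  // like triggering EVENT_WORKER_LOOP
            return $state->exitCode === null; // continue until a code is set
        });
    } finally {
        // The real method triggers EVENT_WORKER_STOP here.
    }

    return $state->exitCode;
}

$iterations = 0;
$exitCode = runWorkerSketch(
    // The driver-side loop, shaped like the closure in run().
    function (callable $canContinue) use (&$iterations) {
        while ($canContinue()) {
            $iterations++;
        }
    },
    // A listener that stops the worker once three iterations have run.
    function (stdClass $state) use (&$iterations) {
        if ($iterations >= 3) {
            $state->exitCode = 0;
        }
    }
);

echo "iterations=$iterations exitCode=$exitCode", PHP_EOL;
```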






References

Yii2 queue extension guide https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn

Packagist: yiisoft/yii2-queue https://packagist.org/packages/yiisoft/yii2-queue

GitHub: yiisoft/yii2-queue https://github.com/yiisoft/yii2-queue

Yii2 queue extension: basic usage https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn/usage

Yii2 queue extension: DB driver https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn/driver-db

Yii2: custom controller classes via controllerMap https://blog.csdn.net/gaoxuaiguoyi/article/details/50909446

A simple Redis message queue in Yii2 https://blog.csdn.net/u011063477/article/details/105929688/

