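Using yii2-queue
The guide for yii2-queue, the Yii2 queue extension, is at https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn
I ran into a project that implements a MySQL-backed queue with the Yii2 queue extension. The enqueueing side was easy to follow, but the consuming side took me a long time to understand, so I am writing it down here.
The project uses the official Yii extension yiisoft/yii2-queue: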
composer require yiisoft/yii2-queue
Configuration
To use the extension, configure it as follows:
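return [
    'bootstrap' => [
        'queue', // the component registers its own console commands
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\<driver>\Queue::class,
            'as log' => \yii\queue\LogBehavior::class, // logs queue events
            // other driver options
        ],
    ],
];
<driver> is the driver name, for example db or redis.
Each task sent to the queue should be defined as a separate class. For example, if you need to download and save a file, the class might look like this: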
class DownloadJob extends \yii\base\BaseObject implements \yii\queue\JobInterface
{
    public $url;
    public $file;

    public function execute($queue)
    {
        file_put_contents($this->file, file_get_contents($this->url));
    }
}
Pushing jobs
Here is how to add a task to the queue:
Yii::$app->queue->push(new DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));
Push a job that should run after a 5-minute delay:
Yii::$app->queue->delay(5 * 60)->push(new DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));
The base class yii\queue\Queue defines both methods:
public function delay($value)
{
    $this->pushDelay = $value;
    return $this;
}

public function push($job)
{
    $event = new PushEvent([
        'job' => $job,
        'ttr' => $this->pushTtr ?: (
            $job instanceof RetryableJobInterface
                ? $job->getTtr()
                : $this->ttr
        ),
        'delay' => $this->pushDelay ?: 0,
        'priority' => $this->pushPriority,
    ]);
    $this->pushTtr = null;
    $this->pushDelay = null;
    $this->pushPriority = null;

    $this->trigger(self::EVENT_BEFORE_PUSH, $event);
    if ($event->handled) {
        return null;
    }

    if ($this->strictJobType && !($event->job instanceof JobInterface)) {
        throw new InvalidArgumentException('Job must be instance of JobInterface.');
    }

    if (!is_numeric($event->ttr)) {
        throw new InvalidArgumentException('Job TTR must be integer.');
    }
    $event->ttr = (int) $event->ttr;
    if ($event->ttr <= 0) {
        throw new InvalidArgumentException('Job TTR must be greater that zero.');
    }

    if (!is_numeric($event->delay)) {
        throw new InvalidArgumentException('Job delay must be integer.');
    }
    $event->delay = (int) $event->delay;
    if ($event->delay < 0) {
        throw new InvalidArgumentException('Job delay must be positive.');
    }

    $message = $this->serializer->serialize($event->job);
    $event->id = $this->pushMessage($message, $event->ttr, $event->delay, $event->priority);
    $this->trigger(self::EVENT_AFTER_PUSH, $event);

    return $event->id;
}
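To make the option handling in push() easier to follow, here is a minimal framework-free sketch. The function resolvePushOptions() is hypothetical and not part of yii2-queue; a null $jobTtr stands for "the job does not implement RetryableJobInterface", and the default TTR of 300 seconds is an assumption passed in by the caller.

```php
<?php
// Hypothetical helper (not a yii2-queue API): mirrors how push() picks and validates ttr and delay.
function resolvePushOptions($pushTtr, $jobTtr, $defaultTtr, $pushDelay): array
{
    // `?:` falls through on any falsy value, so a push TTR of 0 or null counts as "not set".
    $ttr = $pushTtr ?: ($jobTtr !== null ? $jobTtr : $defaultTtr);
    $delay = $pushDelay ?: 0;

    if (!is_numeric($ttr) || (int) $ttr <= 0) {
        throw new InvalidArgumentException('Job TTR must be an integer greater than zero.');
    }
    if (!is_numeric($delay) || (int) $delay < 0) {
        throw new InvalidArgumentException('Job delay must be a non-negative integer.');
    }

    return ['ttr' => (int) $ttr, 'delay' => (int) $delay];
}

// Plain job, nothing set: the queue-wide default TTR applies.
$plain = resolvePushOptions(null, null, 300, null);
// Job that supplies its own TTR, pushed with delay(5 * 60).
$delayed = resolvePushOptions(null, 60, 300, 5 * 60);
// A per-push TTR wins over the job's own TTR.
$explicit = resolvePushOptions(15, 60, 300, null);
```

The per-push values are reset to null right after the event is built, which is why delay() only affects the next push() call.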
$this->pushMessage() is implemented by the concrete driver class, which persists the message. In \yii\queue\db\Queue, for example:
protected function pushMessage($message, $ttr, $delay, $priority)
{
    $this->db->createCommand()->insert($this->tableName, [
        'channel' => $this->channel,
        'job' => $message,
        'pushed_at' => time(),
        'ttr' => $ttr,
        'delay' => $delay,
        'priority' => $priority ?: 1024,
    ])->execute();
    $tableSchema = $this->db->getTableSchema($this->tableName);
    return $this->db->getLastInsertID($tableSchema->sequenceName);
}
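The row written by pushMessage() already determines when the job can be picked up. The sketch below rebuilds that row as a plain array; buildQueueRow() and dueAt() are hypothetical helpers for illustration, and the "due" rule assumes the condition used by the reserve() query in the driver source listed later in this article (pushed_at <= now - delay).

```php
<?php
// Hypothetical helpers; the column names follow the insert() call above.
function buildQueueRow(string $channel, string $message, int $ttr, int $delay, ?int $priority, int $now): array
{
    return [
        'channel' => $channel,
        'job' => $message,
        'pushed_at' => $now,
        'ttr' => $ttr,
        'delay' => $delay,
        'priority' => $priority ?: 1024, // no explicit priority falls back to 1024
    ];
}

// Earliest timestamp at which a worker may reserve the row.
function dueAt(array $row): int
{
    return $row['pushed_at'] + $row['delay'];
}

$row = buildQueueRow('default', 'serialized-job', 300, 5 * 60, null, 1700000000);
$due = dueAt($row);
```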
Consuming with the DB driver
The DB driver stores queue data in a database.
Configuration example:
return [
    'bootstrap' => [
        'queue', // the component registers its own console commands
    ],
    'components' => [
        'db' => [
            'class' => \yii\db\Connection::class,
            // ...
        ],
        'queue' => [
            'class' => \yii\queue\db\Queue::class,
            'db' => 'db', // DB connection component or its config
            'tableName' => '{{%queue}}', // table name
            'channel' => 'default', // queue channel key
            'mutex' => \yii\mutex\MysqlMutex::class, // mutex used to sync queries
            'as log' => \yii\queue\LogBehavior::class,
        ],
    ],
];
A table must be added to the database first. Example statement for MySQL:
CREATE TABLE `queue` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `channel` varchar(255) NOT NULL,
    `job` blob NOT NULL,
    `pushed_at` int(11) NOT NULL,
    `ttr` int(11) NOT NULL,
    `delay` int(11) NOT NULL DEFAULT 0,
    `priority` int(11) unsigned NOT NULL DEFAULT 1024,
    `reserved_at` int(11) DEFAULT NULL,
    `attempt` int(11) DEFAULT NULL,
    `done_at` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `channel` (`channel`),
    KEY `reserved_at` (`reserved_at`),
    KEY `priority` (`priority`)
) ENGINE=InnoDB;
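The reserved_at and done_at columns are enough to tell what state a job is in. As a rough illustration, the hypothetical describeRow() function below derives the state from one row of this table, following the same checks as the driver's status() method in the source listed later in this article; the string labels are my own and stand in for the STATUS_* constants.

```php
<?php
// Hypothetical helper: derives a job state from one row of the queue table (null = row not found).
function describeRow(?array $row, bool $deleteReleased = true): string
{
    if ($row === null) {
        // With deleteReleased enabled, finished jobs are deleted, so a missing row reads as done.
        return $deleteReleased ? 'done' : 'unknown';
    }
    if (!$row['reserved_at']) {
        return 'waiting';  // nobody has picked it up yet
    }
    if (!$row['done_at']) {
        return 'reserved'; // a worker is running it right now
    }
    return 'done';
}

$waiting = describeRow(['reserved_at' => null, 'done_at' => null]);
$reserved = describeRow(['reserved_at' => 1700000000, 'done_at' => null]);
$done = describeRow(['reserved_at' => 1700000000, 'done_at' => 1700000005], false);
$missing = describeRow(null);
```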
The consumer command, which listens for and processes queued jobs:
yii queue/listen
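When the queue contains data, the worker calls the job's execute() method.
The listen command launches a daemon that polls the queue indefinitely. New jobs are fetched and executed as soon as they appear. The timeout argument is the number of seconds to sleep before the next poll (3 by default). This approach works best when the command is kept running by a process manager such as supervisor.
Fetch and execute jobs in a loop until the queue is empty: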
yii queue/run
The run command fetches and executes jobs in a loop until the queue is empty. It is suitable for cron.
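The only difference between run and listen is what the worker does when it finds the queue empty. The following sketch shows that with an in-memory array; workerLoop() and its callbacks are hypothetical stand-ins, not yii2-queue APIs.

```php
<?php
// Hypothetical stand-in for the worker loop; $reserve returns the next job or null when nothing is waiting.
function workerLoop(callable $reserve, callable $handle, bool $repeat, int $timeout, callable $canContinue): void
{
    while ($canContinue()) {
        $job = $reserve();
        if ($job !== null) {
            $handle($job);
        } elseif (!$repeat) {
            break;           // queue/run: stop as soon as the queue is empty
        } elseif ($timeout) {
            sleep($timeout); // queue/listen: wait, then poll again
        }
    }
}

// queue/run behaviour: drain three jobs, then return.
$pending = ['a', 'b', 'c'];
$handled = [];
workerLoop(
    function () use (&$pending) { return array_shift($pending); },
    function ($job) use (&$handled) { $handled[] = $job; },
    false,
    0,
    function () { return true; }
);
```

With $repeat set to true the loop never breaks on an empty queue, so it only ends when $canContinue() returns false, which is why listen needs something external to keep it alive and to stop it.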
Print information about the queue status:
yii queue/info
How it works
The consumer actions are defined in vendor/yiisoft/yii2-queue/src/drivers/db/Command.php:
<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace yii\queue\db;

use yii\console\Exception;
use yii\queue\cli\Command as CliCommand;

/**
 * Manages application db-queue.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Command extends CliCommand
{
    /**
     * @var Queue
     */
    public $queue;
    /**
     * @var string
     */
    public $defaultAction = 'info';

    /**
     * @inheritdoc
     */
    public function actions()
    {
        return [
            'info' => InfoAction::class,
        ];
    }

    /**
     * @inheritdoc
     */
    protected function isWorkerAction($actionID)
    {
        return in_array($actionID, ['run', 'listen'], true);
    }

    /**
     * Runs all jobs from db-queue.
     * It can be used as cron job.
     *
     * @return null|int exit code.
     */
    public function actionRun()
    {
        return $this->queue->run(false);
    }

    /**
     * Listens db-queue and runs new jobs.
     * It can be used as daemon process.
     *
     * @param int $timeout number of seconds to sleep before next reading of the queue.
     * @throws Exception when params are invalid.
     * @return null|int exit code.
     */
    public function actionListen($timeout = 3)
    {
        if (!is_numeric($timeout)) {
            throw new Exception('Timeout must be numeric.');
        }
        if ($timeout < 1) {
            throw new Exception('Timeout must be greater than zero.');
        }

        return $this->queue->run(true, $timeout);
    }

    /**
     * Clears the queue.
     *
     * @since 2.0.1
     */
    public function actionClear()
    {
        if ($this->confirm('Are you sure?')) {
            $this->queue->clear();
        }
    }

    /**
     * Removes a job by id.
     *
     * @param int $id
     * @throws Exception when the job is not found.
     * @since 2.0.1
     */
    public function actionRemove($id)
    {
        if (!$this->queue->remove($id)) {
            throw new Exception('The job is not found.');
        }
    }
}
The \yii\queue\db\Queue class, which the config sets as the queue component's class, has this property:
public $commandClass = Command::class;
Its parent class yii\queue\cli\Queue contains:
public function bootstrap($app)
{
    if ($app instanceof ConsoleApp) {
        $app->controllerMap[$this->getCommandId()] = [
            'class' => $this->commandClass,
            'queue' => $this, // sets the Command's queue property
        ] + $this->commandOptions;
    }
}
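This is where the Command class above comes in: bootstrap() registers it in the console application's controllerMap and injects the queue into its queue property, so each action simply calls methods on the Queue.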
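The wiring can be reproduced without the framework. In this sketch FakeQueue, FakeCommand and createController() are hypothetical stand-ins for the queue component, the command class and the framework's object creation from a config array; only the array shape and the `+` union come from the bootstrap() code above.

```php
<?php
// Hypothetical stand-ins for the command class and the queue component.
class FakeCommand
{
    public $queue;
    public $verbose = false;
}

class FakeQueue
{
    public $commandClass = FakeCommand::class;
    public $commandOptions = [];

    public function bootstrap(array &$controllerMap, string $commandId): void
    {
        // Array union keeps the left-hand keys, so commandOptions cannot override 'class' or 'queue'.
        $controllerMap[$commandId] = [
            'class' => $this->commandClass,
            'queue' => $this,
        ] + $this->commandOptions;
    }
}

// Rough equivalent of creating a controller from a config array: instantiate, then set properties.
function createController(array $config)
{
    $class = $config['class'];
    unset($config['class']);
    $controller = new $class();
    foreach ($config as $name => $value) {
        $controller->$name = $value;
    }
    return $controller;
}

$queue = new FakeQueue();
$queue->commandOptions = ['verbose' => true, 'class' => 'IgnoredClass'];
$controllerMap = [];
$queue->bootstrap($controllerMap, 'queue');
$command = createController($controllerMap['queue']);
```

Because the command ID becomes the controllerMap key, a component registered as queue answers to yii queue/run and yii queue/listen.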
Source of \yii\queue\db\Queue:
<?php
/**
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

namespace yii\queue\db;

use yii\base\Exception;
use yii\base\InvalidArgumentException;
use yii\db\Connection;
use yii\db\Query;
use yii\di\Instance;
use yii\mutex\Mutex;
use yii\queue\cli\Queue as CliQueue;

/**
 * Db Queue.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Queue extends CliQueue
{
    /**
     * @var Connection|array|string
     */
    public $db = 'db';
    /**
     * @var Mutex|array|string
     */
    public $mutex = 'mutex';
    /**
     * @var int timeout
     */
    public $mutexTimeout = 3;
    /**
     * @var string table name
     */
    public $tableName = '{{%queue}}';
    /**
     * @var string
     */
    public $channel = 'queue';
    /**
     * @var bool ability to delete released messages from table
     */
    public $deleteReleased = true;
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;

    /**
     * @inheritdoc
     */
    public function init()
    {
        parent::init();
        $this->db = Instance::ensure($this->db, Connection::class);
        $this->mutex = Instance::ensure($this->mutex, Mutex::class);
    }

    /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to sleep before next iteration.
     * @return null|int exit code.
     * @internal for worker command only
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                if ($payload = $this->reserve()) {
                    if ($this->handleMessage(
                        $payload['id'],
                        $payload['job'],
                        $payload['ttr'],
                        $payload['attempt']
                    )) {
                        $this->release($payload);
                    }
                } elseif (!$repeat) {
                    break;
                } elseif ($timeout) {
                    sleep($timeout);
                }
            }
        });
    }

    /**
     * @inheritdoc
     */
    public function status($id)
    {
        $payload = (new Query())
            ->from($this->tableName)
            ->where(['id' => $id])
            ->one($this->db);

        if (!$payload) {
            if ($this->deleteReleased) {
                return self::STATUS_DONE;
            }

            throw new InvalidArgumentException("Unknown message ID: $id.");
        }

        if (!$payload['reserved_at']) {
            return self::STATUS_WAITING;
        }

        if (!$payload['done_at']) {
            return self::STATUS_RESERVED;
        }

        return self::STATUS_DONE;
    }

    /**
     * Clears the queue.
     *
     * @since 2.0.1
     */
    public function clear()
    {
        $this->db->createCommand()
            ->delete($this->tableName, ['channel' => $this->channel])
            ->execute();
    }

    /**
     * Removes a job by ID.
     *
     * @param int $id of a job
     * @return bool
     * @since 2.0.1
     */
    public function remove($id)
    {
        return (bool) $this->db->createCommand()
            ->delete($this->tableName, ['channel' => $this->channel, 'id' => $id])
            ->execute();
    }

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        $this->db->createCommand()->insert($this->tableName, [
            'channel' => $this->channel,
            'job' => $message,
            'pushed_at' => time(),
            'ttr' => $ttr,
            'delay' => $delay,
            'priority' => $priority ?: 1024,
        ])->execute();
        $tableSchema = $this->db->getTableSchema($this->tableName);
        return $this->db->getLastInsertID($tableSchema->sequenceName);
    }

    /**
     * Takes one message from waiting list and reserves it for handling.
     *
     * @return array|false payload
     * @throws Exception in case it hasn't waited the lock
     */
    protected function reserve()
    {
        return $this->db->useMaster(function () {
            if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
                throw new Exception('Has not waited the lock.');
            }

            try {
                $this->moveExpired();

                // Reserve one message
                $payload = (new Query())
                    ->from($this->tableName)
                    ->andWhere(['channel' => $this->channel, 'reserved_at' => null])
                    ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
                    ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
                    ->limit(1)
                    ->one($this->db);
                if (is_array($payload)) {
                    $payload['reserved_at'] = time();
                    $payload['attempt'] = (int) $payload['attempt'] + 1;
                    $this->db->createCommand()->update($this->tableName, [
                        'reserved_at' => $payload['reserved_at'],
                        'attempt' => $payload['attempt'],
                    ], [
                        'id' => $payload['id'],
                    ])->execute();

                    // pgsql
                    if (is_resource($payload['job'])) {
                        $payload['job'] = stream_get_contents($payload['job']);
                    }
                }
            } finally {
                $this->mutex->release(__CLASS__ . $this->channel);
            }

            return $payload;
        });
    }

    /**
     * @param array $payload
     */
    protected function release($payload)
    {
        if ($this->deleteReleased) {
            $this->db->createCommand()->delete(
                $this->tableName,
                ['id' => $payload['id']]
            )->execute();
        } else {
            $this->db->createCommand()->update(
                $this->tableName,
                ['done_at' => time()],
                ['id' => $payload['id']]
            )->execute();
        }
    }

    /**
     * Moves expired messages into waiting list.
     */
    private function moveExpired()
    {
        if ($this->reserveTime !== time()) {
            $this->reserveTime = time();
            $this->db->createCommand()->update(
                $this->tableName,
                ['reserved_at' => null],
                '[[reserved_at]] < :time - [[ttr]] and [[done_at]] is null',
                [':time' => $this->reserveTime]
            )->execute();
        }
    }

    private $reserveTime;
}
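The part that took me longest to follow is how reserve(), moveExpired() and release() cooperate, so here is a minimal in-memory sketch of that lifecycle. MemoryQueue is a hypothetical class, not the driver: it uses a PHP array instead of the table, takes the current time as a parameter instead of calling time(), ignores channels and the mutex, and always behaves as if deleteReleased were true.

```php
<?php
// In-memory stand-in for the queue table; a sketch under the assumptions above, not the driver itself.
class MemoryQueue
{
    public $rows = [];
    private $nextId = 1;

    public function push(string $job, int $ttr, int $delay, int $now, int $priority = 1024): int
    {
        $id = $this->nextId++;
        $this->rows[$id] = [
            'id' => $id, 'job' => $job, 'pushed_at' => $now, 'ttr' => $ttr, 'delay' => $delay,
            'priority' => $priority, 'reserved_at' => null, 'attempt' => null,
        ];
        return $id;
    }

    public function reserve(int $now): ?array
    {
        // moveExpired(): a reservation older than its TTR goes back to the waiting list.
        foreach ($this->rows as &$row) {
            if ($row['reserved_at'] !== null && $row['reserved_at'] < $now - $row['ttr']) {
                $row['reserved_at'] = null;
            }
        }
        unset($row);

        // Waiting rows whose delay has elapsed, ordered by priority, then id.
        $due = array_filter($this->rows, function ($row) use ($now) {
            return $row['reserved_at'] === null && $row['pushed_at'] <= $now - $row['delay'];
        });
        if (!$due) {
            return null;
        }
        usort($due, function ($a, $b) {
            return [$a['priority'], $a['id']] <=> [$b['priority'], $b['id']];
        });

        $id = $due[0]['id'];
        $this->rows[$id]['reserved_at'] = $now;
        $this->rows[$id]['attempt'] = (int) $this->rows[$id]['attempt'] + 1;
        return $this->rows[$id];
    }

    public function release(int $id): void
    {
        unset($this->rows[$id]); // same effect as deleteReleased = true
    }
}

$q = new MemoryQueue();
$q->push('slow', 10, 0, 100);  // TTR 10 s, no delay
$q->push('late', 10, 60, 100); // TTR 10 s, delayed by 60 s

$first = $q->reserve(100);   // 'slow' is due immediately
$none = $q->reserve(105);    // 'slow' is reserved, 'late' is still delayed
$retry = $q->reserve(111);   // the TTR elapsed without a release, so 'slow' is handed out again
$q->release($retry['id']);
$delayed = $q->reserve(160); // the 60 s delay has elapsed
```

The second reservation of the same job is the key point: a job whose worker dies, or whose handler runs longer than ttr, is not lost but returns to the waiting list with its attempt counter increased.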
The yii\queue\cli\Queue::runWorker() method:
protected function runWorker(callable $handler)
{
    $this->_workerPid = getmypid();
    /** @var LoopInterface $loop */
    $loop = Yii::createObject($this->loopConfig, [$this]);

    $event = new WorkerEvent(['loop' => $loop]);
    $this->trigger(self::EVENT_WORKER_START, $event);
    if ($event->exitCode !== null) {
        return $event->exitCode;
    }

    $exitCode = null;
    try {
        call_user_func($handler, function () use ($loop, $event) {
            $this->trigger(self::EVENT_WORKER_LOOP, $event);
            return $event->exitCode === null && $loop->canContinue();
        });
    } finally {
        $this->trigger(self::EVENT_WORKER_STOP, $event);
        $this->_workerPid = null;
    }

    return $event->exitCode;
}
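The callback-inside-a-callback shape of runWorker() is easier to see once the events are stripped away. In this hypothetical reduction, runWorkerSketch() passes the handler a $canContinue closure, and an $onLoop callback plays the role of the EVENT_WORKER_LOOP listeners; the real method additionally asks $loop->canContinue(), which is left out here.

```php
<?php
// Hypothetical, framework-free reduction of runWorker(): the handler receives a $canContinue callback.
function runWorkerSketch(callable $handler, callable $onLoop): ?int
{
    $state = new stdClass();
    $state->exitCode = null;

    $handler(function () use ($state, $onLoop) {
        $onLoop($state);                  // stands in for EVENT_WORKER_LOOP listeners
        return $state->exitCode === null; // a non-null exit code stops the loop
    });

    return $state->exitCode;
}

$iterations = 0;
$exitCode = runWorkerSketch(
    // The handler owns the loop, as the closure passed from run() does.
    function (callable $canContinue) use (&$iterations) {
        while ($canContinue()) {
            $iterations++;
        }
    },
    // A listener that asks the worker to stop after three passes.
    function ($state) use (&$iterations) {
        if ($iterations >= 3) {
            $state->exitCode = 0;
        }
    }
);
```

The driver decides what one pass of the loop does, while the base class decides whether another pass is allowed, which lets every driver share the same stop conditions.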
References
Yii2 queue extension guide: https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn
yiisoft/yii2-queue on Packagist: https://packagist.org/packages/yiisoft/yii2-queue
yiisoft/yii2-queue on GitHub: https://github.com/yiisoft/yii2-queue
Yii2 queue extension, basic usage: https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn/usage
Yii2 queue extension, DB driver: https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn/driver-db
Customizing controller classes with controllerMap in Yii2: https://blog.csdn.net/gaoxuaiguoyi/article/details/50909446
A simple Redis message queue in Yii2: https://blog.csdn.net/u011063477/article/details/105929688/