Redis实现延时队列


Redis实现延时队列


正文

Redis实现延时队列,是使用有序集合。

介绍下Redis的5种数据类型:

Redis 的 字符串(String) 是 key => value 的单个值。

Redis 的 哈希(Hash) 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。 Redis 中每个 hash 可以存储 2^32 - 1 键值对(40多亿)。

Redis 的 列表(List) 是简单的字符串(String)列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边), 一个列表最多可以包含 2^32 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。

Redis 的 集合(Set) 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。 Redis 中集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。 集合中最大的成员数为 2^32 - 1 (4294967295, 每个集合可存储40多亿个成员)。

Redis 的 有序集合(sorted set) 和 集合(Set)一样也是string类型元素的集合,且不允许重复的成员。 不同的是每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。 有序集合的成员是唯一的,但分数(score)却可以重复。 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。 集合中最大的成员数为 2^32 - 1 (4294967295, 每个集合可存储40多亿个成员)。

上面是redis的5种数据类型,我们可以用 有序集合(sorted set) 类型 实现延迟队列,不过成员不能重复。

Redis实现延时队列,相关方法有:

// 添加数据
public function zAdd( $key, $options, $score1, $value1, $score2 = null, $value2 = null, $scoreN = null, $valueN = null ) {}
// 获取数据
public function zRange( $key, $start, $end, $withscores = null ) {}
// 删除数据
public function zRem( $key, $member1, $member2 = null, $memberN = null ) {}

看一下生产者的例子:

$redis = = new Redis();
$redis->connect($config['host'], $config['port'], $config['timeout']);
$redis->auth($config['auth']);

$redis->zAdd($key, $time, json_encode(["task_name" => $task_name, "task_time" => $time, "task_params" = $task_params]));

$time是有序集合(sorted set)的score,传一个截止时间戳。

消费者,获取当前时间以前的数据:

$redis = = new Redis();
$redis->connect($config['host'], $config['port'], $config['timeout']);
$redis->auth($config['auth']);

// 获取前面11条数据
$data = $redis->zrange($key, 0, 11);

// 获取指定时间段内的数据(指定分数值)
$data = $redis->zrangebyscore($key, 1684142654, time());

/** 消费逻辑 **/ 

// 删除
$redis->zRem($key, $value);

redis有序集合函数原型

php中Redis 有序集合相关方法完整有:

/**
 * Adds the specified member with a given score to the sorted set stored at key.
 *
 * @param   string  $key    Required key
 * @param   array   $options Options if needed
 * @param   float   $score1 Required score
 * @param   string  $value1 Required value
 * @param   float   $score2 Optional score
 * @param   string  $value2 Optional value
 * @param   float   $scoreN Optional score
 * @param   string  $valueN Optional value
 * @return  int     Number of values added
 * @link    https://redis.io/commands/zadd
 * @example
 * <pre>
 * <pre>
 * $redis->zAdd('z', 1, 'v1', 2, 'v2', 3, 'v3', 4, 'v4' );  // int(2)
 * $redis->zRem('z', 'v2', 'v3');                           // int(2)
 * $redis->zAdd('z', ['NX'], 5, 'v5');                      // int(1)
 * $redis->zAdd('z', ['NX'], 6, 'v5');                      // int(0)
 * $redis->zAdd('z', 7, 'v6');                              // int(1)
 * $redis->zAdd('z', 8, 'v6');                              // int(0)
 * var_dump( $redis->zRange('z', 0, -1) );
 * //// Output:
 * // array(4) {
 * //   [0]=> string(2) "v1"
 * //   [1]=> string(2) "v4"
 * //   [2]=> string(2) "v5"
 * //   [3]=> string(2) "v8"
 * // }
 * var_dump( $redis->zRange('z', 0, -1, true) );
 * //// Output:
 * // array(4) {
 * //   ["v1"]=> float(1)
 * //   ["v4"]=> float(4)
 * //   ["v5"]=> float(5)
 * //   ["v6"]=> float(8)
 * // }
 * </pre>
 * </pre>
 */
public function zAdd( $key, $options, $score1, $value1, $score2 = null, $value2 = null, $scoreN = null, $valueN = null ) {}

/**
 * Returns a range of elements from the ordered set stored at the specified key,
 * with values in the range [start, end]. start and stop are interpreted as zero-based indices:
 * 0 the first element,
 * 1 the second ...
 * -1 the last element,
 * -2 the penultimate ...
 *
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @param   bool    $withscores
 * @return  array   Array containing the values in specified range.
 * @link    https://redis.io/commands/zrange
 * @example
 * <pre>
 * $redis->zAdd('key1', 0, 'val0');
 * $redis->zAdd('key1', 2, 'val2');
 * $redis->zAdd('key1', 10, 'val10');
 * $redis->zRange('key1', 0, -1); // array('val0', 'val2', 'val10')
 * // with scores
 * $redis->zRange('key1', 0, -1, true); // array('val0' => 0, 'val2' => 2, 'val10' => 10)
 * </pre>
 */
public function zRange( $key, $start, $end, $withscores = null ) {}

/**
 * Deletes a specified member from the ordered set.
 *
 * @param   string  $key
 * @param   string  $member1
 * @param   string  $member2
 * @param   string  $memberN
 * @return  int     Number of deleted values
 * @link    https://redis.io/commands/zrem
 * @example
 * <pre>
 * $redis->zAdd('z', 1, 'v2', 2, 'v2', 3, 'v3', 4, 'v4' );  // int(2)
 * $redis->zRem('z', 'v2', 'v3');                           // int(2)
 * var_dump( $redis->zRange('z', 0, -1) );
 * //// Output:
 * // array(2) {
 * //   [0]=> string(2) "v1"
 * //   [1]=> string(2) "v4"
 * // }
 * </pre>
 */
public function zRem( $key, $member1, $member2 = null, $memberN = null ) {}

/**
 * @see zRem()
 * @param   string  $key
 * @param   string  $member1
 * @param   string  $member2
 * @param   string  $memberN
 * @return  int     Number of deleted values
 * @link    https://redis.io/commands/zrem
 */
public function zDelete( $key, $member1, $member2 = null, $memberN = null ) {}

/**
 * Returns the elements of the sorted set stored at the specified key in the range [start, end]
 * in reverse order. start and stop are interpretated as zero-based indices:
 * 0 the first element,
 * 1 the second ...
 * -1 the last element,
 * -2 the penultimate ...
 *
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @param   bool    $withscore
 * @return  array   Array containing the values in specified range.
 * @link    https://redis.io/commands/zrevrange
 * @example
 * <pre>
 * $redis->zAdd('key', 0, 'val0');
 * $redis->zAdd('key', 2, 'val2');
 * $redis->zAdd('key', 10, 'val10');
 * $redis->zRevRange('key', 0, -1); // array('val10', 'val2', 'val0')
 *
 * // with scores
 * $redis->zRevRange('key', 0, -1, true); // array('val10' => 10, 'val2' => 2, 'val0' => 0)
 * </pre>
 */
public function zRevRange( $key, $start, $end, $withscore = null ) {}

/**
 * Returns the elements of the sorted set stored at the specified key which have scores in the
 * range [start,end]. Adding a parenthesis before start or end excludes it from the range.
 * +inf and -inf are also valid limits.
 *
 * zRevRangeByScore returns the same items in reverse order, when the start and end parameters are swapped.
 *
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @param   array   $options Two options are available:
 *                      - withscores => TRUE,
 *                      - and limit => array($offset, $count)
 * @return  array   Array containing the values in specified range.
 * @link    https://redis.io/commands/zrangebyscore
 * @example
 * <pre>
 * $redis->zAdd('key', 0, 'val0');
 * $redis->zAdd('key', 2, 'val2');
 * $redis->zAdd('key', 10, 'val10');
 * $redis->zRangeByScore('key', 0, 3);                                          // array('val0', 'val2')
 * $redis->zRangeByScore('key', 0, 3, array('withscores' => TRUE);              // array('val0' => 0, 'val2' => 2)
 * $redis->zRangeByScore('key', 0, 3, array('limit' => array(1, 1));                        // array('val2')
 * $redis->zRangeByScore('key', 0, 3, array('withscores' => TRUE, 'limit' => array(1, 1));  // array('val2' => 2)
 * </pre>
 */
public function zRangeByScore( $key, $start, $end, array $options = array() ) {}

/**
 * @see zRangeByScore()
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @param   array   $options
 *
 * @return     array
 */
public function zRevRangeByScore( $key, $start, $end, array $options = array() ) {}

/**
 * Returns a lexigraphical range of members in a sorted set, assuming the members have the same score. The
 * min and max values are required to start with '(' (exclusive), '[' (inclusive), or be exactly the values
 * '-' (negative inf) or '+' (positive inf).  The command must be called with either three *or* five
 * arguments or will return FALSE.
 * @param   string  $key    The ZSET you wish to run against.
 * @param   int     $min    The minimum alphanumeric value you wish to get.
 * @param   int     $max    The maximum alphanumeric value you wish to get.
 * @param   int     $offset Optional argument if you wish to start somewhere other than the first element.
 * @param   int     $limit  Optional argument if you wish to limit the number of elements returned.
 * @return  array   Array containing the values in the specified range.
 * @link    https://redis.io/commands/zrangebylex
 * @example
 * <pre>
 * foreach (array('a', 'b', 'c', 'd', 'e', 'f', 'g') as $char) {
 *     $redis->zAdd('key', $char);
 * }
 *
 * $redis->zRangeByLex('key', '-', '[c'); // array('a', 'b', 'c')
 * $redis->zRangeByLex('key', '-', '(c'); // array('a', 'b')
 * $redis->zRangeByLex('key', '-', '[c'); // array('b', 'c')
 * </pre>
 */
public function zRangeByLex( $key, $min, $max, $offset = null, $limit = null ) {}

/**
 * @see zRangeByLex()
 * @param   string  $key
 * @param   int     $min
 * @param   int     $max
 * @param   int     $offset
 * @param   int     $limit
 * @return  array
 * @link    https://redis.io/commands/zrevrangebylex
 */
public function zRevRangeByLex( $key, $min, $max, $offset = null, $limit = null ) {}

/**
 * Returns the number of elements of the sorted set stored at the specified key which have
 * scores in the range [start,end]. Adding a parenthesis before start or end excludes it
 * from the range. +inf and -inf are also valid limits.
 *
 * @param   string  $key
 * @param   string  $start
 * @param   string  $end
 * @return  int     the size of a corresponding zRangeByScore.
 * @link    https://redis.io/commands/zcount
 * @example
 * <pre>
 * $redis->zAdd('key', 0, 'val0');
 * $redis->zAdd('key', 2, 'val2');
 * $redis->zAdd('key', 10, 'val10');
 * $redis->zCount('key', 0, 3); // 2, corresponding to array('val0', 'val2')
 * </pre>
 */
public function zCount( $key, $start, $end ) {}

/**
 * Deletes the elements of the sorted set stored at the specified key which have scores in the range [start,end].
 *
 * @param   string          $key
 * @param   float|string    $start double or "+inf" or "-inf" string
 * @param   float|string    $end double or "+inf" or "-inf" string
 * @return  int             The number of values deleted from the sorted set
 * @link    https://redis.io/commands/zremrangebyscore
 * @example
 * <pre>
 * $redis->zAdd('key', 0, 'val0');
 * $redis->zAdd('key', 2, 'val2');
 * $redis->zAdd('key', 10, 'val10');
 * $redis->zRemRangeByScore('key', 0, 3); // 2
 * </pre>
 */
public function zRemRangeByScore( $key, $start, $end ) {}

/**
 * @see zRemRangeByScore()
 * @param string    $key
 * @param float     $start
 * @param float     $end
 */
public function zDeleteRangeByScore( $key, $start, $end ) {}

/**
 * Deletes the elements of the sorted set stored at the specified key which have rank in the range [start,end].
 *
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @return  int     The number of values deleted from the sorted set
 * @link    https://redis.io/commands/zremrangebyrank
 * @example
 * <pre>
 * $redis->zAdd('key', 1, 'one');
 * $redis->zAdd('key', 2, 'two');
 * $redis->zAdd('key', 3, 'three');
 * $redis->zRemRangeByRank('key', 0, 1); // 2
 * $redis->zRange('key', 0, -1, array('withscores' => TRUE)); // array('three' => 3)
 * </pre>
 */
public function zRemRangeByRank( $key, $start, $end ) {}

/**
 * @see zRemRangeByRank()
 * @param   string  $key
 * @param   int     $start
 * @param   int     $end
 * @link    https://redis.io/commands/zremrangebyscore
 */
public function zDeleteRangeByRank( $key, $start, $end ) {}

/**
 * Returns the cardinality of an ordered set.
 *
 * @param   string  $key
 * @return  int     the set's cardinality
 * @link    https://redis.io/commands/zsize
 * @example
 * <pre>
 * $redis->zAdd('key', 0, 'val0');
 * $redis->zAdd('key', 2, 'val2');
 * $redis->zAdd('key', 10, 'val10');
 * $redis->zCard('key');            // 3
 * </pre>
 */
public function zCard( $key ) {}

/**
 * @see zCard()
 * @param string $key
 */
public function zSize( $key ) {}

/**
 * Returns the score of a given member in the specified sorted set.
 *
 * @param   string  $key
 * @param   string  $member
 * @return  float
 * @link    https://redis.io/commands/zscore
 * @example
 * <pre>
 * $redis->zAdd('key', 2.5, 'val2');
 * $redis->zScore('key', 'val2'); // 2.5
 * </pre>
 */
public function zScore( $key, $member ) {}

/**
 * Returns the rank of a given member in the specified sorted set, starting at 0 for the item
 * with the smallest score. zRevRank starts at 0 for the item with the largest score.
 *
 * @param   string  $key
 * @param   string  $member
 * @return  int     the item's score.
 * @link    https://redis.io/commands/zrank
 * @example
 * <pre>
 * $redis->delete('z');
 * $redis->zAdd('key', 1, 'one');
 * $redis->zAdd('key', 2, 'two');
 * $redis->zRank('key', 'one');     // 0
 * $redis->zRank('key', 'two');     // 1
 * $redis->zRevRank('key', 'one');  // 1
 * $redis->zRevRank('key', 'two');  // 0
 * </pre>
 */
public function zRank( $key, $member ) {}

/**
 * @see zRank()
 * @param  string $key
 * @param  string $member
 * @return int    the item's score
 * @link   https://redis.io/commands/zrevrank
 */
public function zRevRank( $key, $member ) {}

/**
 * Increments the score of a member from a sorted set by a given amount.
 *
 * @param   string  $key
 * @param   float   $value (double) value that will be added to the member's score
 * @param   string  $member
 * @return  float   the new value
 * @link    https://redis.io/commands/zincrby
 * @example
 * <pre>
 * $redis->delete('key');
 * $redis->zIncrBy('key', 2.5, 'member1');  // key or member1 didn't exist, so member1's score is to 0
 *                                          // before the increment and now has the value 2.5
 * $redis->zIncrBy('key', 1, 'member1');    // 3.5
 * </pre>
 */
public function zIncrBy( $key, $value, $member ) {}

/**
 * Creates an union of sorted sets given in second argument.
 * The result of the union will be stored in the sorted set defined by the first argument.
 * The third optionnel argument defines weights to apply to the sorted sets in input.
 * In this case, the weights will be multiplied by the score of each element in the sorted set
 * before applying the aggregation. The forth argument defines the AGGREGATE option which
 * specify how the results of the union are aggregated.
 *
 * @param string    $Output
 * @param array     $ZSetKeys
 * @param array     $Weights
 * @param string    $aggregateFunction  Either "SUM", "MIN", or "MAX": defines the behaviour to use on
 * duplicate entries during the zUnion.
 * @return int The number of values in the new sorted set.
 * @link    https://redis.io/commands/zunionstore
 * @example
 * <pre>
 * $redis->delete('k1');
 * $redis->delete('k2');
 * $redis->delete('k3');
 * $redis->delete('ko1');
 * $redis->delete('ko2');
 * $redis->delete('ko3');
 *
 * $redis->zAdd('k1', 0, 'val0');
 * $redis->zAdd('k1', 1, 'val1');
 *
 * $redis->zAdd('k2', 2, 'val2');
 * $redis->zAdd('k2', 3, 'val3');
 *
 * $redis->zUnion('ko1', array('k1', 'k2')); // 4, 'ko1' => array('val0', 'val1', 'val2', 'val3')
 *
 * // Weighted zUnion
 * $redis->zUnion('ko2', array('k1', 'k2'), array(1, 1)); // 4, 'ko2' => array('val0', 'val1', 'val2', 'val3')
 * $redis->zUnion('ko3', array('k1', 'k2'), array(5, 1)); // 4, 'ko3' => array('val0', 'val2', 'val3', 'val1')
 * </pre>
 */
public function zUnion($Output, $ZSetKeys, array $Weights = null, $aggregateFunction = 'SUM') {}

/**
 * Creates an intersection of sorted sets given in second argument.
 * The result of the union will be stored in the sorted set defined by the first argument.
 * The third optional argument defines weights to apply to the sorted sets in input.
 * In this case, the weights will be multiplied by the score of each element in the sorted set
 * before applying the aggregation. The forth argument defines the AGGREGATE option which
 * specify how the results of the union are aggregated.
 *
 * @param   string  $Output
 * @param   array   $ZSetKeys
 * @param   array   $Weights
 * @param   string  $aggregateFunction Either "SUM", "MIN", or "MAX":
 * defines the behaviour to use on duplicate entries during the zInter.
 * @return  int     The number of values in the new sorted set.
 * @link    https://redis.io/commands/zinterstore
 * @example
 * <pre>
 * $redis->delete('k1');
 * $redis->delete('k2');
 * $redis->delete('k3');
 *
 * $redis->delete('ko1');
 * $redis->delete('ko2');
 * $redis->delete('ko3');
 * $redis->delete('ko4');
 *
 * $redis->zAdd('k1', 0, 'val0');
 * $redis->zAdd('k1', 1, 'val1');
 * $redis->zAdd('k1', 3, 'val3');
 *
 * $redis->zAdd('k2', 2, 'val1');
 * $redis->zAdd('k2', 3, 'val3');
 *
 * $redis->zInter('ko1', array('k1', 'k2'));               // 2, 'ko1' => array('val1', 'val3')
 * $redis->zInter('ko2', array('k1', 'k2'), array(1, 1));  // 2, 'ko2' => array('val1', 'val3')
 *
 * // Weighted zInter
 * $redis->zInter('ko3', array('k1', 'k2'), array(1, 5), 'min'); // 2, 'ko3' => array('val1', 'val3')
 * $redis->zInter('ko4', array('k1', 'k2'), array(1, 5), 'max'); // 2, 'ko4' => array('val3', 'val1')
 * </pre>
 */
public function zInter($Output, $ZSetKeys, array $Weights = null, $aggregateFunction = 'SUM') {}






参考资料

Redis 有序集合(sorted set) https://www.runoob.com/redis/redis-sorted-sets.html

Redis 命令参考 » SortedSet(有序集合) » ZRANGE http://doc.redisfans.com/sorted_set/zrange.html

Yii redis zset有序集合的使用 https://www.kancloud.cn/kk1181958464/yii2/2422520

php+redis实现延迟队列(订单超时未支付。会员时间过期) https://blog.csdn.net/fangkang7/article/details/82849040

Yii2 队列扩展 https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.0/zh-cn

composer yiisoft/yii2-queue https://packagist.org/packages/yiisoft/yii2-queue

github yiisoft/yii2-queue https://github.com/yiisoft/yii2-queue

yii2简单实现redis消息队列 https://blog.csdn.net/u011063477/article/details/105929688/


返回