正文
经常见到的“请在30分钟内付款,否则取消订单”,这里就是多用延迟队列实现的,下面看一下RabbitMQ延迟队列的实现。
延迟队列声明:
// 交换机名称
$exchange = 'delay';
// 路由名称
$routing = 'delay_ticket';
$setting[
"queue" => "queue_delay_ticket",
"x_exchange" => "async",
"x_routing" => "queue_ticket",
];
// RabbitMQ配置信息
$setting => [
'host' => 'dev.rabbitmq.demo.com',
'port' => '5672',
'login' => 'admin',
'password' => '123+-*abc'
],
// RabbitMQ连接
$conn = new \AMQPConnection($setting);
$channel = new \AMQPChannel($conn);
$ex = new \AMQPExchange($channel);
$ex->setName($exchange);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$q = new \AMQPQueue($channel);
$q->setName($setting['queue']);
$q->setFlags(AMQP_DURABLE);
$q->setArguments([
'x-dead-letter-exchange' => $setting['x_exchange'],
'x-dead-letter-routing-key' => $setting['x_routing']
]);
$q->declareQueue();
$q->bind($exchange, $routing);
信息推送到延迟队列:
$message = [
'ticket_id' => 10102
];
$message = json_encode($message);
// 消息生存时间(1000 = 1s)
$ttl = 30 * 60 * 1000;
// 交换机名称
$exchange = 'delay';
// 路由名称
$routing = 'delay_ticket';
// RabbitMQ配置信息
$setting => [
'host' => 'dev.rabbitmq.demo.com',
'port' => '5672',
'login' => 'admin',
'password' => '123+-*abc'
],
// RabbitMQ连接
$conn = new \AMQPConnection($setting);
$channel = new \AMQPChannel($conn);
$ex = new \AMQPExchange($channel);
$ex->setName($exchange);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$argument = [
'delivery_mode' => 2,
'expiration' => $ttl
];
if ($ex->publish($message, $routing, 1, $argument)) {
echo 'succ';
}
echo 'end';
例如我们在消费者中的配置是:
'CTicket' => [
'queue' => 'queue_ticket',
'exchange' => 'async',
'routing' => 'ticket'
],
'DTicket' => [
'queue' => 'queue_delay_ticket',
'exchange' => 'delay',
'routing' => 'delay_ticket',
'x_exchange' => 'async',
'x_routing' => 'ticket'
],
延迟队列消费者运行后,过了信息生存期后,会把delay_ticket路由到x_routing指向的ticket路由队列中, 然后由ticket路由的消费者来进行后续处理。
延迟队列并不是消费,而是对队列服务器上延迟队列绑定关系的声明,所以只需要跑一次脚本就可以了,不需要常驻运行。
循环实现
不过上面的延迟队列中,每个只能有一种延迟时长,如果前面的一条队列数据没超时变为死信,则会阻塞后面的队列数据。
除了新建新的延迟时长的延迟队列,我们还有一种可通用的解决思路:新建1个实时队列 、 3个延迟队列(1小时、1分钟、1秒),
这3个延迟队列的死信都发送到上面的实时队列中。我们首先把要处理的数据推到 实时队列,实时队列消费者根据消息体中发送时间选择推送到哪一个延迟队列,
延迟队列数据过了时长再发送到实时队列,实时队列再根据距离时间选择推送到哪一个延迟队列,直到时间到了这条信息的处理时间,再把这条信息推送到目标队列。
如A项目中有一条信息需要在2小时15分23秒后处理,A项目先把信息推送到 实时队列loop_transfer_wait_data,
实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1小时,
则把这条信息转发到 1小时延迟队列delay_loop_transfer_hour_x
中,等1小时后这条信息成为死信又进入 实时队列loop_transfer_wait_data
,
实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1小时,再来一次循环,
实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1分钟,则把这条信息转发到 1分钟延迟队列delay_loop_transfer_minute_x
中,
等1分钟后这条信息成为死信又进入 实时队列loop_transfer_wait_data
,实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1分钟,再次循环,
经历15次循环后,实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1秒,则把这条信息转发到 1秒延迟队列delay_loop_transfer_second_x
中,
等1秒后这条信息成为死信又进入 实时队列loop_transfer_wait_data
,实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间大于1秒,再次循环,
经历23次循环后,实时队列loop_transfer_wait_data
的消费者发现这条信息的处理截止时间就是现在,再把这条信息推送到我们的目标队列。
下面是代码实例,购物信息1.5小时优惠截止:
$msg = ["shipping_process_id" => $shipping_process_id, "seconds" => 1.5 * 60 * 60];
$data = [
"transfer_exchange" => "async",
"transfer_routing" => "shipping_confirm",
"transfer_message" => $msg,
"transfer_time" => time() + (1.5 * 60 * 60),
];
Queue::produce($data, 'async', 'loop_transfer_wait_data');
loop_transfer_wait_data
消费者:
$now = time();
if ($data['transfer_time'] <= $now) {
return Queue::produce($data['transfer_message'], $data['transfer_exchange'], $data['transfer_routing']);
} else {
if (!isset($data['transfer_id'])) {
$data['transfer_id'] = uniqid() . mt_rand(100000, 999999); //标识同一条轮转消息
$data['transfer_count'] = 1;
} else {
$data['transfer_count'] ++;
}
$wait = $data['transfer_time'] - $now;
$hour = floor($wait / (60 * 60));
if ($hour > 0) {
// 转至延迟队列等待,以1小时为维度
return Queue::produceTtl($data, 'delay', 'delay_loop_transfer_hour_x', 60 * 60 * 1000);
}
$minute = floor($wait / 60);
if ($minute > 0) {
// 转至延迟队列等待,以1分钟为维度
return Queue::produceTtl($data, 'delay', 'delay_loop_transfer_minute_x', 60 * 1000);
}
// 转至延迟队列等待,以1秒为维度
return Queue::produceTtl($data, 'delay', 'delay_loop_transfer_second_x', 1000);
}
参考资料
RabbitMQ的几种应用场景 https://blog.csdn.net/zyz511919766/article/details/41946521
rabbitmq 实现延迟队列的两种方式 https://blog.csdn.net/u014308482/article/details/53036770
rabbitmq 延迟队列的实现 https://blog.csdn.net/u012119576/article/details/74677835