RabbitMQ 中,如果为队列配置了死信交换器(x-dead-letter-exchange),消息过期后就会被转发到死信交换器,再被路由进与之绑定的死信队列。
我们可以利用死信队列这一特性,来实现延迟队列。只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了。
应用场景:订单在一段时间内未支付则自动取消
下面以一个简单的例子来讲解,设置消息过期时间为20秒,生产者生产消息20秒之后,消息会进入到死信队列,消费者监听死信队列,就可以实现延迟队列。
生产者:
<?php
/**
 * Producer: publishes one message to a queue that has a message TTL and a
 * dead-letter exchange configured. The message is not meant to be consumed
 * from that queue; once the TTL elapses the broker dead-letters it to the
 * dead-letter exchange, which routes it to the dead-letter queue that the
 * consumer script listens on. The TTL therefore acts as the delay.
 */
require_once __DIR__.'/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$vhost = 'order';
$exc_name = 'exc_pay';
$routing_key = 'route_pay';
$queue_name = 'queue_pay';
$ttl = 20000; // message expiry time, in milliseconds
$dead_exc_name = 'dead_exc_pay';       // dead-letter exchange name
$dead_routing_key = 'dead_route_pay';  // dead-letter routing key
$dead_queue_name = 'dead_queue_pay';   // dead-letter queue name

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', $vhost);
$channel = $connection->channel();

try {
    // Declare the dead-letter exchange and queue first, so the dead-letter
    // target already exists before any message can expire.
    // NOTE(review): both exchanges are declared with the 4th argument (durable)
    // set to false while the queues are declared durable and the message is
    // persistent; confirm this is intended before relying on it across broker
    // restarts. The flags are left unchanged so redeclaration keeps matching
    // whatever already exists on the broker.
    $channel->exchange_declare($dead_exc_name, 'direct', false, false, false);
    $channel->queue_declare($dead_queue_name, false, true, false, false, false);
    $channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key);

    // Declare the working exchange and the TTL queue whose expired messages are
    // re-published to the dead-letter exchange with the dead-letter routing key.
    $channel->exchange_declare($exc_name, 'direct', false, false, false);
    $args = new AMQPTable([
        'x-message-ttl' => $ttl,
        'x-dead-letter-exchange' => $dead_exc_name,
        'x-dead-letter-routing-key' => $dead_routing_key,
    ]);
    $channel->queue_declare($queue_name, false, true, false, false, false, $args);
    $channel->queue_bind($queue_name, $exc_name, $routing_key);

    // Publish a single persistent message to the TTL queue.
    $data = 'this is dead message';
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, $exc_name, $routing_key);
} finally {
    // Release the channel and connection even when a declare/publish call throws.
    $channel->close();
    $connection->close();
}
消费者:
<?php
/**
 * Consumer: listens on the dead-letter queue. Messages arrive here only after
 * they have expired in the producer's TTL queue, so every message printed by
 * this script has been delayed by that TTL.
 */
require_once __DIR__.'/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$vhost = 'order';
$dead_exc_name = 'dead_exc_pay';
$dead_routing_key = 'dead_route_pay';
$dead_queue_name = 'dead_queue_pay';

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', $vhost);
$channel = $connection->channel();

try {
    $channel->exchange_declare($dead_exc_name, 'direct', false, false, false);
    // Declare the queue with the same flags the producer script uses, so the
    // consumer can be started before the producer; binding or consuming a queue
    // that does not exist yet makes the broker close the channel with NOT_FOUND.
    $channel->queue_declare($dead_queue_name, false, true, false, false, false);
    $channel->queue_bind($dead_queue_name, $dead_exc_name, $dead_routing_key);

    // Print the message body, then acknowledge it explicitly (the consumer is
    // registered below with no_ack = false).
    $callback = static function (AMQPMessage $msg) {
        echo $msg->body."\n";
        $msg->ack();
    };

    // Prefetch count of 1: at most one unacknowledged message at a time.
    $channel->basic_qos(null, 1, null);
    $channel->basic_consume($dead_queue_name, '', false, false, false, false, $callback);

    // Block waiting for deliveries for as long as the channel stays open.
    while ($channel->is_open()) {
        $channel->wait();
    }
} finally {
    // Release the channel and connection on normal exit and on errors alike.
    $channel->close();
    $connection->close();
}
RabbitMQ 除了可以用死信队列来实现延迟队列,还可以用延迟消息插件(rabbitmq_delayed_message_exchange)来实现。延迟插件的实现方式更好,推荐使用延迟插件。