1.服务器上安装rabbitmq
2.进入thinkphp项目目录下,使用composer安装php-amqplib库
composer require php-amqplib/php-amqplib
3.然后引入命名空间就可以使用了
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Message\AMQPMessage;
接下来我以发送邮件为例子,来讲一下thinkphp实战rabbitmq消息队列
应用场景是:当用户注册后,就给网站管理员发送一封邮件
一、生产者生产消息
下面我写了一个Register类,在用户注册成功后,发送消息给消息队列
<?php
namespace app\index\controller;
use think\Db;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Register
{
public function reg()
{
$data = input('post.');
$username = trim(input('post.username'));
$password = trim(input('post.password'));
$tel = trim(input('post.tel'));
//判断用户名是否存在
$user = Db::table('user')->where('username',$username)->find();
if($user){
return ['status'=>0,'msg'=>'用户名已存在'];
}
//判断手机号是否存在
$usertel = Db::table('user')->where('tel',$tel)->find();
if($usertel){
return ['status'=>0,'msg'=>'手机号已存在'];
}
$res = Db::table('user')->insert($data);
if($res){
//用户注册成功,将用户的用户名和手机号发给消息队列
$this->sendmsg(json_encode(['username'=>$username,'tel'=>$tel]));
return ['status'=>1,'msg'=>'注册成功'];
}else{
return ['status'=>0,'msg'=>'注册失败'];
}
}
//发送消息给消息队列
public function sendmsg($data)
{
$vhost = 'zhiboblog';//虚拟主机名,要先创建
$queue_name = 'mailer';//队列名称,可以自定义
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->queue_declare($queue_name,false,true,false,false);
$msg = new AMQPMessage($data,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg,'',$queue_name);
$channel->close();
$connection->close();
}
}
namespace app\index\controller;
use think\Db;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Register
{
public function reg()
{
$data = input('post.');
$username = trim(input('post.username'));
$password = trim(input('post.password'));
$tel = trim(input('post.tel'));
//判断用户名是否存在
$user = Db::table('user')->where('username',$username)->find();
if($user){
return ['status'=>0,'msg'=>'用户名已存在'];
}
//判断手机号是否存在
$usertel = Db::table('user')->where('tel',$tel)->find();
if($usertel){
return ['status'=>0,'msg'=>'手机号已存在'];
}
$res = Db::table('user')->insert($data);
if($res){
//用户注册成功,将用户的用户名和手机号发给消息队列
$this->sendmsg(json_encode(['username'=>$username,'tel'=>$tel]));
return ['status'=>1,'msg'=>'注册成功'];
}else{
return ['status'=>0,'msg'=>'注册失败'];
}
}
//发送消息给消息队列
public function sendmsg($data)
{
$vhost = 'zhiboblog';//虚拟主机名,要先创建
$queue_name = 'mailer';//队列名称,可以自定义
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->queue_declare($queue_name,false,true,false,false);
$msg = new AMQPMessage($data,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg,'',$queue_name);
$channel->close();
$connection->close();
}
}
二、消费者消费消息
首先先创建一个命令行,假如对thinkphp命令行不了解的,可以看一下文档:https://www.kancloud.cn/manual/thinkphp5_1/354146
php think make:command Mailer mailer
执行上面这行命令后,就会在application\command下面生成一个Mailer.php文件,我们就将消费者的代码写在这里
<?php
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Mailer extends Command
{
protected function configure()
{
// 指令配置
$this->setName('mailer');
// 设置参数
}
protected function execute(Input $input, Output $output)
{
// 指令输出
//$output->writeln('mailer');
$vhost = 'zhiboblog';
$queue_name = 'mailer';
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->queue_declare($queue_name,false,true,false,false);
$callback = function($msg){
// echo $msg->body."\n";
$arr = json_decode($msg->body,true);
$username = $arr['username'];
$tel = $arr['tel'];
$title = '有新的注册用户';//邮件标题
$content = '用户名:'.$username.',手机号:'.$tel;//邮件内容
$this->sendmail($title,$content);
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
}
protected function sendmail($title,$content)
{
//发送邮件的方法,我这里就不写了,可以自己发挥
}
}
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Mailer extends Command
{
protected function configure()
{
// 指令配置
$this->setName('mailer');
// 设置参数
}
protected function execute(Input $input, Output $output)
{
// 指令输出
//$output->writeln('mailer');
$vhost = 'zhiboblog';
$queue_name = 'mailer';
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->queue_declare($queue_name,false,true,false,false);
$callback = function($msg){
// echo $msg->body."\n";
$arr = json_decode($msg->body,true);
$username = $arr['username'];
$tel = $arr['tel'];
$title = '有新的注册用户';//邮件标题
$content = '用户名:'.$username.',手机号:'.$tel;//邮件内容
$this->sendmail($title,$content);
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
}
protected function sendmail($title,$content)
{
//发送邮件的方法,我这里就不写了,可以自己发挥
}
}
运行创建的命令(终端关闭的时候,这个进程也会被关闭,如果想一直运行,可以使用守护进程运行)
php think mailer
守护进程运行
//nohup表示不挂断地运行命令,&表示在后台运行,两个结合起来使用就可以守护进程运行
nohup php think mailer &
nohup php think mailer &
如果想关掉,可以先找到这个进程的id,然后杀掉这个进程
//先找到进程id
ps -ef |grep mailer
//然后杀掉进程
kill -9 进程id
ps -ef |grep mailer
//然后杀掉进程
kill -9 进程id
平常用的时候,我们守护进程运行就可以了。
这样一有新用户注册,我们就会收到邮件。
上面我省略了发送邮件的方法,发送邮件可以参考一下我的这篇文章: