PHP+Laravel框架RabbitMQ简单使用(Pub/Sub模式)
一、简介
publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
特点:
- 每个消息可以有多个订阅者;
- 客户端只有订阅后才能接收到消息;
- 持久订阅和非持久订阅。
注意:
- 发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;
- 持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;
- 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式
P 表示为生产者、 X 表示交换机、C1与C2 表示为消费者,红色表示队列。
通常情况下,一个条消息只要被消费一次就行了,那么什么情况下需要所有的消费者都对这条消息进行消费呢?最典型的情况就是需要在内存中对数据进行缓存,并需要实时进行更新。
例如,笔者做过一个违禁词系统,对用户输入的评论内容进行违禁词汇检测。这个违禁词系统,部署了在N台服务器上,为了提升检测性能,每台机器都会将违禁词库全量加载到内存中,词库的更新,是通过发送MQ消息来完成的。由于采用Pub/Sub模型,每台机器的consumer,都可以接收到这条消息,直接在内存中更新敏感词库即可。
我们现在希望将消息发布到我们的日志交换而不是无名的交换(emit_log.php)
二、使用Laravel的command来实现消息的生产和消费
发布者我们现在希望将消息发布到我们的日志交换而不是无名的交换 RabbitmqProducerCommand下的publish方法
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_producer';//给生产者起个command名称
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* 生产者消息代码
* @return int
*/
public function handle()
{
$this->publish();
}
/**
* 发布订阅模式的发布操作
* publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
* Author: 李硕
* Date: 2022/5/7
* Time: 11:03
*/
public function publish()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//发布模式(fanout)
$channel->exchange_declare('logs', 'fanout', false, false, false);
$argv = [];
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: 我是发布者生产的消息消息!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
}
}
订阅者RabbitmqProducerCommand下的subscribe方法
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* @return int
*/
public function handle()
{
$this->subscribe();
}
/**
* 发布订阅模式 -订阅
* publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。
* Author: 李硕
* Date: 2022/5/7
* Time: 11:03
*/
public function subscribe()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' 我是订阅者开始进行消费[x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
三、运行截图
执行生产消息 php artisan rabbitmq_producer
执行消费消息 hp artisan rabbitmq_consumer
转载自:https://juejin.cn/post/7094827248347250696