PHP+Laravel框架RabbitMQ简单使用(工作队列模式(竞争消费者模式))
一、简介
工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。
假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。
举一个例子
一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?
公平分发
案例中发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量
因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。
二、使用Laravel的command来实现消息的生产和消费
生产者(老板) RabbitmqProducerCommand下的task方法
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
//引入amqp扩展
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqProducerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_producer';//给生产者起个command名称
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* 生产者消息代码
* @return int
*/
public function handle()
{
$this->task();
}
/**
* Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
* Author: 李硕
* Date: 2022/5/7
* Time: 09:26
* @throws \Exception
*/
public function ptp()
{
//创建服务器连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//连接信道
//信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
//信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
//注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
$channel = $connection->channel();
//channel->queue_declare通过信道创建一个是否是持久化的消息队列
//queue第一个参数代表消息队列名称
$channel->queue_declare('test', false, false, false, false);
//往队列里要发送内容,待发送的内容
$msg = new AMQPMessage('我是一个生产者消息');
//通过信道来进行生产消息
//而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) —> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue
$channel->basic_publish($msg, '', 'test');
echo " [x] Sent '我是一个生产者消息!'\n";
//关闭信道
$channel->close();
//关闭连接
$connection->close();
}
/**
* 工作队列模式(竞争消费者模式)
* Author: 李硕
* Date: 2022/5/7
* Time: 09:39
* @throws \Exception
*/
public function task()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//确保队列能够在 RabbitMQ 节点重启后继续存在,第三个参数设置为true
$channel->queue_declare('task_queue', false, true, false, false);
$argv = [];
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
//消息标记为持久
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
}
}
消费者(工人) RabbitmqProducerCommand下的task方法
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_consumer';//给消费者起个command名称
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* @return int
*/
public function handle()
{
$this->task();
}
/**
* Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
* Author: 李硕
* Date: 2022/5/7
* Time: 09:26
* @throws \Exception
*/
public function ptp()
{
//创建服务器连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//连接信道
//信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的
//信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。
//注意是一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
$channel = $connection->channel();
//channel->queue_declare通过信道创建一个是否是持久化的消息队列
//queue第一个参数代表消息队列名称
$channel->queue_declare('test', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
//进行监听消费者是否有消息,如果有进行输出消息内容
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
//通过信道进行消费消息
$channel->basic_consume('test', '', false, true, false, false, $callback);
//如果信道是打开状态
while ($channel->is_open()) {
//然后让信道一直处于监听等待状态
$channel->wait();
}
//关闭信道
$channel->close();
//关闭连接
$connection->close();
}
/**
* 工作队列模式(竞争消费者模式)
* Author: 李硕
* Date: 2022/5/7
* Time: 09:39
* @throws \Exception
*/
public function task()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->ack();
};
//公平调度
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
三、运行截图
执行生产消息 php artisan rabbitmq_producer
执行消费消息 hp artisan rabbitmq_consumer
转载自:https://juejin.cn/post/7094824302368948254