PHP和golang在开发中使用RabbitMQ(路由模式)整理RabbitMQ的架构基于生产者-消费者模型,通过队列(
说明所使用的本地环境mac, php版本>=5.4,第三方包 php-amqplib版本2.9.2; go版本1.18.10,第三方包 github.com/streadway/amqp v1.0.0
一、关于 RabbitMQ
RabbitMQ 官网:www.rabbitmq.com/ RabbitMQ 是一个 由Erlang语言开发的 AMQP 的开源实现
AMQP:Advanced Message Queue,高级消息队列协议。 它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ的架构基于生产者-消费者模型,通过队列(Queue)来实现消息的存储和转发。生产者(Producer)将消息发送到队列中,而消费者(Consumer)则从队列中取出并处理这些消息。RabbitMQ还引入了交换机(Exchange)和路由键(Routing Key)等概念,以实现更加灵活和复杂的消息路由和分发机制。
消息队列在实际应用中常用的使用场景。场景分为异步处理
、应用解耦
、流量削峰
和消息通讯
四个场景。
ps:在我们的业务中主要是发短信等消息处理。
二、本地安装
1、使用 brew 安装
brew install rabbitmq
安装的过程会出现类似的一条信息:
🍺 /usr/local/Cellar/rabbitmq/3.13.7: 1,538 files, 36.2MB
从这里可以看到当前安装的版本和路径
2、启动RabbitMQ服务
brew services start rabbitmq
或
sudo rabbitmq-server -detached
你可以通过访问 http://localhost:15672 并使用默认的用户名和密码(都是 guest)来访问RabbitMQ管理界面。
-
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况。
-
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
-
Exchanges:交换机,用来实现消息的路由。
-
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
3、查看状态
rabbitmqctl status
4、关闭后台
rabbitmqctl stop
三、项目应用
RabbitMQ共有六种工作模式: 简单模式(Simple) 工作队列模式(Work Queue) 发布订阅模式(Publish/Subscribe) 路由模式(Routing) 通配符模式(Topics) 远程调用模式(RPC,不常用,不对此模式进行讲解)
下面的项目代码以路由模式开发参考。
1、php开发项目应用
服务连接配置:
protected static $config => [
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/',
];
① 安装php-amqplib
去github下载或者使用composer
composer require php-amqplib/php-amqplib
② 代码封装
注:具体的使用可参照其文档,以下代码仅供参考。
// 生产者封装
public function rabbitMqPush($data,$queueName="",$exchangeName="",$routingKey="",$exchangeType = "direct",$durable = true){
try {
// 连接到 RabbitMQ
$connect = new AMQPStreamConnection(self::$config['host'],self::$config['port'],self::$config['login'],self::$config['password'],self::$config['vhost']);
// 打开一个通道
$channel = $connect->channel();
// 声明交换器
// 参数:
// exchange 交换器名字
// type 交换器类型
// passive false 是否检测同名队列
// durable false 交换机是否开启持久化
// auto_detlete false 通道关闭后是否删除队列
// 其中type交换器类型: direct(默认),fanout, topic, 和headers
// Direct:直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
// fanout:广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
// topic:主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
// headers:消息体的header匹配(ignore)
$channel->exchange_declare($exchangeName,$exchangeType,false,$durable,false);
// 声明队列
$channel->queue_declare($queueName,false,$durable,false,false);
// 绑定队列到交换器
$channel->queue_bind($queueName,$exchangeName,$routingKey);
// 发送消息
$data = trim(strval($data));
// AMQPMessage($body,$properties)的第二个参数 $properties 根据其定义的格式自己配置即可,
// 其中application_headers是AMQPTable类型,可自定义参数 key 和 value。
$msg = new AMQPMessage($data,['application_headers' => new AMQPTable(['retryTime' => 0]), // header 自定义 retryTime:重试次数
'timestamp' => time(),// 时间戳
'content_type' => 'application/json',//内容类型
'app_id'=>"test", // 自定义项目标识
]);
// publish
$channel->basic_publish($msg,$exchangeName,$routingKey);
// 关闭信道
$channel->close();
return true;
} catch (\Exception $e) {
// 自己的catch逻辑
// ……
return false;
}
}
// 消费者封装
public function rabbitMqConsume($callback,$consumerTag="",$queueName="",$exchangeName="",$routingKey="",$exchangeType = "direct",$durable = true)
{
try {
// 连接到 RabbitMQ
$connect = new AMQPStreamConnection(self::$config['host'],self::$config['port'],self::$config['login'],self::$config['password'],self::$config['vhost']);
// 打开一个通道
$channel = $connect->channel();
// 声明交换器
// 参数:同上
$channel->exchange_declare($exchangeName,$exchangeType,false,$durable,false);
// 声明队列
$channel->queue_declare($queueName,false,$durable,false,false);
// 绑定队列到交换器
$channel->queue_bind($queueName,$exchangeName,$routingKey);
// 消费消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, $consumerTag, false, false, false, false,$callback);
// 等待并接收消息直到程序退出
while ($channel->is_consuming()) {
$channel->wait();
}
return true;
} catch (\Exception $e) {
// 自己的catch逻辑
// ……
return false;
}
}
// 回调函数,当接收到消息时会被调用
$callback = function ($message) use ($queueName,$exchangeName,$routeKey) {
$this->consumerCallback($message,$queueName,$exchangeName,$routeKey);
};
// 消费者回调函数
private function consumerCallback($message = null,$queueName = "",$exchangeName = "",$routeKey = "")
{
//获取队列内容信息
$msg = $message->body;
$get_properties = $message->get_properties();
$headersObject = $message->get_properties()['application_headers'];
$headersArray = $headersObject ? $headersObject->getNativeData() : [];
LogService::save("consumerCallback headersArray:" . json_encode($headersArray,true) . PHP_EOL);
//校验信息数据格式是否正确
if (!isJson($msg)) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //手动发送ACK应答
return;
}
//校验信息数据格式是否正确
$data = json_decode($msg,true);
if (!isset($data['data']) || !isset($data['url'])) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //手动发送ACK应答
return;
}
$system = !empty($data['system']) ?$data['system']: [];
// 可供参考:加redis 防止 重复消息数据
// $data_msg_id = !empty($system['data_msg_id']) ? $system['data_msg_id'] :"";
// if($data_msg_id && $this->isExistsMqMsgID($data_msg_id)){
// $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //手动发送ACK应答
// return;
// }
$retryMax = !empty($system['retry']) ? $system['retry'] : 3; // 最大重试次数
$retryTime = (int)$headersArray['retryTime'] ?: 0; // 获取消息头中的重试次数
try {
// 消费处理
//……
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //手动发送ACK应答
} catch (\Exception $ex) {
$retryTime++;
// $this->deleteMqMsgID($data_msg_id); // 如果捕获到错误 删除 redis
sleep($retryTime);
if ($retryTime <= $retryMax) {
$get_properties['application_headers'] = new AMQPTable(['retryTime' => $retryTime]);
$msgNew = new AMQPMessage($msg,$get_properties);
$message->delivery_info['channel']->basic_publish($msgNew,$exchangeName,$routeKey); // 重新入队
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 应答消息
} else {
// 超过最大次数的处理
// ……
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 应答消息
// ……
}
}
}
tips: 上面使用的函数isJson:
function isJson($json_string){
$test=json_decode($json_string);
return (json_last_error() == JSON_ERROR_NONE);
}
在消费者回调代码块中,使用了防止消费失败的重试机制,重试3次,失败则重新把消息放入队列。超过3次则单独处理。
basic_qos
是用来设置消费者的预取计数(prefetch count)的函数。预取计数指的是消费者在收到确认之前可以接收的未确认消息的最大数量。
通过设置预取计数,你可以控制 RabbitMQ 向消费者传递的消息数量,以确保消息在消费者端有序、逐个处理。这有助于平衡系统负载,防止某一消费者一次性接收到过多的消息。
$channel->basic_qos($prefetch_size, $prefetch_count, $a_global)
基本语法如下:
$prefetch_size:预取计数的大小,通常为0(表示未设置)。
$prefetch_count:消费者在未确认消息之前可以接收的最大消息数量。
$global:如果设置为 true,则预取计数将应用于整个信道,而不是每个消费者。
这样可以确保每个消费者在确认之前只会接收一条消息,从而确保了消息的顺序处理。
2、golang开发项目应用
注:以下为说明使用的demo,不做实际开发使用,仅供参考。
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
// 生产者封装
func sendMessage( queueName, exchangeName, routingKey, message string) {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 打开一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换器
err = ch.ExchangeDeclare(
exchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明队列
// _, err = mq.Channel.QueueDeclarePassive(queueName, true, false, false, true, nil)
_, err = mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
queueName, // 队列名
true, // 是否持久化
false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
false, // 是否阻塞
nil, // 额外属性(我会用)
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换器
err = ch.QueueBind(
queueName, // 绑定的队列名称
routingKey, // bindkey 用于消息路由分发的key
exchangeName, // 绑定的exchange名
false, // 是否阻塞
nil, // 额外属性
)
// 发送消息
err = ch.Publish(
exchangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Timestamp: time.Now(),
AppId: "test",
Headers: amqp.Table{
"retryTime": 0,
},
Body: []byte(message),
})
failOnError(err, "Failed to publish a message")
}
// 消费者封装
func consumeMessage(queueName, exchangeName, routingKey string) {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 打开一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换器
err = ch.ExchangeDeclare(
exchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明队列
// _, err = mq.Channel.QueueDeclarePassive(queueName, true, false, false, true, nil)
_, err = mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
queueName, // 队列名
true, // 是否持久化
false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
false, // 是否阻塞
nil, // 额外属性(我会用)
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换器
err = ch.QueueBind(
queueName, // 绑定的队列名称
routingKey, // bindkey 用于消息路由分发的key
exchangeName, // 绑定的exchange名
false, // 是否阻塞
nil, // 额外属性
)
// 消费消息
sgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
tips: QueueDeclarePassive 和 QueueDeclare 都是用于声明(或检查)消息队列的方法,但它们有一些重要的区别: QueueDeclarePassive 是一种检查动作,它不会真正创建队列,而是用于检查队列是否存在。如果存在,它会返回队列的相关信息,如果不存在,它会抛出异常。 QueueDeclare 是一种声明动作,它会创建一个新的队列,如果队列已经存在,它会根据参数选项进行更新。
3、测试数据
测试数据,以一个订单为例:
{"data":{"order_no":"YZVisit64c72e944a7","order_from_type":"yunzhen_visit","order_type":2,"handle_type":"change_time","message_type":512,"message_param":"{\"appointed_time\":1725328800,\"appointed_time_old\":1725075998}","time_stamp":1725247027,"cuid":0},"url":"","system":{"mail":3,"data_msg_id":"test_6c08a5762fe1e5aba440521b817282f6"}}
json格式的消息,data
是数据部分。在 system
中定义了自己业务使用的一些字段数据,data_msg_id
是生产者的消息id,可根据自己的业务需要自己设置和处理。
如有问题请在下方留言。
或关注我的公众号“
孙三苗
”(sunsanmiao
),输入“联系方式
”。获得进一步帮助。
转载自:https://juejin.cn/post/7409580670232739903