前端眼中的消息队列(NodeJs)
前端为什么要知道消息队列
可能有同学会疑惑,一个前端为什么要去知道后端的知识?安安心心的写自己的页面逻辑和前端界面不好吗?这肯定是好的,但是是建立在只写前端代码和没有打开BOSS招聘的前提下。
现在招聘要求前端,不仅会自己的本职工作,还要会需求分析、会测试(Docker技术)、会运维(重构模块,搭一个流水线环境),NodeJs(相当于后端了)等等。
什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器
分析理解:消息队列
是一个容器,生产者将数据发送给消息队列服务器,再由消息队列服务器将数据存储到数据库,消费者在需要时从消息队列服务器获取数据,而且生产者和消费者互相不认识。
举个例子:包子铺早上将包子生产出来,很多上班的人想要购买,但是他们都想自己先买到,就会出现拥挤的情况,这时就是无消息队列的时候;如果他们都排好队,一个一个的购买,这时就是有消息队列的情况,排队买包子的队列就是消息队列。
消息队列,一般应用在商品秒杀、高铁抢票
等类似的场景。
消息队列优势
-
应用解耦
消息队列可以使消费者和生产者直接互不干涉,互不影响,只需要把消息发送到队列即可,而且可独立的扩展或修改两边的处理过程,只要能确保它们遵守同样的接口约定,可以生产者用Node.js实现,消费者用phython实现。
-
灵活性和峰值处理能力
当客户端访问量突然剧增,对服务器的访问已经超过服务所能处理的最大峰值,甚至导致服务器超时负载崩溃,使用消息队列可以解决这个问题,可以通过
控制消费者的处理速度
和生产者可进入消息队列的数量
等来避免峰值问题 -
排序保证
消息队列可以控制数据处理的顺序,因为消息队列本身使用的是队列这个数据结构,
FIFO
(先进选出),在一些场景数据处理的顺序很重要,比如商品下单顺序等。 -
异步通信
消息队列中的有些消息,并不需要立即处理,消息队列提供了异步处理机制,可以把消息放在队列中并不立即处理,需要的时候处理,或者异步慢慢处理,一些不重要的发送短信和邮箱功能可以使用。
这几个优势都很重要,但是一般异步处理提高系统性能(削峰、减少响应所需时间)和 降低系统耦合性这两个优势,最为体现。
Node使用rabbitMQ
rabbitMQ是主流的消息队列
使用前要安装rabbitMQ
在node项目中,使用amqplib工具来管理消息队列,让我们能更好的使用消息队列。
执行 npm
命令,可以快速安装 amqplib
$ npm install amqplib
Node中应用amqplib
初次体验
生产者代码 product.js
const amqp =require('amqplib');
async function product(params) {
// 1.创建链接对象
const connect =await amqp.connect('amqp://localhost:5672');
// 1. 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const routingKey = 'helloKoalaQueue';
const msg = 'hello koala';
for (let i=0; i<10000; i++) {
// 4. 发送消息
await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}条消息`));
}
// 5. 关闭通道
await channel.close();
// 6. 关闭连接
await connect.close();
}
product();
生产者代码运行结果:
消费者代码 consumer.js
// 构建消费者
const amqp = require('amqplib');
async function consumer() {
// 1. 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const queueName = 'helloKoalaQueue';
// 4. 声明队列,交换机默认为 AMQP default
await channel.assertQueue(queueName);
// 5. 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
});
}
consumer();
消费者代码运行结果:
实际运用体验
运用场景:消息的队列保存传递过来的用户组信息,在保存和删除用户的时候进行消费
保存用户组
import { Application } from 'egg';
import { User } from '../../database/entity/user';
async save(app: Application, data: any, ack: any) {
try {
if (data.state === 'FROZEN') {
await this.delete(app, data, ack);
return;
}
// 创建和更新用户
await app.basicModel.transaction(async transManager => {
const targetUser = await this.$select(transManager, data);
// 用于更新和创建用户
const user = transManager.getRepository(User).create({
email: data.email,
name: data.name,
phone: data.phone,
});
// 更新用户信息
await transManager.save(user);
}
ack(true);
} catch (error) {
// 消费失败
ack(false);
}
}
删除用户组
import { Application } from 'egg';
import { User } from '../../database/entity/user';
async delete(app: Application, data: any, ack: any) {
try {
await app.basicModel.transaction(async transManager => {
const user = await this.$select(transManager, data);
// 未找到用户
if (!user) {
ack(true);
return;
}
// 删除用户
await transManager.update(User, {
id: user.id,
});
});
// 消费成功
ack(true);
} catch (e) {
// 消费失败
ack(false);
}
}
import { Application } from 'egg';
import handler from './handler';
// 导出一个默认的异步函数,该函数接收一个名为app的Application对象
export default async (app: Application) => {
// 当队列中有消息时,将会执行回调函数
await rabbitReceive(queueName, (content: any, ack: any) => {
// 解构消息内容(content)
const { businessType, action, data } = content;
// 判断handler对象中是否存在businessType属性对应的处理函数,如果存在,则调用该处理函数
// 防止出现没有对应处理逻辑的消息,造成报错
if (handler[businessType]) {
// 调用handler对象中businessType属性对应的处理函数中的action方法,并将app、data和ack三个参数传入
handler[businessType][action](app, data, ack);
} else {
// 如果不存在对应的处理函数,则调用ack方法,消费失败
ack(false);
}
})
}
消息内容(content),消费者接收确定消息(ack)
转载自:https://juejin.cn/post/7246596616693432378