likes
comments
collection
share

前端眼中的消息队列(NodeJs)

作者站长头像
站长
· 阅读数 21

前端为什么要知道消息队列

可能有同学会疑惑,一个前端为什么要去知道后端的知识?安安心心的写自己的页面逻辑和前端界面不好吗?这肯定是好的,但是是建立在只写前端代码和没有打开BOSS招聘的前提下。

现在招聘要求前端,不仅会自己的本职工作,还要会需求分析、会测试(Docker技术)、会运维(重构模块,搭一个流水线环境),NodeJs(相当于后端了)等等。

什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器

分析理解:消息队列是一个容器,生产者将数据发送给消息队列服务器,再由消息队列服务器将数据存储到数据库,消费者在需要时从消息队列服务器获取数据,而且生产者和消费者互相不认识。

前端眼中的消息队列(NodeJs)

举个例子:包子铺早上将包子生产出来,很多上班的人想要购买,但是他们都想自己先买到,就会出现拥挤的情况,这时就是无消息队列的时候;如果他们都排好队,一个一个的购买,这时就是有消息队列的情况,排队买包子的队列就是消息队列。

消息队列,一般应用在商品秒杀、高铁抢票等类似的场景。

消息队列优势

  • 应用解耦

    消息队列可以使消费者和生产者直接互不干涉,互不影响,只需要把消息发送到队列即可,而且可独立的扩展或修改两边的处理过程,只要能确保它们遵守同样的接口约定,可以生产者用Node.js实现,消费者用phython实现。

前端眼中的消息队列(NodeJs)

  • 灵活性和峰值处理能力

    当客户端访问量突然剧增,对服务器的访问已经超过服务所能处理的最大峰值,甚至导致服务器超时负载崩溃,使用消息队列可以解决这个问题,可以通过控制消费者的处理速度生产者可进入消息队列的数量等来避免峰值问题

  • 排序保证

    消息队列可以控制数据处理的顺序,因为消息队列本身使用的是队列这个数据结构,FIFO(先进选出),在一些场景数据处理的顺序很重要,比如商品下单顺序等。

  • 异步通信

    消息队列中的有些消息,并不需要立即处理,消息队列提供了异步处理机制,可以把消息放在队列中并不立即处理,需要的时候处理,或者异步慢慢处理,一些不重要的发送短信和邮箱功能可以使用。

前端眼中的消息队列(NodeJs)

这几个优势都很重要,但是一般异步处理提高系统性能(削峰、减少响应所需时间)和 降低系统耦合性这两个优势,最为体现。

Node使用rabbitMQ

rabbitMQ是主流的消息队列

使用前要安装rabbitMQ

在node项目中,使用amqplib工具来管理消息队列,让我们能更好的使用消息队列。

执行 npm 命令,可以快速安装 amqplib

$ npm install amqplib

Node中应用amqplib

初次体验

生产者代码 product.js

const amqp =require('amqplib');

async function  product(params) {
    // 1.创建链接对象
    const connect =await amqp.connect('amqp://localhost:5672');
     // 1. 创建链接对象
     const connection = await amqp.connect('amqp://localhost:5672');

     // 2. 获取通道
     const channel = await connection.createChannel();
 
     // 3. 声明参数
     const routingKey = 'helloKoalaQueue';
     const msg = 'hello koala';
 
     for (let i=0; i<10000; i++) {
         // 4. 发送消息
         await channel.publish('', routingKey, Buffer.from(`${msg}${i}条消息`));
     }
 
     // 5. 关闭通道
     await channel.close();
     // 6. 关闭连接
     await connect.close();
}
product();

生产者代码运行结果:

前端眼中的消息队列(NodeJs)

消费者代码 consumer.js

// 构建消费者
const amqp = require('amqplib');

async function consumer() {
    // 1. 创建链接对象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 获取通道
    const channel = await connection.createChannel();

    // 3. 声明参数
    const queueName = 'helloKoalaQueue';
  
    // 4. 声明队列,交换机默认为 AMQP default
    await channel.assertQueue(queueName);

    // 5. 消费
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });
}
consumer();

消费者代码运行结果:

前端眼中的消息队列(NodeJs)

实际运用体验

运用场景:消息的队列保存传递过来的用户组信息,在保存和删除用户的时候进行消费

保存用户组

import { Application } from 'egg';
import { User } from '../../database/entity/user';

async save(app: Application, data: any, ack: any) {
    try {
      if (data.state === 'FROZEN') {
        await this.delete(app, data, ack);
        return;
      }
      // 创建和更新用户
      await app.basicModel.transaction(async transManager => {
        const targetUser = await this.$select(transManager, data);
        // 用于更新和创建用户
        const user = transManager.getRepository(User).create({
          email: data.email,
          name: data.name,
          phone: data.phone,
        });

        // 更新用户信息
        await transManager.save(user);
      }
      ack(true);
    } catch (error) {
      // 消费失败
      ack(false);
    }
  }

删除用户组

import { Application } from 'egg';
import { User } from '../../database/entity/user';

async delete(app: Application, data: any, ack: any) {
    try {
      await app.basicModel.transaction(async transManager => {
        const user = await this.$select(transManager, data);
        // 未找到用户
        if (!user) {
          ack(true);
          return;
        }
        // 删除用户
        await transManager.update(User, {
          id: user.id,
        });
      });
      // 消费成功
      ack(true);
    } catch (e) {
      // 消费失败
      ack(false);
    }
  }
import { Application } from 'egg';
import handler from './handler';

// 导出一个默认的异步函数,该函数接收一个名为app的Application对象
export default async (app: Application) => {
  // 当队列中有消息时,将会执行回调函数
  await rabbitReceive(queueName, (content: any, ack: any) => {
    // 解构消息内容(content)
    const { businessType, action, data } = content;
    // 判断handler对象中是否存在businessType属性对应的处理函数,如果存在,则调用该处理函数
    // 防止出现没有对应处理逻辑的消息,造成报错
    if (handler[businessType]) {
      // 调用handler对象中businessType属性对应的处理函数中的action方法,并将app、data和ack三个参数传入
      handler[businessType][action](app, data, ack);
    } else {
      // 如果不存在对应的处理函数,则调用ack方法,消费失败
      ack(false);
    }
  })
}

消息内容(content),消费者接收确定消息(ack)