likes
comments
collection
share

Fastify+TS实现基础IM服务(四)@fastify/websocket入门

作者站长头像
站长
· 阅读数 1

本文主要内容均为@fastifywebsocket入门内容,这部分可以直接去看NPM上的@fastify/websocket #readme

不同的是本文的代码示例都是TS的,如果已经有了这部分基础可以直接跳到改造js+ws聊天室代码部分

Fastify 的 WebSocket 支持,基于 ws@8 构建。

安装

npm i @fastify/websocket
# 或者
yarn add @fastify/websocket

如果你是 TypeScript 用户,这个包内置了自己的 TypeScript 类型,但你还需要安装 ws 包的类型:

npm i @types/ws -D
# 或者
yarn add -D @types/ws

使用方法

注册这个插件后,你可以选择 WS 服务器响应哪些路由。这可以通过在 fastify 的 .get 路由上添加 websocket: true 属性到 routeOptions 来实现。在这种情况下,两个参数将被传递给处理程序,socket 连接和 fastify 请求对象:

import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket);

app.register(async function (fastify) {
  fastify.get('/', { websocket: true }, (connection, req) => {
    connection.socket.on('message', message => {
      // message.toString() === 'hi from client'
      connection.socket.send('hi from server');
    });
  });
});

app.listen({ port: 3000 }, err => {
  if (err) {
    app.log.error(err);
    process.exit(1);
  }
});

在这种情况下,它将在每个未注册的路由上响应一个 404 错误,关闭传入的升级连接请求。

然而,你仍然可以定义一个通配符路由,它将被用作默认处理程序:

import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
  options: { maxPayload: 1048576 }
});

app.register(async function (fastify) {
  fastify.get('/*', { websocket: true }, (connection, req) => {
    connection.socket.on('message', message => {
      // message.toString() === 'hi from client'
      connection.socket.send('hi from wildcard route');
    });
  });

  fastify.get('/', { websocket: true }, (connection, req) => {
    connection.socket.on('message', message => {
      // message.toString() === 'hi from client'
      connection.socket.send('hi from server');
    });
  });
});

app.listen({ port: 3000 }, err => {
  if (err) {
    app.log.error(err);
    process.exit(1);
  }
});

附加事件处理程序

重要的是,WebSocket 路由处理程序在处理程序执行期间同步附加事件处理程序,以避免意外丢弃消息。如果你想在你的 WebSocket 处理程序中进行任何异步工作,比如认证用户或从数据存储加载数据,请确保在触发这个异步工作之前附加任何 on('message') 处理程序。否则,消息可能会在这个异步工作进行时到达,如果没有处理程序监听这个数据,它将被默默丢弃。

下面是一个例子,展示了如何在仍然访问异步资源的同时同步附加消息处理程序。我们将一个异步事情的 promise 存储在一个局部变量中,同步附加消息处理程序,然后使消息处理程序本身异步,以获取异步数据并进行一些处理:

app.get('/*', { websocket: true }, (connection, request) => {
  const sessionPromise = request.getSession(); // 示例异步会话获取器,同步调用以返回一个 promise

  connection.socket.on('message', async (message) => {
    const session = await sessionPromise;
    // 使用消息和会话做一些事情
  });
});

使用钩子

使用 @fastify/websocket 注册的路由遵循 Fastify 插件封装上下文,因此将运行已注册的任何钩子。这意味着你可能用于认证或错误处理的普通 HTTP 处理程序的相同路由钩子也适用于 websocket 处理程序。

app.addHook('preValidation', async (request, reply) => {
  // 检查请求是否已认证
  if (!request.isAuthenticated()) {
    await reply.code(401).send("未认证");
  }
});

app.get('/', { websocket: true }, (connection, req) => {
  // 仅为认证的传入请求打开连接
  connection.socket.on('message', message => {
    // ...
  });
});

自定义错误处理程序

你可以选择性地提供一个自定义 errorHandler,它将用于处理已建立的 websocket 连接的任何清理工作。如果在建立连接后你的 websocket 路由处理程序抛出任何错误,将调用 errorHandler

import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
  errorHandler: function (error, connection, req, reply) {
    // 做些事情
    // 销毁/关闭连接
    connection.socket.terminate();
  },
  options: {
    maxPayload: 1048576, // 我们将允许的最大消息大小设置为 1 MiB(1024 字节 * 1024 字节)
    verifyClient: function (info, next) {
      if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
        return next(false); // 连接不被允许
      }
      next(true); // 连接被允许
    }
  }
});

app.get('/', { websocket: true }, (connection, req) => {
  connection.socket.on('message', message => {
    // message.toString() === 'hi from client'
    connection.socket.send('hi from server');
  });
});

app.listen({ port: 3000 }, err => {
  if (err) {
    app.log.error(err);
    process.exit(1);
  }
});

自定义 preClose 钩子

默认情况下,当服务器关闭时,所有 ws 连接都将关闭。如果你希望修改这种行为,你可以传递你自己的 preClose 函数。

import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
  preClose: (done) => { // 注意:也可以使用 async 风格,不使用 done-callback
    const server = this.websocketServer;

    for (const socket of server.clients) {
      socket.close(1001, 'WS 服务器以自定义方式离线,发送代码 + 消息');
    }

    server.close(done);
  }
});

测试

测试 ws 处理程序可能相当棘手,幸运的是 fastify-websocket 为 fastify 实例装饰了 injectWS。它允许轻松测试 websocket 端点。

import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';
import ws from 'ws';

const app = Fastify();
await app.register(FastifyWebSocket);

app.get('/', { websocket: true }, (connection, req) => {
  const stream = ws.createWebSocketStream(connection.socket, { /* 选项 */ });
  stream.setEncoding('utf8');
  stream.write('hello client');
  
  stream.on('data', function (data) {
    // 确保设置了数据处理程序或以其他方式读取所有传入的数据,否则流背压将导致底层 WebSocket 对象被暂停。
  });
});

await app.listen({ port: 3000 });

App.js

import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';

const App = Fastify();

App.register(FastifyWebSocket);

App.register(async function(fastify) {
  fastify.addHook('preValidation', async (request, reply) => {
    if (request.headers['api-key'] !== 'some-random-key') {
      return reply.code(401).send();
    }
  });

  fastify.get('/', { websocket: true }, (connection) => {
    connection.socket.on('message', message => {
      connection.socket.send('hi from server');
    });
  });
});

export default App;

App.test.js

import { test } from 'tap';
import Fastify from 'fastify';
import App from './app';

test('connect to /', async (t) => {
  t.plan(1);

  const fastify = Fastify();
  fastify.register(App);
  t.teardown(fastify.close.bind(fastify));

  const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }});
  let resolve;
  const promise = new Promise(r => { resolve = r; });

  ws.on('message', (data) => {
    resolve(data.toString());
  });
  ws.send('hi from client');

  t.assert(await promise, 'hi from server');
  // 记得最后关闭 ws
  ws.terminate();
});

注意!!!

  • 测试结束时需要手动关闭 Websocket。
  • 需要等待 fastify.ready() 以确保 fastify 已被装饰。
  • 如果你需要处理服务器响应,需要在发送消息之前注册事件监听器。

选项

@fastify/websocket 接受以下 ws 的选项:

  • host - 绑定服务器的主机名。
  • port - 绑定服务器的端口。
  • backlog - 等待连接的队列的最大长度。
  • server - 一个预创建的 Node.js HTTP/S 服务器。
  • verifyClient - 用于验证传入连接的函数。
  • handleProtocols - 用于处理 WebSocket 子协议的函数。
  • clientTracking - 指定是否跟踪客户端。
  • perMessageDeflate - 启用/禁用 permessage-deflate。
  • maxPayload - 允许的最大消息大小(以字节为单位)。

有关更多信息,你可以查看 ws 选项文档。

注意:默认情况下,如果你不提供 server 选项,@fastify/websocket 将把你的 websocket 服务器实例绑定到作用域内的 fastify 实例。

注意:ws 的 path 选项不应提供,因为路由由 fastify 自身处理

注意:ws 的 noServer 选项不应提供,因为 @fastify/websocket 的目的是在 fastify 服务器上监听。如果你想要一个自定义服务器,你可以使用 server 选项,如果你想要更多控制,你可以直接使用 ws 库

ws 不允许你将 objectMode 或 writableObjectMode 设置为 true

完整的 TypeScript 示例

/* eslint-disable @typescript-eslint/no-unused-vars */
import Fastify from 'fastify'
import FastifyWebsocket, { WebSocket } from '@fastify/websocket'
import { FastifyRequest, FastifyReply } from 'fastify'

// 创建 Fastify 实例
const app = Fastify()

// 注册 @fastify/websocket 插件
app.register(FastifyWebsocket, {
  options: {
    maxPayload: 1048576, // 设置允许的最大消息大小为 1MB
    verifyClient: (info, done) => {
      // 示例:客户端验证逻辑
      if (info.req.headers['x-fastify-header'] === 'fastifyrocks') {
        done(true) // 验证通过
      } else {
        done(false) // 验证失败,连接被拒绝
      }
    },
    clientTracking: true, // 启用客户端跟踪
    perMessageDeflate: true // 启用 permessage-deflate 压缩
  },
  errorHandler: (error, connection, _req, _reply) => {
    // 错误处理逻辑
    console.error('WebSocket 错误:', error)
    connection.terminate() // 终止连接
  },
  preClose: (done) => {
    // 服务器关闭前的清理逻辑
    console.log('正在关闭 WebSocket 连接')
    done() // 完成清理操作
  }
})

// 添加认证钩子
app.addHook(
  'preValidation',
  async (request: FastifyRequest, _reply: FastifyReply) => {
    if (!request.headers['authorization']) {
      throw new Error('未授权') // 抛出未授权错误
    }
  }
)

// 定义 WebSocket 路由
app.get(
  '/ws',
  { websocket: true },
  (connection: WebSocket, _req: FastifyRequest) => {
    connection.on('message', (message) => {
      console.log('收到消息:', message.toString())
      connection.send('消息已接收') // 向客户端发送响应
    })

    connection.on('close', () => {
      console.log('WebSocket 已关闭')
    })
  }
)

// 启动服务器
app.listen({ port: 3000 }, (err, address) => {
  if (err) {
    app.log.error(err)
    process.exit(1) // 遇到错误,退出程序
  }
  console.log(`服务器正在监听地址 ${address}`)
})

关键点解释

  • 注册插件:通过 app.register(FastifyWebsocket, {...}) 注册 @fastify/websocket 插件,并传递配置选项。

  • 选项配置options 对象中包含了 ws 库的配置选项,如 maxPayload, verifyClient, clientTracking, 和 perMessageDeflate

    • maxPayload:定义了 WebSocket 消息的最大大小(以字节为单位)。这是一个重要的安全特性,用于防止恶意用户通过发送大量数据来尝试耗尽服务器资源。
    • verifyClient:这是一个函数,用于在 WebSocket 握手阶段验证连接客户端。它接收两个参数:infodoneinfo 对象包含了请求相关的信息,如请求头和请求源等,而 done 是一个回调函数,用于基于验证结果接受或拒绝请求。这个验证步骤是实现自定义认证逻辑的理想场所。
    • clientTracking:当设置为 true 时,WebSocket 服务器将会跟踪连接到服务器的客户端。这使得服务器能够保持对所有活动 WebSocket 连接的引用,便于管理这些连接,如广播消息。
    • perMessageDeflate:这个选项启用了 permessage-deflate 压缩,可以减少通过 WebSocket 发送的数据的大小。这对于减少带宽使用和提高传输效率非常有用,特别是在发送大量数据时。

    除了这些选项,@fastify/websocket 插件还提供了其他几个重要的配置点:

    • errorHandler:这是一个函数,用于处理 WebSocket 连接过程中的任何错误。这允许开发者自定义错误处理逻辑,例如记录错误或关闭出现问题的连接。
    • preClose:这是一个函数,在服务器即将关闭前调用,允许进行清理工作,如优雅地关闭 WebSocket 连接。
  • 错误处理errorHandler 函数用于处理 WebSocket 连接过程中的错误。

  • 认证钩子preValidation 钩子用于在建立 WebSocket 连接之前进行认证检查。

  • WebSocket 路由:通过 app.get('/ws', {websocket: true}, handler) 定义 WebSocket 路由。在路由处理函数中,可以使用 connection.socket 来发送和接收消息。

  • 启动服务器:通过 app.listen({ port: 3000 }, callback) 启动服务器。

这个示例展示了如何在 Fastify 应用中使用 TypeScript 来集成和配置 WebSocket 功能,包括如何处理客户端消息、执行认证和错误处理。