Fastify+TS实现基础IM服务(四)@fastify/websocket入门
本文主要内容均为@fastifywebsocket入门内容,这部分可以直接去看NPM上的@fastify/websocket #readme
不同的是本文的代码示例都是TS的,如果已经有了这部分基础可以直接跳到改造js+ws聊天室代码部分
Fastify 的 WebSocket 支持,基于 ws@8 构建。
安装
npm i @fastify/websocket
# 或者
yarn add @fastify/websocket
如果你是 TypeScript 用户,这个包内置了自己的 TypeScript 类型,但你还需要安装 ws 包的类型:
npm i @types/ws -D
# 或者
yarn add -D @types/ws
使用方法
注册这个插件后,你可以选择 WS 服务器响应哪些路由。这可以通过在 fastify 的 .get
路由上添加 websocket: true
属性到 routeOptions
来实现。在这种情况下,两个参数将被传递给处理程序,socket 连接和 fastify 请求对象:
import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';
const app = fastify();
app.register(fastifyWebsocket);
app.register(async function (fastify) {
fastify.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});
});
app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});
在这种情况下,它将在每个未注册的路由上响应一个 404 错误,关闭传入的升级连接请求。
然而,你仍然可以定义一个通配符路由,它将被用作默认处理程序:
import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';
const app = fastify();
app.register(fastifyWebsocket, {
options: { maxPayload: 1048576 }
});
app.register(async function (fastify) {
fastify.get('/*', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from wildcard route');
});
});
fastify.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});
});
app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});
附加事件处理程序
重要的是,WebSocket 路由处理程序在处理程序执行期间同步附加事件处理程序,以避免意外丢弃消息。如果你想在你的 WebSocket 处理程序中进行任何异步工作,比如认证用户或从数据存储加载数据,请确保在触发这个异步工作之前附加任何 on('message') 处理程序。否则,消息可能会在这个异步工作进行时到达,如果没有处理程序监听这个数据,它将被默默丢弃。
下面是一个例子,展示了如何在仍然访问异步资源的同时同步附加消息处理程序。我们将一个异步事情的 promise 存储在一个局部变量中,同步附加消息处理程序,然后使消息处理程序本身异步,以获取异步数据并进行一些处理:
app.get('/*', { websocket: true }, (connection, request) => {
const sessionPromise = request.getSession(); // 示例异步会话获取器,同步调用以返回一个 promise
connection.socket.on('message', async (message) => {
const session = await sessionPromise;
// 使用消息和会话做一些事情
});
});
使用钩子
使用 @fastify/websocket 注册的路由遵循 Fastify 插件封装上下文,因此将运行已注册的任何钩子。这意味着你可能用于认证或错误处理的普通 HTTP 处理程序的相同路由钩子也适用于 websocket 处理程序。
app.addHook('preValidation', async (request, reply) => {
// 检查请求是否已认证
if (!request.isAuthenticated()) {
await reply.code(401).send("未认证");
}
});
app.get('/', { websocket: true }, (connection, req) => {
// 仅为认证的传入请求打开连接
connection.socket.on('message', message => {
// ...
});
});
自定义错误处理程序
你可以选择性地提供一个自定义 errorHandler
,它将用于处理已建立的 websocket 连接的任何清理工作。如果在建立连接后你的 websocket 路由处理程序抛出任何错误,将调用 errorHandler
。
import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';
const app = fastify();
app.register(fastifyWebsocket, {
errorHandler: function (error, connection, req, reply) {
// 做些事情
// 销毁/关闭连接
connection.socket.terminate();
},
options: {
maxPayload: 1048576, // 我们将允许的最大消息大小设置为 1 MiB(1024 字节 * 1024 字节)
verifyClient: function (info, next) {
if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
return next(false); // 连接不被允许
}
next(true); // 连接被允许
}
}
});
app.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});
app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});
自定义 preClose 钩子
默认情况下,当服务器关闭时,所有 ws 连接都将关闭。如果你希望修改这种行为,你可以传递你自己的 preClose 函数。
import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';
const app = fastify();
app.register(fastifyWebsocket, {
preClose: (done) => { // 注意:也可以使用 async 风格,不使用 done-callback
const server = this.websocketServer;
for (const socket of server.clients) {
socket.close(1001, 'WS 服务器以自定义方式离线,发送代码 + 消息');
}
server.close(done);
}
});
测试
测试 ws 处理程序可能相当棘手,幸运的是 fastify-websocket 为 fastify 实例装饰了 injectWS。它允许轻松测试 websocket 端点。
import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';
import ws from 'ws';
const app = Fastify();
await app.register(FastifyWebSocket);
app.get('/', { websocket: true }, (connection, req) => {
const stream = ws.createWebSocketStream(connection.socket, { /* 选项 */ });
stream.setEncoding('utf8');
stream.write('hello client');
stream.on('data', function (data) {
// 确保设置了数据处理程序或以其他方式读取所有传入的数据,否则流背压将导致底层 WebSocket 对象被暂停。
});
});
await app.listen({ port: 3000 });
App.js
import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';
const App = Fastify();
App.register(FastifyWebSocket);
App.register(async function(fastify) {
fastify.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send();
}
});
fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server');
});
});
});
export default App;
App.test.js
import { test } from 'tap';
import Fastify from 'fastify';
import App from './app';
test('connect to /', async (t) => {
t.plan(1);
const fastify = Fastify();
fastify.register(App);
t.teardown(fastify.close.bind(fastify));
const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }});
let resolve;
const promise = new Promise(r => { resolve = r; });
ws.on('message', (data) => {
resolve(data.toString());
});
ws.send('hi from client');
t.assert(await promise, 'hi from server');
// 记得最后关闭 ws
ws.terminate();
});
注意!!!
- 测试结束时需要手动关闭 Websocket。
- 需要等待 fastify.ready() 以确保 fastify 已被装饰。
- 如果你需要处理服务器响应,需要在发送消息之前注册事件监听器。
选项
@fastify/websocket 接受以下 ws 的选项:
- host - 绑定服务器的主机名。
- port - 绑定服务器的端口。
- backlog - 等待连接的队列的最大长度。
- server - 一个预创建的 Node.js HTTP/S 服务器。
- verifyClient - 用于验证传入连接的函数。
- handleProtocols - 用于处理 WebSocket 子协议的函数。
- clientTracking - 指定是否跟踪客户端。
- perMessageDeflate - 启用/禁用 permessage-deflate。
- maxPayload - 允许的最大消息大小(以字节为单位)。
有关更多信息,你可以查看 ws 选项文档。
注意:默认情况下,如果你不提供 server 选项,@fastify/websocket 将把你的 websocket 服务器实例绑定到作用域内的 fastify 实例。
注意:ws 的 path 选项不应提供,因为路由由 fastify 自身处理
注意:ws 的 noServer 选项不应提供,因为 @fastify/websocket 的目的是在 fastify 服务器上监听。如果你想要一个自定义服务器,你可以使用 server 选项,如果你想要更多控制,你可以直接使用 ws 库
ws 不允许你将 objectMode 或 writableObjectMode 设置为 true
完整的 TypeScript 示例
/* eslint-disable @typescript-eslint/no-unused-vars */
import Fastify from 'fastify'
import FastifyWebsocket, { WebSocket } from '@fastify/websocket'
import { FastifyRequest, FastifyReply } from 'fastify'
// 创建 Fastify 实例
const app = Fastify()
// 注册 @fastify/websocket 插件
app.register(FastifyWebsocket, {
options: {
maxPayload: 1048576, // 设置允许的最大消息大小为 1MB
verifyClient: (info, done) => {
// 示例:客户端验证逻辑
if (info.req.headers['x-fastify-header'] === 'fastifyrocks') {
done(true) // 验证通过
} else {
done(false) // 验证失败,连接被拒绝
}
},
clientTracking: true, // 启用客户端跟踪
perMessageDeflate: true // 启用 permessage-deflate 压缩
},
errorHandler: (error, connection, _req, _reply) => {
// 错误处理逻辑
console.error('WebSocket 错误:', error)
connection.terminate() // 终止连接
},
preClose: (done) => {
// 服务器关闭前的清理逻辑
console.log('正在关闭 WebSocket 连接')
done() // 完成清理操作
}
})
// 添加认证钩子
app.addHook(
'preValidation',
async (request: FastifyRequest, _reply: FastifyReply) => {
if (!request.headers['authorization']) {
throw new Error('未授权') // 抛出未授权错误
}
}
)
// 定义 WebSocket 路由
app.get(
'/ws',
{ websocket: true },
(connection: WebSocket, _req: FastifyRequest) => {
connection.on('message', (message) => {
console.log('收到消息:', message.toString())
connection.send('消息已接收') // 向客户端发送响应
})
connection.on('close', () => {
console.log('WebSocket 已关闭')
})
}
)
// 启动服务器
app.listen({ port: 3000 }, (err, address) => {
if (err) {
app.log.error(err)
process.exit(1) // 遇到错误,退出程序
}
console.log(`服务器正在监听地址 ${address}`)
})
关键点解释
-
注册插件:通过
app.register(FastifyWebsocket, {...})
注册@fastify/websocket
插件,并传递配置选项。 -
选项配置:
options
对象中包含了ws
库的配置选项,如maxPayload
,verifyClient
,clientTracking
, 和perMessageDeflate
。maxPayload
:定义了 WebSocket 消息的最大大小(以字节为单位)。这是一个重要的安全特性,用于防止恶意用户通过发送大量数据来尝试耗尽服务器资源。verifyClient
:这是一个函数,用于在 WebSocket 握手阶段验证连接客户端。它接收两个参数:info
和done
。info
对象包含了请求相关的信息,如请求头和请求源等,而done
是一个回调函数,用于基于验证结果接受或拒绝请求。这个验证步骤是实现自定义认证逻辑的理想场所。clientTracking
:当设置为true
时,WebSocket 服务器将会跟踪连接到服务器的客户端。这使得服务器能够保持对所有活动 WebSocket 连接的引用,便于管理这些连接,如广播消息。perMessageDeflate
:这个选项启用了 permessage-deflate 压缩,可以减少通过 WebSocket 发送的数据的大小。这对于减少带宽使用和提高传输效率非常有用,特别是在发送大量数据时。
除了这些选项,
@fastify/websocket
插件还提供了其他几个重要的配置点:errorHandler
:这是一个函数,用于处理 WebSocket 连接过程中的任何错误。这允许开发者自定义错误处理逻辑,例如记录错误或关闭出现问题的连接。preClose
:这是一个函数,在服务器即将关闭前调用,允许进行清理工作,如优雅地关闭 WebSocket 连接。
-
错误处理:
errorHandler
函数用于处理 WebSocket 连接过程中的错误。 -
认证钩子:
preValidation
钩子用于在建立 WebSocket 连接之前进行认证检查。 -
WebSocket 路由:通过
app.get('/ws', {websocket: true}, handler)
定义 WebSocket 路由。在路由处理函数中,可以使用connection.socket
来发送和接收消息。 -
启动服务器:通过
app.listen({ port: 3000 }, callback)
启动服务器。
这个示例展示了如何在 Fastify 应用中使用 TypeScript 来集成和配置 WebSocket 功能,包括如何处理客户端消息、执行认证和错误处理。
转载自:https://juejin.cn/post/7357909187953623081