基于SSE的服务集群状态和数据同步
笔者前面已经有一篇博文讨论了SSE的技术原理和简单的实现,现在正好有一个机会,需要编写一个服务集群的状态和数据同步的实现机制,想到可以使用SSE来尝试一下。遂有了本文,作为前作的扩展和更深入的实践和讨论。
关于SSE
SSE意为Server Side Event,它是一个HTTP协议的标准功能。它可以在HTTP连接建立并保持之后,可以由服务器发起并向客户端发送数据的技术。它同样可以实现服务器到客户端的消息发送,实现一定的业务需求。但严格而言,它并不是完全体的双工通信,因为客户端只有一次机会在连接建立的时候,向服务器发送参数和信息,然后都是服务器向客户端的单向数据传输了。但它有一个很大的优势是简单和代价低廉,适合于一些特别的使用场景(chatGPT的推理响应?)。
SSE的原理其实不复杂,就以下几个要点,分为服务器和客户端两个方面。
在客户端方面:
- 可以使用普通的HTTP GET请求来建立连接,包括认证头
- 客户端要点在于在连接后,接受和处理服务器发送来的信息流
- 连接建立之后,确认服务端响应的类型是 event-stream,同时连接是持续的
- 服务器会发送特殊格式的信息,需要按照要求进行处理
- 完整的客户端实现,应当具备重新连接机制
在服务器方面:
- 连接建立的时候,服务器的响应头信息中应当包括一些特殊内容,包括
- Connection: keep-alive, 声明当前连接是持续的
- Cache-Control: no-cache, 不使用缓存
- Content-Type: text/event-stream, 声明响应内容类型为 event-stream
- 服务器后续发送格式化的信息(消息),标准格式为: [field]: value\n\n
- 可选field包括: data, event, id, retry等
- 还有一种特殊的消息,使用: 开头,表示注释,无实际意义,可作为心跳信号
为了方便开发和使用,主流的浏览器系统,基于上述的客户端原理,封装了一个EventSource对象,来使用SSE。但实际上,SSE是完全标准的HTTP协议的一部分,没有什么特别的地方,完全可以使用标准的HTTP客户端来进行操作。EventSource主要封装的内容,就是重试机制,和服务端消息的格式处理。所以,即使浏览器本身不支持EventSource这会功能特性,但要使用标准JS程序来实现一个SSE客户端,并不是特别困难的事情。
SSE的基本原理和过程如下图所示:
需求和实现要点
关于这个应用的需求是这样的。笔者有一个系统,后端有几台应用服务器作为横向扩展的机制。业务的需求需要在这些服务器之间,共享一些状态和数据。例如,有一个系统配置进行了改变,需要有一个方式,能够将这个变更同步到所有的服务器系统之中。
由于系统相对不是那么复杂,加之有其他一些业务操作的考虑,笔者并没有使用像Redis这种集中的数据存储和共享的系统,而是自行实现了一个简单的“数据服务”系统,有一些业务操作在此服务系统上统一执行,同时这个系统也负担整个服务集群的数据和状态同步工作。
由于考虑数据同步的及时性,这个系统当前是基于WebSocket技术实现的。就是数据服务系统同时提供HTTP服务和WebSocket服务。应用服务器启动时,会连接这个WebSocket服务;然后后续如果有数据和状态变化,数据服务会通过这个WS通道,广播和发送新的数据,各个业务服务器收到新数据后进行相关的状态和配置更新处理。
由于Nodejs并没有提供原生的WebSocket模块,现有的程序,是基于Socket.IO这个第三方模块来实现的。当然确实,这个Socket.IO的功能非常完善和强大,但考虑到我们的应用场景其实比较简单,使用如此重型复杂的技术,有点“杀鸡用牛刀”的感觉。
所以,基于架构简化和运行效率的考量,针对当前的这个业务应用的需求和特点,笔者觉得,比较适合使用SSE技术来进行实现和改进。
示例和参考代码
基于上面的分析和考虑,笔者编写了相关的实现代码和原型程序,为了方便讨论,笔者将其编写在了同一个JS文件当中,在真正的应用系统中,服务端和客户端显然是分开部署和运行的。
参考代码如下:
const
EventEmitter = require('node:events'),
http = require("http"),
HOST = "127.0.0.1",
PORT = 8089,
EVHEAD = "data: ", // event data
EVEND = "\n\n", // event end
SSEHEAD = {
"Access-Control-Allow-Origin": "*",
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
};;
// emitter
class MyEmitter extends EventEmitter {};
const nemit = new MyEmitter();
// client list
let CLIST = [];
nemit.on('notify', (noti) => {
console.log("ClientCount:",CLIST.length, CLIST);
CLIST.forEach((res,idx)=>{
if (res) {
if (res.destroyed) {
CLIST[idx] = null;
} else {
res.write( EVHEAD + "to " + res.clientid +", message: " + noti + EVEND );
}
}
});
});
// web handle
const handle = (req,res)=>{
console.log("reqest:",[req.socket.remoteAddress,req.socket.remotePort,req.localAddress,req.socket.localPort].join(":"),req.url);
// client info
let
data = [],
[prefix,action,client] = req.url.split("/");
if (action == "subscript") {
// store clientid add to list
res.writeHead(200, SSEHEAD);
res.clientid = client;
CLIST.push(res);
} else if (action == "notify") {
req
.on('data', (chunk) => data.push(chunk))
.on('end', () => {
let noti = Buffer.concat(data).toString();
nemit.emit('notify', noti);
res.writeHead(200);
res.end("OK");
});
} else {
res.writeHead(500);
res.end("ErrRequest");
};
req.on('close', () => {
req.destroy();
// console.log('Client disconnected');
});
};
const startWeb=()=>{
http
.createServer(handle)
.listen(PORT, HOST, null,()=>{
console.log("Service Started", HOST, PORT);
});
// setInterval(chkList,2000);
}; startWeb();
const clientConn = (cid, abort)=>{
console.log("Client Connecting...");
const req = http
.get(`http://${HOST}:${PORT}/subscript/`+cid,res=>{
res.setEncoding('utf8');
res
.on('data', (chunk) => console.log("Get:", chunk.trim()))
.on('end', () => {
console.log("Client End");
process.exit(0);
});
})
.on("error",e=>{
});
// auto abort just for test
if (abort) {
setTimeout(()=>{ req.destroy() }, abort);
}
};
// do notify by post data
const doNotify = ()=>{
const postData = JSON.stringify({
T: Date.now(),
D: 0|Math.random()*100
});
const req = http.request({
host: HOST,
port: PORT,
path: "/notify",
method: "POST",
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
}
},(res) => {
const data = [];
res
.on('data', (chunk) => data.push(chunk))
.on('end', () => {
console.log("Notify Post:", Buffer.concat(data).toString());
});
});
req.on("error", err=>{
console.error(err);
});
req.write(postData);
req.end();
// random notify
setTimeout(doNotify, 2000 + 5000 *Math.random());
}
// multi client
clientConn("c1");
clientConn("c2");
clientConn("c3", 3000);
setTimeout( doNotify ,3000);
简单说明一下:
- 程序启动后,会创建一个HTTP Web服务,并处理两个路径的请求
- /subscript 是客户端SSE连接的路径
- /notify 是发起消息通知的路径,任何post到这个路径上的消息包括数据,都会被转发到已经订阅的SSE客户端系统上
- HTTP使用一个列表,来维护SSE客户端
- HTTP收到订阅请求后(HTTP客户端访问subscript路径),使用SSE的设置响应头
- SSE客户端,就是简单的HTTP客户端,但由于订阅时,服务器会keep-live连接,所以不会马上中断连接
- SSE客户端使用常规的onData事件,来处理服务器发来的event-stream数据
- 可以通过/subscript后续的路径,为连接客户端设置标识,基于此标识,服务器可以给特定客户端发送信息
- 支持多个SSE客户端的连接和处理
- 可以清理已中断的连接
扩展思考和问题
笔者觉得,当前这个实现方式,和以前的基于Socket.IO的技术实现相比,具有以下的优势和特点:
- 无附加的npm依赖,代码很轻量,方便移植、改造、升级和业务适配
- 业务逻辑和实现过程更加简单清晰
- SSE实现的处理和网络性能略高
- 比较适合简单、小规模的部署环境
- 通过改进程序,可以处理如连接中断,连接安全等问题
小结
本文探讨了使用SSE技术实现的一个HTTP服务集群系统的状态和数据同步机制,从讨论Server Side Event技术的基本原理和过程开始,到相关的业务和应用需求分析,以及具体的代码实现和相关的问题和思考。
转载自:https://juejin.cn/post/7380222254195703820