likes
comments
collection
share

基于SSE的服务集群状态和数据同步

作者站长头像
站长
· 阅读数 44

笔者前面已经有一篇博文讨论了SSE的技术原理和简单的实现,现在正好有一个机会,需要编写一个服务集群的状态和数据同步的实现机制,想到可以使用SSE来尝试一下。遂有了本文,作为前作的扩展和更深入的实践和讨论。

关于SSE

SSE意为Server Side Event,它是一个HTTP协议的标准功能。它可以在HTTP连接建立并保持之后,可以由服务器发起并向客户端发送数据的技术。它同样可以实现服务器到客户端的消息发送,实现一定的业务需求。但严格而言,它并不是完全体的双工通信,因为客户端只有一次机会在连接建立的时候,向服务器发送参数和信息,然后都是服务器向客户端的单向数据传输了。但它有一个很大的优势是简单和代价低廉,适合于一些特别的使用场景(chatGPT的推理响应?)。

SSE的原理其实不复杂,就以下几个要点,分为服务器和客户端两个方面。

在客户端方面:

  • 可以使用普通的HTTP GET请求来建立连接,包括认证头
  • 客户端要点在于在连接后,接受和处理服务器发送来的信息流
  • 连接建立之后,确认服务端响应的类型是 event-stream,同时连接是持续的
  • 服务器会发送特殊格式的信息,需要按照要求进行处理
  • 完整的客户端实现,应当具备重新连接机制

在服务器方面:

  • 连接建立的时候,服务器的响应头信息中应当包括一些特殊内容,包括
  • Connection: keep-alive, 声明当前连接是持续的
  • Cache-Control: no-cache, 不使用缓存
  • Content-Type: text/event-stream, 声明响应内容类型为 event-stream
  • 服务器后续发送格式化的信息(消息),标准格式为: [field]: value\n\n
  • 可选field包括: data, event, id, retry等
  • 还有一种特殊的消息,使用: 开头,表示注释,无实际意义,可作为心跳信号

为了方便开发和使用,主流的浏览器系统,基于上述的客户端原理,封装了一个EventSource对象,来使用SSE。但实际上,SSE是完全标准的HTTP协议的一部分,没有什么特别的地方,完全可以使用标准的HTTP客户端来进行操作。EventSource主要封装的内容,就是重试机制,和服务端消息的格式处理。所以,即使浏览器本身不支持EventSource这会功能特性,但要使用标准JS程序来实现一个SSE客户端,并不是特别困难的事情。

SSE的基本原理和过程如下图所示:

基于SSE的服务集群状态和数据同步

需求和实现要点

关于这个应用的需求是这样的。笔者有一个系统,后端有几台应用服务器作为横向扩展的机制。业务的需求需要在这些服务器之间,共享一些状态和数据。例如,有一个系统配置进行了改变,需要有一个方式,能够将这个变更同步到所有的服务器系统之中。

由于系统相对不是那么复杂,加之有其他一些业务操作的考虑,笔者并没有使用像Redis这种集中的数据存储和共享的系统,而是自行实现了一个简单的“数据服务”系统,有一些业务操作在此服务系统上统一执行,同时这个系统也负担整个服务集群的数据和状态同步工作。

由于考虑数据同步的及时性,这个系统当前是基于WebSocket技术实现的。就是数据服务系统同时提供HTTP服务和WebSocket服务。应用服务器启动时,会连接这个WebSocket服务;然后后续如果有数据和状态变化,数据服务会通过这个WS通道,广播和发送新的数据,各个业务服务器收到新数据后进行相关的状态和配置更新处理。

由于Nodejs并没有提供原生的WebSocket模块,现有的程序,是基于Socket.IO这个第三方模块来实现的。当然确实,这个Socket.IO的功能非常完善和强大,但考虑到我们的应用场景其实比较简单,使用如此重型复杂的技术,有点“杀鸡用牛刀”的感觉。

所以,基于架构简化和运行效率的考量,针对当前的这个业务应用的需求和特点,笔者觉得,比较适合使用SSE技术来进行实现和改进。

示例和参考代码

基于上面的分析和考虑,笔者编写了相关的实现代码和原型程序,为了方便讨论,笔者将其编写在了同一个JS文件当中,在真正的应用系统中,服务端和客户端显然是分开部署和运行的。

参考代码如下:

const 
EventEmitter = require('node:events'),
http = require("http"),
HOST = "127.0.0.1",
PORT = 8089,
EVHEAD  = "data: ", // event data
EVEND   = "\n\n", // event end 
SSEHEAD = {
    "Access-Control-Allow-Origin": "*",
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
};;

// emitter
class MyEmitter extends EventEmitter {};
const nemit = new MyEmitter();

// client list 
let CLIST = [];

nemit.on('notify', (noti) => {
    console.log("ClientCount:",CLIST.length, CLIST);
    CLIST.forEach((res,idx)=>{
        if (res) {
            if (res.destroyed) {
                CLIST[idx] = null;
            } else {
                res.write( EVHEAD + "to " + res.clientid +", message: " + noti + EVEND );
            }
        }
    });
});

// web handle
const handle = (req,res)=>{
    console.log("reqest:",[req.socket.remoteAddress,req.socket.remotePort,req.localAddress,req.socket.localPort].join(":"),req.url);

    // client info
    let
    data = [], 
    [prefix,action,client] = req.url.split("/");

    if (action == "subscript") {
        // store clientid add to list
        res.writeHead(200, SSEHEAD);
        res.clientid = client;
        CLIST.push(res);
    } else if (action == "notify") {
        req
        .on('data', (chunk) => data.push(chunk))
        .on('end', () => {
            let noti = Buffer.concat(data).toString();
            nemit.emit('notify', noti);

            res.writeHead(200);
            res.end("OK");  
        });
    } else {
        res.writeHead(500);
        res.end("ErrRequest");
    };

    req.on('close', () => {
        req.destroy();
        // console.log('Client disconnected');
    });
};

const startWeb=()=>{
    http
    .createServer(handle)
    .listen(PORT, HOST, null,()=>{
        console.log("Service Started", HOST, PORT);
    });

    // setInterval(chkList,2000);
}; startWeb();

const clientConn = (cid, abort)=>{
    console.log("Client Connecting...");

    const req = http
    .get(`http://${HOST}:${PORT}/subscript/`+cid,res=>{
        res.setEncoding('utf8');
        res
        .on('data', (chunk) => console.log("Get:", chunk.trim()))
        .on('end', () => {
            console.log("Client End");
            process.exit(0);
        });
    })
    .on("error",e=>{

    });

    // auto abort just for test 
    if (abort) { 
        setTimeout(()=>{ req.destroy() }, abort);
    }
}; 

// do notify by post data 
const doNotify = ()=>{
    const postData = JSON.stringify({
        T: Date.now(),
        D: 0|Math.random()*100
    }); 

    const req = http.request({
        host: HOST,
        port: PORT,
        path: "/notify",
        method: "POST",
        headers: {
            'Content-Type': 'application/json',
            'Content-Length': Buffer.byteLength(postData)
        }
    },(res) => {
        const data = [];

        res
        .on('data', (chunk) => data.push(chunk))
        .on('end', () => {
          console.log("Notify Post:", Buffer.concat(data).toString());
        });
    });
    
    req.on("error", err=>{
        console.error(err);
    });
    req.write(postData);
    req.end();

    // random notify 
    setTimeout(doNotify, 2000 + 5000 *Math.random());
}

// multi client
clientConn("c1");
clientConn("c2");
clientConn("c3", 3000);

setTimeout( doNotify ,3000);


简单说明一下:

  • 程序启动后,会创建一个HTTP Web服务,并处理两个路径的请求
  • /subscript 是客户端SSE连接的路径
  • /notify 是发起消息通知的路径,任何post到这个路径上的消息包括数据,都会被转发到已经订阅的SSE客户端系统上
  • HTTP使用一个列表,来维护SSE客户端
  • HTTP收到订阅请求后(HTTP客户端访问subscript路径),使用SSE的设置响应头
  • SSE客户端,就是简单的HTTP客户端,但由于订阅时,服务器会keep-live连接,所以不会马上中断连接
  • SSE客户端使用常规的onData事件,来处理服务器发来的event-stream数据
  • 可以通过/subscript后续的路径,为连接客户端设置标识,基于此标识,服务器可以给特定客户端发送信息
  • 支持多个SSE客户端的连接和处理
  • 可以清理已中断的连接

扩展思考和问题

笔者觉得,当前这个实现方式,和以前的基于Socket.IO的技术实现相比,具有以下的优势和特点:

  • 无附加的npm依赖,代码很轻量,方便移植、改造、升级和业务适配
  • 业务逻辑和实现过程更加简单清晰
  • SSE实现的处理和网络性能略高
  • 比较适合简单、小规模的部署环境
  • 通过改进程序,可以处理如连接中断,连接安全等问题

小结

本文探讨了使用SSE技术实现的一个HTTP服务集群系统的状态和数据同步机制,从讨论Server Side Event技术的基本原理和过程开始,到相关的业务和应用需求分析,以及具体的代码实现和相关的问题和思考。

转载自:https://juejin.cn/post/7380222254195703820
评论
请登录