likes
comments
collection
share

ChartGPT应用的实时通讯技术EventSource前端实现

作者站长头像
站长
· 阅读数 17

背景简介

最近这几个月chartGPT充斥了每一位互联网人的耳朵,茶余饭后周围的人都在谈论它的强大之处,同时作为互联网人也时刻关注新鲜事物,尤其听到更多的是各行各业可能因为大模型的出现而被取代,所以我们就更应该关注它的进展和消息,第一时间去体验了一把。

真面目

其实它也没有那么神秘,只不过是一个拥有庞大数据量的一个服务器,据说是可以进行机器学习,这一点我暂时没有体会到,但是它的本质还是大模型搜索,所以我们互联网人暂时还是安全的,但是既然它是潮流,那我们就从技术的角度去研究一下他的实现;从用户输入问题,然后大模型收到消息后,从它庞大的数据库中查询有用的信息,然后通过SSE技术实现实时推送内容到前端展示。

实现

我们前面提到过,大模型实现实时推送的技术是EventSource,众所周知,长链接的实现方案常见的几种方式:

  1. WebSocket:WebSocket是一种通信协议,允许服务器和客户端之间进行双向、实时的通信。WebSocket在浏览器中非常受欢迎,因为它可以创建具有高度交互性的Web应用程序。你可以使用JavaScript的WebSocket API来创建客户端和服务器端代码。
  2. Server-Sent Events (SSE):这是一种服务器向客户端推送数据的技术。与WebSocket不同的是,SSE是单向的:只能从服务器向客户端发送数据。这使得SSE对于一些用例(例如,服务器向客户端推送实时数据)非常有用。SSE可以使用JavaScript的EventSource对象来访问。
  3. Polling:这是最简单也最古老的实时通信方法之一。客户端定期向服务器发送请求,以获取新数据。虽然这种方法可以在某些情况下工作,但它的效率很低,可能会导致大量的网络流量和服务器负载。
  4. Long-Polling:这是一种改进的轮询技术,客户端发送一个请求到服务器,如果服务器没有新的数据,则该请求保持打开状态,直到服务器有新数据为止。这种方法减少了不必要的网络流量,但仍然可能导致服务器负载过高。
  5. Ajax:通过在后台与服务器进行少量数据交换,Ajax可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下,对网页的某部分进行更新。Ajax可以使用JavaScript的XMLHttpRequest对象来访问。
  6. WebSockets 和 Server-Sent Events (SSE) 的结合:在一些情况下,你可能需要结合使用WebSockets和SSE。例如,你可以使用SSE来接收服务器推送的实时消息,然后使用WebSockets来向服务器发送响应。

那么为什么chartGPT选择的是第二种SSE而不是WebSocket? SSE的优势:

  • SSE是一种轻量级协议,相对简单。
  • SSE部署在HTTP协议之上,现有的服务器软件都支持SSE。
  • SSE默认支持断线重连。
  • SSE支持自定义发送的数据类型。
  • SSE单向通讯

WebSocket的优势:

  • WebSocket是全双工通道,可以双向通信,功能更强。
  • WebSocket使用持久连接,而SSE需要频繁地发起HTTP请求来获取数据。
  • WebSocket只需在握手阶段建立一次连接,然后保持连接打开。
  • WebSocket提供了更低的延迟和更高的实时性,因为它支持双向通信,可以立即将数据推送给客户端。

就大模型的场景来看,SSE比较适合他的场景,单向推送、实现简单, 所以最终选择SSE方案 那么我们前端如何封装一个通用的EventSource工具类呢?

//重试次数
const DEFAULT_RETRY_TIMES = 10;
class CCEventSource {
    //构造函数
    constructor(options) {
        this.eventSourceUrl = options && options.url;
        this.onmessage = options && options.onmessage;
        this.eventSource = null;
        this.retryTimes =
            options && options.retryTimes
                ? options.retryTimes
                : DEFAULT_RETRY_TIMES;
        this.#initEventSource(this.eventSourceUrl);
        this.currentTimer = 0;
        this.eventArray = [];
    }

    //初始化eventSource
    #initEventSource(url) {
        //兼容判断
        if ('EventSource' in window) {
            let that = this;

            //实例化EventSource
            this.eventSource = new EventSource(that.eventSourceUrl);

            //EventSource打开
            this.eventSource.onopen = function () {
                console.log('EventSource连接成功', that.eventSourceUrl);
            };

            //EventSource接收到新消息
            this.eventSource.onmessage = function (event) {
                // 连接成功后,重置重试次数的值
                this.currentTimer = 0;
                try {
                    if (event.data && typeof event.data === 'string') {
                        // let data = JSON.parse(JSON.parse(event.data));
                        let data = event.data;

                        //业务逻辑回调
                        if (typeof that.onmessage === 'function') {
                            that.onmessage(data);
                        }
                    }
                } catch (error) {
                    console.log('EventSource初始化异常', error);
                }
            };

            //EventSource关闭
            this.eventSource.onclose = function () {
                console.log('EventSource连接断开', that.eventSourceUrl);
            };

            //EventSource错误
            this.eventSource.onerror = function (error) {
                // 监听错误
                console.log('EventSource连接错误', error);
                that.currentTimer++;
                if (that.currentTimer > that.retryTimes) {
                    that.close();
                }
            };
        } else {
            throw new Error('浏览器不支持EventSource对象');
        }
    }

    addEventForESInstance(eventName, callback) {
        let es = this.eventSource;
        let eventArray = this.eventArray;
        if (es) {
            es.addEventListener(eventName, callback);
            this.eventArray = [...eventArray, { eventName, callback }];
        }
    }
    removeEventForESInstance(eventName) {
        let es = this.eventSource;
        let eventArray = this.eventArray;
        if (es && eventArray.length) {
            let eventArrayWaitRemove = eventArray.filter(
                (item) => item.eventName === eventName,
            );
            if (eventArrayWaitRemove.length) {
                eventArrayWaitRemove.forEach((eventItem) => {
                    es.removeEventListener(
                        eventItem.eventName,
                        eventItem.callback,
                    );
                });
            }
            this.eventArray = eventArray.filter(
                (item) => item.eventName !== eventName,
            );
        }
    }

    //关闭eventSource
    close() {
        this.eventSource.close();
        this.eventSourceUrl = '';
        this.eventSource = null;
        this.onmessage = null;
        this.currentTimer = 0;
    }
}

export default CCEventSource;

调用的组件中实现

const beginGreaterHandle = () => {
        Ajax.getAuthUrl(sdkOptionsMethods, 'xxx')
            .then((res) => {
                console.log(res);
                const es = new CCEventSource({
                    url: res,
                    retryTimes: 4, // 设置重试次数
                });
                es.addEventForESInstance('message', (e) => {
                    setDialogLoading(false);
                    if (e.data === 'END') {
                        es.close();
                    } else {
                        setDialogData(e.data);
                    }
                });
            });
    };

这里面尤其注意一点,后端发消息结束后,一定要发一个END事件,因为只有前端接收到END事件后,才会主动的断开链接,否则,后端发送结束后,断开连接,但是前端不知道,那么EventSource会有重连机制,所以会再次连接,那么又会重新发送,这一点尤其注意。