likes
comments
collection
share

"SpringBoot+SSE:构建实时通信网络的实践"

作者站长头像
站长
· 阅读数 27

场景+解决方案

近期因为想对接 ChatGpt,加上在公司项目中,也有用到服务器向前端实时推送消息的场景。如果使用常规的 Http 请求方式,页面等待时间比较久,在用户体验上非常差劲,所以研究了使用消息推送相关的技术,今天写一下自己的心得和总结。

"SpringBoot+SSE:构建实时通信网络的实践"

什么是 SSE

Server-Sent Events(SSE)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。

SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。

SSE的主要特点包括:

简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单。

单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据。

实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

SSE与WebSocket的比较

了解 WebSocket 的同学应该知道它是一种较早就存在的用于实现实时双向通信的Web技术,服务端与客户端可以长链接互发消息。那么为什么有了 webscoket 还要搞出一个 sse 呢?既然存在,必然有着它的优越之处。

SSEwebsocket
基于 http 协议websocket 协议
轻量,使用简单相对复杂
默认支持断线重连需要自己实现断线重连
文本传输二级制传输
提供方法支持自定义的消息类型
一对一推送事件的场景适用于需要实时双向交互的场景
比较新传统

云端源想编程学习,视频课程,知识点讲解,在线编程,小型项目实战+企业级项目实战,论坛交流,学习交友两不误。点击查看。在学习中如果有什么不懂的还可以咨询在线老师,即时通讯,拒绝留言式沟通。都是免费的哦!

调研发现,有两种实时通信技术供我选择:websocket 和 SSE。进一步学习了解后知道, SSE是基于http协议,无需导入其他依赖,特点是服务端主动单向的给客户端推送消息,适合浏览器端只做数据接收。而websocket特点是客户端和服务端实现双工通信(双向) ,多用于即时通信。基于我们项目的特点,我选择了Sse。而且springboot还整合了sse类名SseEmitter,使用简单方便,服务端推送消息我们采用SSE方式进行推送。

"SpringBoot+SSE:构建实时通信网络的实践"

进行SSE实时数据推送时的注意点

  1. 异步处理:由于SSE是基于长连接的机制,推送数据的过程是一个长时间的操作。为了不阻塞服务器线程,推荐使用异步方式处理SSE请求。您可以在控制器方法中使用@Async注解或使用CompletableFuture等异步编程方式。
  2. 超时处理:SSE连接可能会因为网络中断、客户端关闭等原因而发生超时。为了避免无效的连接一直保持在服务器端,您可以设置超时时间并处理连接超时的情况。可以使用SseEmitter对象的setTimeout()方法设置超时时间,并通过onTimeout()方法处理连接超时的逻辑。
  3. 异常处理:在实际应用中,可能会出现一些异常情况,如网络异常、推送数据失败等。您可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
  4. 内存管理:使用SseEmitter时需要注意内存管理,特别是在大量并发连接的情况下。当客户端断开连接时,务必及时释放SseEmitter对象,避免造成资源泄漏和内存溢出。
  5. 并发性能:SSE的并发连接数可能会对服务器的性能造成影响。如果需要处理大量的并发连接,可以考虑使用线程池或其他异步处理方式,以充分利用服务器资源。
  6. 客户端兼容性:虽然大多数现代浏览器都支持SSE,但仍然有一些旧版本的浏览器不支持。在使用SSE时,要确保您的目标客户端支持SSE,或者提供备用的实时数据推送机制。

这些注意点将有助于我们正确和高效地使用SseEmitter进行SSE实时数据推送。根据具体的应用需求和实际情况进行调整和优化。

在实际生产应用中,在处理SSE连接时,您可以进行适当的限流和安全控制,以防止滥用和恶意连接的出现。以确保我们服务器的稳定性、安全性和性能。

"SpringBoot+SSE:构建实时通信网络的实践"

在Spring Boot中使用SSE的场景案例

这里我们模拟使用 AI 聊天来SSE的数据推送,为了方便实验,ai 的返回数据会用一个 ArrayList 代替,不是真实去访问接口。

开发环境版本说明:

    springboot 2.7.8
    jdk8
    maven3.6.3
    swagger3.0.0

先看代码结构图,后面的代码放到对应的位置

springboot的初始化,我们在这里不做过多赘述。为了照顾所有人,后面出一片文章,专门给大家介绍springboot框架的初始化,请关注!框架中的TestController是我写的一个测试文件,可以不用去管 "SpringBoot+SSE:构建实时通信网络的实践"

引入maven依赖

SseEmitter 包含在 spring-webmvc 包中,如果是 spring boot 项目,确定已经引入了如下依赖即可;但是为了方便大家查看,本次还整合了 swagger 接口文档,

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- springfox-boot-starter -->
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-boot-starter</artifactId>
                <version>3.0.0</version>
            </dependency>
            <!-- springfox-swagger-ui -->
            <dependency>
                <groupId>com.github.xiaoymin</groupId>
                <artifactId>knife4j-spring-boot-starter</artifactId>
                <version>3.0.3</version>
            </dependency>

配置 SwaggerConfig

    @Configuration
    @EnableOpenApi
    @EnableKnife4j
    public class SwaggerConfig implements WebMvcConfigurer {
    
        /**
         * Docket类是Swagger的配置类,要自定义修改 Swagger 的默认配置信息,我们需要覆盖该对象
         *
         */
        @Bean
        public Docket docket(){
            //1.以OAS_30标准构建Docket配置类
            return new Docket(DocumentationType.OAS_30)
                    //2.配置Swagger接口文档基本信息apiInfo
                    .apiInfo(apiInfo())
                    //3.select方法开启配置扫描接口的Builder
                    .select()
                    //4.指定要扫描/维护接口文档的包(否则就全部扫描)
                    .apis(RequestHandlerSelectors.basePackage("ydcode.controller"))
                    //5.路径过滤:该Docket-UI展示时,只展示指定路径下的接口文档(any表示都展示)
                    .paths(PathSelectors.any())
                    .build();
        }
    
        /**
         * 配置 Swagger 接口文档的基本信息
         */
        private ApiInfo apiInfo(){
            return new ApiInfoBuilder()
                    //接口文档标题
                    .title("云端源想接口文档")
                    //接口文档描述内容
                    .description("云端源想")
                    //项目文档迭代版本
                    .version("1.0")
                    //主要联系人信息(姓名name,个人主页url,邮箱email)
                    .contact(new Contact("云端源想接口文档","", ""))
                    //lisence
                    .license("云端源想")
                    //接口服务地址 根据自己的地址填写
    //                .termsOfServiceUrl("http://localhost:端口号/swagger-ui/index.html")
                    .termsOfServiceUrl("http://localhost:端口号/doc.html")
                    //返回构建的ApiInfo对象
                    .build();
        }
    
    }

"SpringBoot+SSE:构建实时通信网络的实践"

使用 SseEmitter

  1. Controller 接口代码如下,先同步返回一个建立的 SseEmitter 连接给客户端,然后在异步线程中进行数据推送。为了防止串流以及后续支持客户端主动停止推流,每次请求携带唯一的客户端id。
    @Api(value = "sse", tags = {"sse接口文档"})
    @RestController
    public class SseController {
        @Resource
        private SseService sseService;
    
        @GetMapping(value = "sse/{clientId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
        @ApiOperation(value = " 建立连接")
        public SseEmitter test(@PathVariable("clientId") @ApiParam("客户端 id") String clientId, @RequestParam String message) {
            final SseEmitter emitter = sseService.getConn(clientId);
            CompletableFuture.runAsync(() -> {
                try {
                    sseService.send(clientId, message);
                } catch (Exception e) {
                    System.out.println("推送数据异常");
                }
            });
    
            return emitter;
        }
    
        @GetMapping("closeConn/{clientId}")
        @ApiOperation(value = " 关闭连接")
        public String closeConn(@PathVariable("clientId") @ApiParam("客户端 id") String clientId) {
            sseService.closeConn(clientId);
            return "连接已关闭";
        }
    
    }
  1. Sevice 层实现类相关代码如下 这里只显示了实现类的代码,需要自己去创建一个service接口,完成接口内方法的编写哦。(这种如果不会,私我)
    @Service
    public class SseServiceImpl implements SseService {
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
                final SseEmitter emitter = new SseEmitter(600_000L);
                // 注册超时回调,超时后触发
                emitter.onTimeout(() -> {
                    System.out.println("连接已超时,正准备关闭,clientId = "+clientId);
                    SSE_CACHE.remove(clientId);
                });
                // 注册完成回调,调用 emitter.complete() 触发
                emitter.onCompletion(() -> {
                    System.out.println("连接已关闭,正准备释放,clientId = "+clientId);
                    SSE_CACHE.remove(clientId);
                    System.out.println("连接已释放,clientId = " +clientId);
                });
                // 注册异常回调,调用 emitter.completeWithError() 触发
                emitter.onError(throwable -> {
                    System.out.println("连接已异常,正准备关闭,clientId = "+ clientId+"==>"+ throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
    
                return emitter;
            }
        }
    
        /**
         * 模拟类似于 chatGPT 的流式推送回答
         *
         * @param clientId 客户端 id
         */
        @Override
        public void send(@NotBlank String clientId, @NotBlank String message) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
    
            //模拟推送数据
            ArrayList<String> replyList = new ArrayList<>();
    
            replyList.add("你的问题是:"+message);
            replyList.add("Vue.js 是一款用于构建用户");
            replyList.add("界面的渐进式框架。");
            replyList.add("与其他大型框架不同的是,");
            replyList.add("Vue被设计为可以自底向上逐层应用。<br>");
            replyList.add("Vue的核心库只");
            replyList.add("关注视图层,");
            replyList.add("不仅易于上手,还便于与第三方库或既有项目整合。");
            replyList.add("[DONE]");
            // 推流内容到客户端
    
                replyList.forEach(item -> {
                    System.out.println("正在推流:" + item);
                    try {
                        emitter.send(item);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
    
            // 结束推流
            emitter.complete();
        }
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
    }

跨域方案解决-使用配置文件

在测试过程中可能会出现接口调用跨域报错,所以使用了配置代码的方式,解决跨域问题,以下是代码

    @Configuration
    public class CorsConfig {
    
        @Bean
        public FilterRegistrationBean crosFilter() {
    
            UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
    
            CorsConfiguration cros = new CorsConfiguration();
    
            cros.setAllowCredentials(false);
    
            cros.addAllowedOrigin("*");
    
            cros.addAllowedHeader("*");
    
            cros.addAllowedMethod("*");
    
            source.registerCorsConfiguration("/**", cros);
    
            FilterRegistrationBean bean = new FilterRegistrationBean(new CorsFilter(source));
    
            bean.setOrder(0);
    
            return bean;
        }
    }

"SpringBoot+SSE:构建实时通信网络的实践"

客户端代码

最后我们来看看如何展示吧。我们的前端 index.html 代码放在 resources 的,大家如果要运行直接点击使用浏览器打开即可

<!DOCTYPE html>
<header>
    <meta charset="UTF-8">
    <title>chatgpt</title>
</header>
<body>


    <h1></h1>
    请输入问题:<input id="dadada" type="text" /> 
    <button onclick="test()">大大大</button>
    <h3><pre id="buttonshow"></pre></h3>

</body>
<script>

    const buttonshow = document.getElementById('buttonshow')
    const keyword = document.getElementById('keyword')

    function test() {
        const message = document.getElementById('dadada').value
        
        const eventSource = new EventSource(`http://127.0.0.1:8080/sse/1?message=${message}`);
        buttonshow.innerHTML = ''
        eventSource.onmessage = e => {
            
            console.log(e.data == "[DONE]");
            if(e.data == '[DONE]') {
                return eventSource.close();
            }
           
            buttonshow.innerHTML += e.data    
        }
    }

</script>

</html>

效果展示

"SpringBoot+SSE:构建实时通信网络的实践"

"SpringBoot+SSE:构建实时通信网络的实践"