likes
comments
collection
share

Springboot中SSE(Server-Sent Events)的使用

作者站长头像
站长
· 阅读数 2

1 概述

Server-Sent Events 服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。SSE 是 HTML5 中一个与通信相关的 API,主要由两部分组成:服务端与浏览器端的通信协议(HTTP 协议)及浏览器端可供 JavaScript 使用的 EventSource 对象。

2 后端实现

基于Springboot2.0.9.RELEASE,添加依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.1 连接管理器

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.http.MediaType;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class SseEmitterManager {

    private static final Map<String, List<SseEmitter>> dataEmitters = new ConcurrentHashMap<>();
    private static final Logger logger = LoggerFactory.getLogger(SseEmitterManager.class);

    /**
     * 订阅特定主题的SSE连接。
     *
     * @param topic   要订阅的主题
     * @param emitter SSE连接
     */
    public static void subscribe(String topic, SseEmitter emitter) {
        dataEmitters.computeIfAbsent(topic, k -> Collections.synchronizedList(new ArrayList<>())).add(emitter);
        emitter.onCompletion(() -> {
            logger.info("onCompletion then remove");
            //removeEmitter(topic, emitter);//不要调用removeEmitter()重复删除
        });
        emitter.onTimeout(() -> {
            logger.info("onTimeout then remove");//如果一开始订阅时new SseEmitter()有timeout时间,则到超时时间后会触发
            removeEmitter(topic, emitter);
        });
        emitter.onError((e) -> {
            logger.error("onError with " + e.getMessage());
            //removeEmitter(topic, emitter);//不要调用removeEmitter()重复删除
        });
    }

    /**
     * 推送特定主题的数据给所有已订阅的连接。
     *
     * @param topic 推送的主题
     * @param data  要推送的数据
     */
    public static void pushData(String topic, String data) {
        List<SseEmitter> emitters = dataEmitters.getOrDefault(topic, Collections.synchronizedList(new ArrayList<>()));
        Iterator<SseEmitter> iterator = emitters.iterator();
        while (iterator.hasNext()) {
            SseEmitter emitter = iterator.next();
            try {
                //如果浏览器标签页关闭或调用了eventSource.close()方法,会触发onCompletion
                emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).data(data, MediaType.TEXT_PLAIN));
            } catch (Exception e) {
                //1.有异常时删除对应的连接emitter,但不可调用removeEmitter(),否则抛出ConcurrentModificationException
                //2.不用foreach遍历,否则中间出现异常并调用removeEmitter()删除后,影响后面的遍历
                //3.如果前端eventSource.close()或浏览器关闭,将触发ClientAbortException: java.io.IOException: 你的主机中的软件中止了一个已建立的连接。
                //4.最终会触发onCompletion回调,不要在回调里面removeEmitter()
                iterator.remove();
                logger.info("send fail");
            }
        }
    }

    /**
     * 删除已订阅的连接
     *
     * @param topic
     * @param emitter
     */
    private static void removeEmitter(String topic, SseEmitter emitter) {
        List<SseEmitter> emitters = dataEmitters.get(topic);
        if (emitters != null) {
            emitters.remove(emitter);
        }
    }
}

2.2 Controller中定义接口

/**
 * 前端订阅感兴趣的主题
 * @param topic
 * @return
 */
@GetMapping(value = "/subscribe/{topic}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter sse(@PathVariable("topic") String topic) {
    //设为不过期(-1),一直保持长连接,否则每隔设定的timeout时间就会抛出AsyncRequestTimeoutException
    //如果不设置过期时间,可配置spring.mvc.async.request-timeout,这个是全局的
    SseEmitter emitter = new SseEmitter(-1L);
    SseEmitterManager.subscribe(topic, emitter);
    return emitter;
}

/**
 * 模拟推送数据给前端,用于测试
 * @param topic
 * @param testText
 * @return
 */
@GetMapping("/push/{topic}/{testText}")
public ResponseEntity<String> pushEnvironmentData(@PathVariable String topic, @PathVariable String testText) {
    SseEmitterManager.pushData(topic, testText);
    return ResponseEntity.ok("Data pushed successfully.");
}

3 前端实现

以订阅温度数据为例

<!DOCTYPE html>
<html>
<body>
	<h1>Getting temperature updates</h1>
	<input type="button" value="start" onclick="start()" />
	<input type="button" value="end" onclick="end()" />
	<div id="result"></div>
	<script>
		var source = null;
		function start() {
			if (typeof (EventSource) !== "undefined") {
				if (source != null) {
					source.close();
				}
				//订阅温度数据推送
				source = new EventSource("http://localhost:8080/v1/module/subscribe/temperature");
				source.onopen = function (event) {
					document.getElementById("result").innerHTML += " is open " + "<br>";
				};
				source.onmessage = function (event) {
					document.getElementById("result").innerHTML += event.data + "<br>";
				};
				source.onerror = (error) => {
					console.error('Error occurred:', error);
				};
			} else {
				document.getElementById("result").innerHTML = "Sorry, your browser does not support server-sent events...";
			}
		}

		function end() {
			if (source != null) {
				source.close();
			}
		}
	</script>
</body>
</html>

4 效果展示

Springboot中SSE(Server-Sent Events)的使用

Springboot中SSE(Server-Sent Events)的使用

Springboot中SSE(Server-Sent Events)的使用

转载自:https://juejin.cn/post/7373669988388339723
评论
请登录