likes
comments
collection
share

Server-Send-Event (SSE) 技术在单服务多实例副本上的实现

作者站长头像
站长
· 阅读数 23

好记性不如烂笔头,趁热记录下,给未来的自己。

0 | 前言

在项目迭代的过程中,有些业务场景,比如客户端(浏览器)需要定期的获取后端的数据,一般比较常规的方式是通过客户端 long polling 的方式。除了这种方式外,还可以通过 Server-Send-Event(SSE)或者 WebSocket 的方式,更加实时的获取后端的数据。

其中:

  • SSE
    1. 是流式单工的(服务端 -> 客户端)
    2. 基于 HTTP 协议 (很重要)
    3. 适合客户端监听,服务端主动推消息的场景,如获取日志信息,获取通知消息等
  • WebSocket
    1. 是流式双工的(服务端 <-> 客户端)
    2. 基于 WS 协议,由 HTTP 协议升级而来,所以在比较复杂的部署架构里(有 LB,WAF,Gateway 等),需要整个链路上的中间件都支持 ws 协议
    3. 适合前后端交互频繁且实时的场景,如在线聊天

SSE 虽然是单工的,但是可以模拟双工通信,如客户端->服务端,可以发送 HTTP POST请求的方式操作。

本文将使用 SSE 来模拟客户端、服务端的双工通信。

Server-Send-Event (SSE) 技术在单服务多实例副本上的实现

1 | 方案说明

市面上现有的 SSE 技术文章,都是针对单副本服务展开的。如果需要水平扩展服务的副本数,那些方案就无法满足需求了,因为 SSE Emitters 容器是一个本地容器,多副本之间无法共享各自的本地容器,所以,当客户端订阅和发布消息的请求被调度到不同的副本上时,就会出现订阅消息无法推送到客户端问题。

那么如果需要水平扩展服务,来满足分布式 SSE 架构,该如何设计呢?

这里,可以引入 MQ,如 Kafka,来满足需求。

核心思想是:各个服务实例的副本独立维护本地的 SSE Emitters 容器,在 MQ 队列监听推送 SSE 消息任务,各个服务实例的副本自行判断该推送任务是否可以在本副本执行

大概流程如下:

  1. 客户端发起订阅请求,服务端将订阅信息存储在本地容器里;
  2. 客户端发送消息;
  3. 服务端接收消息,塞入 MQ;
  4. 服务端订阅 MQ topic,并接收消息;
  5. 服务端判断接收到的消息是否可以由本实例副本推送 SSE 到客户端(通过消息里的 clientId 从容器中拿 Sse Emitter 对象的方式);
  6. 如果本实例副本判断可以推送 SSE 消息,则执行;如果不能推送,则终止。 Server-Send-Event (SSE) 技术在单服务多实例副本上的实现

2 | 具体实现

2.1 | Kafka 发送消息

发送kafka消息,支持返回发送消息成功与否。

@Component
@Slf4j
public class Producer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public boolean send(String topic, String msg) {
        if (!StrUtil.isAllNotEmpty(topic, msg)) {
            log.error("topic={} or/and msg={} is empty. Will not be sent to Topic.", topic, msg);
            return false;
        }
        log.info("msg will be sent to topic:{}, msg={}", topic, msg);

        try {
            kafkaTemplate.send(topic, msg).get();
            log.info("success send msg to topic:{} message:{}", topic, msg);
        } catch (Exception e) {
            log.error("send msg fail, topic:{} message: {}, err msg:{}", topic, msg, e.getMessage());
            return false;
        }
        return true;
    }
}

2.2 | Kafka 接收消息

在本方案里是,每一个服务的实例副本都要消费监听的 topic,由于 Kafka 消费不支持广播模式,只能将实例副本放到不同的 group 里来实现, 这里通过 UUID 为每一个实例副本匹配一个 groupId。接收消息流程:

  1. 接收 topic 消息;
  2. 获取消息里的客户端 id;
  3. 从 SSE Emitters 容器里获取 SseEmitter 对象;
  4. 通过 SseEmitter 对象发送 SSE 消息给对应的 客户端。
/**
 * get client published message from kafka,process it according to the serverId
 * @param msg
 * @param ack
 */
@KafkaListener(topics = {"${custom.kafka.topics.sseConnection.topic}"}, clientIdPrefix = "${spring.application.name}", id = "openapi-be-sse-connection-", idIsGroup = false, groupId = "openapi-be-sse-connection-"+"#{T(java.util.UUID).randomUUID()}")
public void sseConnectionProcess(String msg, Acknowledgment ack) {
    log.info("get kafka msg from topic:{}, msg:{}", customProperty.getKafka().getTopics().getSseConnection().getTopic(), msg);
    if (StrUtil.isEmpty(msg)) {
        log.error("kafka msg is empty, no process");
        return;
    }
    SsePublishReqDto reqDto;
    try {
        reqDto = JsonUtil.jsonStr2Obj(msg, SsePublishReqDto.class);
    } catch (Exception e) {
        log.error("parsing string to obj failed, msg={}, e={}", msg, e);
        return;
    }

    OpenApiSseEmitter emitter = sseEmitters.get(reqDto.getClientId());
    if (emitter == null) {
        log.error("can't find the sseEmitter obj from sseEmitters, no process");
        return;
    }

    try {
        // 发送消息给客户端
        log.info("send sse msg={} to clientId={}", reqDto.getMessage(), reqDto.getClientId());
        emitter.send(reqDto.getMessage(), MediaType.APPLICATION_JSON);
        ack.acknowledge();
    } catch (IOException e) {
        emitter.completeWithError(e);
    }
}

2.3 | SseEmitters容器

创建 SSE Emitters 容器,以单例 Bean 的方式交由 Spring 管理。 该容器是一个 Map 类型,存放所有通过该实例副本订阅的 SseEmitter 对象。

@Component
public class SseEmitterBean {

    /**
     * 在这里存储特定客户端的SseEmitter对象
     */
    @Bean(name = "sseEmitters")
    public Map<String, OpenApiSseEmitter> sseEmitters() {
        return new ConcurrentHashMap<>();
    }
}

2.4 | SSE-Subscribe

客户端订阅 SSE 接口。客户端订阅后,服务端会和客户端建立基于 SSE 的 TCP 长链接。主要做了:

  1. 创建一个 SseEmitter 对象,并设置超时时间;
  2. 将该 SseEmitter 对象存到 SSE Emitters 容器中;
  3. 设置 SseEmitter 对象在完成时的执行逻辑:将 SseEmitter 对象从容器中清除;
  4. 返回 SseEmitter 对象。
@Resource
private Map<String, OpenApiSseEmitter> sseEmitters;

@GetMapping("/subscribe/{clientId}")
public SseEmitter subscribe(@PathVariable String clientId) {
    log.info("got a subscription, clientId={} will be bound to serverId={}", clientId, appId);
    // 创建一个新的SseEmitter对象并设置超时时间,并将其存储在Map中
    OpenApiSseEmitter emitter = new OpenApiSseEmitter(Long.MAX_VALUE);
    sseEmitters.put(clientId, emitter);
    emitter.onCompletion(() -> sseEmitters.remove(clientId));

    return emitter;
}

2.5 | SSE-Publish

客户端发送消息接口。流程如下:

  1. 接收消息;
  2. 将消息发送到 kafka 的 topic 上;
  3. 返回消息发送是否成功。
@PostMapping("/publish/{clientId}")
public ResponseEntity<ReturnBase> publish(@PathVariable String clientId, @RequestBody String message) {

    SsePublishReqDto build = SsePublishReqDto.builder().clientId(clientId).message(message).serverId(appId).build();
    boolean send = producer.send(customProperty.getKafka().getTopics().getSseConnection().getTopic(), JsonUtil.jsonObj2Str(build));
    return new ResponseEntity<>(ReturnBase.ok(send), HttpStatus.OK);
}

2.6 | 浏览器客户端代码

<html>
    <head>
        <script>
            console.log('start')
            const clientId = "your_client_id_x"; // 设置客户端ID
            const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSE

            eventSource.onmessage = event => {
                console.log(event.data)
                const message = JSON.parse(event.data);
                console.log(`Received message from server: ${message}`);
            };

            // 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了
            function sendMessage() {
                const message = "hello sse";
                fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
                    method: "POST",
                    headers: { "Content-Type": "application/json" },
                    body: JSON.stringify(message)
                });
                console.log('dddd'+JSON.stringify(message))
            }
            // sendMessage()
        </script>
    </head>
</html>

3 | Demo 展示

  • 发送消息给客户端1 Server-Send-Event (SSE) 技术在单服务多实例副本上的实现

  • 发送消息给客户端2 Server-Send-Event (SSE) 技术在单服务多实例副本上的实现

  • 发送消息给客户端3 Server-Send-Event (SSE) 技术在单服务多实例副本上的实现