Server-Send-Event (SSE) 技术在单服务多实例副本上的实现
好记性不如烂笔头,趁热记录下,给未来的自己。
0 | 前言
在项目迭代的过程中,有些业务场景,比如客户端(浏览器)需要定期的获取后端的数据,一般比较常规的方式是通过客户端 long polling 的方式。除了这种方式外,还可以通过 Server-Send-Event(SSE)或者 WebSocket 的方式,更加实时的获取后端的数据。
其中:
- SSE
- 是流式单工的(服务端 -> 客户端)
- 基于 HTTP 协议 (很重要)
- 适合客户端监听,服务端主动推消息的场景,如获取日志信息,获取通知消息等
- WebSocket
- 是流式双工的(服务端 <-> 客户端)
- 基于 WS 协议,由 HTTP 协议升级而来,所以在比较复杂的部署架构里(有 LB,WAF,Gateway 等),需要整个链路上的中间件都支持 ws 协议
- 适合前后端交互频繁且实时的场景,如在线聊天
SSE 虽然是单工的,但是可以模拟双工通信,如客户端->服务端,可以发送 HTTP POST请求的方式操作。
本文将使用 SSE 来模拟客户端、服务端的双工通信。
1 | 方案说明
市面上现有的 SSE 技术文章,都是针对单副本服务展开的。如果需要水平扩展服务的副本数,那些方案就无法满足需求了,因为 SSE Emitters 容器是一个本地容器,多副本之间无法共享各自的本地容器,所以,当客户端订阅和发布消息的请求被调度到不同的副本上时,就会出现订阅消息无法推送到客户端问题。
那么如果需要水平扩展服务,来满足分布式 SSE 架构,该如何设计呢?
这里,可以引入 MQ,如 Kafka,来满足需求。
核心思想是:各个服务实例的副本独立维护本地的 SSE Emitters 容器,在 MQ 队列监听推送 SSE 消息任务,各个服务实例的副本自行判断该推送任务是否可以在本副本执行。
大概流程如下:
- 客户端发起订阅请求,服务端将订阅信息存储在本地容器里;
- 客户端发送消息;
- 服务端接收消息,塞入 MQ;
- 服务端订阅 MQ topic,并接收消息;
- 服务端判断接收到的消息是否可以由本实例副本推送 SSE 到客户端(通过消息里的 clientId 从容器中拿 Sse Emitter 对象的方式);
- 如果本实例副本判断可以推送 SSE 消息,则执行;如果不能推送,则终止。
2 | 具体实现
2.1 | Kafka 发送消息
发送kafka消息,支持返回发送消息成功与否。
@Component
@Slf4j
public class Producer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
public boolean send(String topic, String msg) {
if (!StrUtil.isAllNotEmpty(topic, msg)) {
log.error("topic={} or/and msg={} is empty. Will not be sent to Topic.", topic, msg);
return false;
}
log.info("msg will be sent to topic:{}, msg={}", topic, msg);
try {
kafkaTemplate.send(topic, msg).get();
log.info("success send msg to topic:{} message:{}", topic, msg);
} catch (Exception e) {
log.error("send msg fail, topic:{} message: {}, err msg:{}", topic, msg, e.getMessage());
return false;
}
return true;
}
}
2.2 | Kafka 接收消息
在本方案里是,每一个服务的实例副本都要消费监听的 topic,由于 Kafka 消费不支持广播模式,只能将实例副本放到不同的 group 里来实现, 这里通过 UUID 为每一个实例副本匹配一个 groupId。接收消息流程:
- 接收 topic 消息;
- 获取消息里的客户端 id;
- 从 SSE Emitters 容器里获取 SseEmitter 对象;
- 通过 SseEmitter 对象发送 SSE 消息给对应的 客户端。
/**
* get client published message from kafka,process it according to the serverId
* @param msg
* @param ack
*/
@KafkaListener(topics = {"${custom.kafka.topics.sseConnection.topic}"}, clientIdPrefix = "${spring.application.name}", id = "openapi-be-sse-connection-", idIsGroup = false, groupId = "openapi-be-sse-connection-"+"#{T(java.util.UUID).randomUUID()}")
public void sseConnectionProcess(String msg, Acknowledgment ack) {
log.info("get kafka msg from topic:{}, msg:{}", customProperty.getKafka().getTopics().getSseConnection().getTopic(), msg);
if (StrUtil.isEmpty(msg)) {
log.error("kafka msg is empty, no process");
return;
}
SsePublishReqDto reqDto;
try {
reqDto = JsonUtil.jsonStr2Obj(msg, SsePublishReqDto.class);
} catch (Exception e) {
log.error("parsing string to obj failed, msg={}, e={}", msg, e);
return;
}
OpenApiSseEmitter emitter = sseEmitters.get(reqDto.getClientId());
if (emitter == null) {
log.error("can't find the sseEmitter obj from sseEmitters, no process");
return;
}
try {
// 发送消息给客户端
log.info("send sse msg={} to clientId={}", reqDto.getMessage(), reqDto.getClientId());
emitter.send(reqDto.getMessage(), MediaType.APPLICATION_JSON);
ack.acknowledge();
} catch (IOException e) {
emitter.completeWithError(e);
}
}
2.3 | SseEmitters容器
创建 SSE Emitters 容器,以单例 Bean 的方式交由 Spring 管理。 该容器是一个 Map 类型,存放所有通过该实例副本订阅的 SseEmitter 对象。
@Component
public class SseEmitterBean {
/**
* 在这里存储特定客户端的SseEmitter对象
*/
@Bean(name = "sseEmitters")
public Map<String, OpenApiSseEmitter> sseEmitters() {
return new ConcurrentHashMap<>();
}
}
2.4 | SSE-Subscribe
客户端订阅 SSE 接口。客户端订阅后,服务端会和客户端建立基于 SSE 的 TCP 长链接。主要做了:
- 创建一个 SseEmitter 对象,并设置超时时间;
- 将该 SseEmitter 对象存到 SSE Emitters 容器中;
- 设置 SseEmitter 对象在完成时的执行逻辑:将 SseEmitter 对象从容器中清除;
- 返回 SseEmitter 对象。
@Resource
private Map<String, OpenApiSseEmitter> sseEmitters;
@GetMapping("/subscribe/{clientId}")
public SseEmitter subscribe(@PathVariable String clientId) {
log.info("got a subscription, clientId={} will be bound to serverId={}", clientId, appId);
// 创建一个新的SseEmitter对象并设置超时时间,并将其存储在Map中
OpenApiSseEmitter emitter = new OpenApiSseEmitter(Long.MAX_VALUE);
sseEmitters.put(clientId, emitter);
emitter.onCompletion(() -> sseEmitters.remove(clientId));
return emitter;
}
2.5 | SSE-Publish
客户端发送消息接口。流程如下:
- 接收消息;
- 将消息发送到 kafka 的 topic 上;
- 返回消息发送是否成功。
@PostMapping("/publish/{clientId}")
public ResponseEntity<ReturnBase> publish(@PathVariable String clientId, @RequestBody String message) {
SsePublishReqDto build = SsePublishReqDto.builder().clientId(clientId).message(message).serverId(appId).build();
boolean send = producer.send(customProperty.getKafka().getTopics().getSseConnection().getTopic(), JsonUtil.jsonObj2Str(build));
return new ResponseEntity<>(ReturnBase.ok(send), HttpStatus.OK);
}
2.6 | 浏览器客户端代码
<html>
<head>
<script>
console.log('start')
const clientId = "your_client_id_x"; // 设置客户端ID
const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSE
eventSource.onmessage = event => {
console.log(event.data)
const message = JSON.parse(event.data);
console.log(`Received message from server: ${message}`);
};
// 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了
function sendMessage() {
const message = "hello sse";
fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(message)
});
console.log('dddd'+JSON.stringify(message))
}
// sendMessage()
</script>
</head>
</html>
3 | Demo 展示
-
发送消息给客户端1
-
发送消息给客户端2
-
发送消息给客户端3
转载自:https://juejin.cn/post/7224060318652153913