从单节点到集群:使用Redis解决负载均衡后WebSocket在线聊天室通信难题
一、简介
在现代的Web应用中,在线聊天室是一种非常流行的实时通讯功能。我们的团队开发了一个在线聊天室,在项目即将部署到生产环境时,我们突然意识到一个潜在的问题:当我们扩展服务器并通过Nginx进行负载均衡时,如何确保各台服务器之间能够正确地处理WebSocket连接,确保用户可以在不同的服务器之间无缝交流?
由于WebSocket连接是持久化的且绑定在特定的服务器实例上,当多个服务器通过负载均衡共享流量时,某些用户的消息可能无法传达到其他用户,尤其是当他们连接到不同的服务器时。这种情况下,我们需要一个解决方案来在多个服务器实例之间同步WebSocket消息,确保所有在线用户都能实时接收到聊天信息。
为了解决这个难题,我们决定使用Redis作为消息中间件,以便在多个服务器节点之间实现消息的分发和同步。接下来,我们将详细介绍如何使用Redis来构建一个支持多服务器、负载均衡的WebSocket在线聊天室。
二、话不多说,开始编码
在接下来的部分中,我们将逐步实现这一解决方案。我们会先介绍方案的总体设计思路,然后通过代码展示如何集成Redis来实现服务器间的消息同步。
方案简介
1. 系统示意图
2. 发送消息示意图
看完这张图可能懵懵懂懂的(主要我画的烂),我再给大家举例 :
-
每个项目启动时都会绑定一个主题:
- 例如:系统1绑定到主题1,系统2绑定到主题2。
-
用户连接到WebSocket时通过Nginx进行负载均衡:
- 假设用户1登录到系统1,Redis会记录用户1在系统1上。同理,用户2被负载均衡到系统2,Redis会记录用户2在系统2上。
-
当用户1发送消息给用户2时:
- 首先,系统1会查看本地是否存在用户2。如果本地不存在,则系统1会去Redis中查找用户2在哪台机器上,最终将消息发送到主题2。
- 系统2接收到主题2的消息后,会查找系统2本地的用户2,并将消息发送给用户2。
3. 编写具体代码
3.1 在项目启动时绑定主题,新增机器到redis,订阅事件
@Component
@RequiredArgsConstructor
public class ChatRoomRunner implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(ChatRoomRunner.class);
private final SystemConfiguration systemConfiguration;
private final RedisUtil redisUtil;
private final RedissonClient redissonClient;
private final ChatRoomSubscriber chatRoomSubscriber;
@Override
public void run(String... args) throws Exception {
log.info("加载 yf-websocket-redis 模块");
String machineName = systemConfiguration.getMachineName();
// 1. 存储机器号到redis
redisUtil.addToCacheSet(RedisKeyConstants.SYSTEM_MACHINE, machineName);
// 2. 获取聊天室主题
RTopic topic = redissonClient.getTopic(ChatRoomConstant.CHAT_ROOM + machineName);
// 3. 订阅事件
topic.addListener(ChatRoomMessageDto.class, chatRoomSubscriber);
}
}
3.2 订阅消息
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatRoomSubscriber implements MessageListener<ChatRoomMessageDto> {
private final ChatRoomHandler chatRoomHandler;
private final ChatHandelMsgContext chatHandelMsgContext;
/**
* 处理消息
*
* @param chatRoomMessageDto 需要转发的消息体
*/
@Override
public void onMessage(CharSequence charSequence, ChatRoomMessageDto chatRoomMessageDto) {
log.info("聊天室消息订阅 : 接收到消息 SenderId : {} , ReceiverId : {} , Channel : {}",
chatRoomMessageDto.getSenderId(),
chatRoomMessageDto.getReceiverId(),
chatRoomMessageDto.getChannel().getLabel());
/*
* 正常情况 : 这里永远获取不到发送者的 session
* 异常情况 : 本机给本机 Subscriber 发送消息,则能获取到 session ( 例如: 心跳踢出用户 )
* */
chatHandelMsgContext.handleMessage(chatRoomHandler, null, chatRoomMessageDto);
}
}
3.3 具体消息消费
- 注意 : 消息处理采用策略模式 , 代码较多 , 请移步 Gitee源码
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) {
// 允许自己给自己发送信息
if (message instanceof TextMessage textMessage) {
// 1. 解析发送数据
ChatRoomMessageDto chatRoomMessage = JSON.parseObject(textMessage.getPayload(), ChatRoomMessageDto.class);
if (chatRoomMessage == null) {
this.close(session);
return;
}
String content = chatRoomMessage.getContent();
// 2. 非心跳消息则过滤字段 : 防止用户恶意消息
if (StringUtils.hasText(content)) {
if (content.length() >= 2000) {
return;
}
String replace = sensitiveWordBs.replace(content);
chatRoomMessage.setContent(replace);
}
// 3. 填充默认信息 : 防止用户恶意篡改
chatRoomMessage.setSenderId(this.getUserIdBySession(session));
chatRoomMessage.setServiceProvider(ServiceProviderEnum.CHAT_ROOM);
chatRoomMessage.setMessageProvider(MessageProviderEnum.USER);
// 4. 根据频道选择发送方式
handelMsgContext.handleMessage(this, session, chatRoomMessage);
}
}
四、在线演示 / 源码
代码结构图
转载自:https://juejin.cn/post/7401053200949231631