likes
comments
collection
share

从单节点到集群:使用Redis解决负载均衡后WebSocket在线聊天室通信难题

作者站长头像
站长
· 阅读数 36

一、简介

在现代的Web应用中,在线聊天室是一种非常流行的实时通讯功能。我们的团队开发了一个在线聊天室,在项目即将部署到生产环境时,我们突然意识到一个潜在的问题:当我们扩展服务器并通过Nginx进行负载均衡时,如何确保各台服务器之间能够正确地处理WebSocket连接,确保用户可以在不同的服务器之间无缝交流?

由于WebSocket连接是持久化的且绑定在特定的服务器实例上,当多个服务器通过负载均衡共享流量时,某些用户的消息可能无法传达到其他用户,尤其是当他们连接到不同的服务器时。这种情况下,我们需要一个解决方案来在多个服务器实例之间同步WebSocket消息,确保所有在线用户都能实时接收到聊天信息。

为了解决这个难题,我们决定使用Redis作为消息中间件,以便在多个服务器节点之间实现消息的分发和同步。接下来,我们将详细介绍如何使用Redis来构建一个支持多服务器、负载均衡的WebSocket在线聊天室。

二、话不多说,开始编码

在接下来的部分中,我们将逐步实现这一解决方案。我们会先介绍方案的总体设计思路,然后通过代码展示如何集成Redis来实现服务器间的消息同步。

方案简介

1. 系统示意图

从单节点到集群:使用Redis解决负载均衡后WebSocket在线聊天室通信难题

2. 发送消息示意图

从单节点到集群:使用Redis解决负载均衡后WebSocket在线聊天室通信难题

看完这张图可能懵懵懂懂的(主要我画的烂),我再给大家举例 :

  1. 每个项目启动时都会绑定一个主题

    • 例如:系统1绑定到主题1,系统2绑定到主题2。
  2. 用户连接到WebSocket时通过Nginx进行负载均衡

    • 假设用户1登录到系统1,Redis会记录用户1在系统1上。同理,用户2被负载均衡到系统2,Redis会记录用户2在系统2上。
  3. 当用户1发送消息给用户2时

    • 首先,系统1会查看本地是否存在用户2。如果本地不存在,则系统1会去Redis中查找用户2在哪台机器上,最终将消息发送到主题2。
    • 系统2接收到主题2的消息后,会查找系统2本地的用户2,并将消息发送给用户2。

3. 编写具体代码

3.1 在项目启动时绑定主题,新增机器到redis,订阅事件

@Component
@RequiredArgsConstructor
public class ChatRoomRunner implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ChatRoomRunner.class);
    private final SystemConfiguration systemConfiguration;
    private final RedisUtil redisUtil;
    private final RedissonClient redissonClient;
    private final ChatRoomSubscriber chatRoomSubscriber;

    @Override
    public void run(String... args) throws Exception {
        log.info("加载 yf-websocket-redis 模块");
        String machineName = systemConfiguration.getMachineName();
        // 1. 存储机器号到redis
        redisUtil.addToCacheSet(RedisKeyConstants.SYSTEM_MACHINE, machineName);
        // 2. 获取聊天室主题
        RTopic topic = redissonClient.getTopic(ChatRoomConstant.CHAT_ROOM + machineName);
        // 3. 订阅事件
        topic.addListener(ChatRoomMessageDto.class, chatRoomSubscriber);
    }
}

3.2 订阅消息

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatRoomSubscriber implements MessageListener<ChatRoomMessageDto> {

    private final ChatRoomHandler chatRoomHandler;
    private final ChatHandelMsgContext chatHandelMsgContext;

    /**
     * 处理消息
     *
     * @param chatRoomMessageDto 需要转发的消息体
     */
    @Override
    public void onMessage(CharSequence charSequence, ChatRoomMessageDto chatRoomMessageDto) {
        log.info("聊天室消息订阅 : 接收到消息 SenderId : {} , ReceiverId : {} , Channel : {}",
                chatRoomMessageDto.getSenderId(),
                chatRoomMessageDto.getReceiverId(),
                chatRoomMessageDto.getChannel().getLabel());
        /*
        * 正常情况 : 这里永远获取不到发送者的 session
        * 异常情况 : 本机给本机 Subscriber 发送消息,则能获取到 session ( 例如: 心跳踢出用户 )
        * */
        chatHandelMsgContext.handleMessage(chatRoomHandler, null, chatRoomMessageDto);
    }
}

3.3 具体消息消费

  • 注意 : 消息处理采用策略模式 , 代码较多 , 请移步 Gitee源码
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) {
    // 允许自己给自己发送信息
    if (message instanceof TextMessage textMessage) {
        // 1. 解析发送数据
        ChatRoomMessageDto chatRoomMessage = JSON.parseObject(textMessage.getPayload(), ChatRoomMessageDto.class);
        if (chatRoomMessage == null) {
            this.close(session);
            return;
        }
        String content = chatRoomMessage.getContent();
        // 2. 非心跳消息则过滤字段 : 防止用户恶意消息
        if (StringUtils.hasText(content)) {
            if (content.length() >= 2000) {
                return;
            }
            String replace = sensitiveWordBs.replace(content);
            chatRoomMessage.setContent(replace);
        }
        // 3. 填充默认信息 : 防止用户恶意篡改
        chatRoomMessage.setSenderId(this.getUserIdBySession(session));
        chatRoomMessage.setServiceProvider(ServiceProviderEnum.CHAT_ROOM);
        chatRoomMessage.setMessageProvider(MessageProviderEnum.USER);
        // 4. 根据频道选择发送方式
        handelMsgContext.handleMessage(this, session, chatRoomMessage);
    }
}

四、在线演示 / 源码

代码结构图

从单节点到集群:使用Redis解决负载均衡后WebSocket在线聊天室通信难题

  • 👀 在线预览

    • 一人一号、账号会自动注册。
    • 手机号登录并未实现。
  • 源码

    • yf/ yf-boot-admin / yf-websocket / yf-websocket-redis 包下
    • 求个start
转载自:https://juejin.cn/post/7401053200949231631
评论
请登录