likes
comments
collection
share

第八章节:WebSocket心跳检测

作者站长头像
站长
· 阅读数 48

前言

上一章节中,我们借助中间件redis实现了一个分布式IM服务,但是存在一个问题——redis中的数据是100%可信的吗?假设当server突然宕机,所有的ws连接随之也会断开,但是从redis删除连接信息的操作可能并没有完成,这时候redis的数据就是脏数据了,所以要给redis中的数据设置过期时间,这样就算server宕机,redis也会因为到了过期时间自动删除连接数据,加大了数据的可信度。

那么添加了过期时间,当连接正常的时候就需要一直往redis中续时,那么续时的时机和机制如何呢?我们知道WebSocket有ping/pong的机制,那么可以让客户端每隔3秒发送一个ping帧,服务端在收到ping帧后往redis续时6秒,同时服务端检查如果客户端连续2个时间窗口没有发送ping帧,可以断言当前的ws连接不健康,那么就会主动断开连接。

通过上述的心跳机制,加大了连接的可靠性和redis数据的可信度。那么下一步我们试着编码实现。

客户端发送心跳

client.go中添加开启定时发送心跳的方法

// 注意WS连接可能会重新连接,所以需要从chan中监听是否有变化,重新赋值
func ping(connChan chan *websocket.Conn) {
    // 创建一个context用于取消发送ping的定时任务
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // 定时发送ping消息
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()
    conn := <-connChan
    for {
        select {
        // 连接重置时,重新赋值
        case conn = <-connChan:
        case <-ticker.C:
            // 发送ping消息
            log.Println("Send ping...")
            err := conn.WriteMessage(websocket.PingMessage, []byte{})
            if err != nil {
                log.Println("Failed to send ping:", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

在连接上WS后立即开启协程发送心跳, 并且当重新连接时应该往chan中发送新的连接对象, 依然还是修改client.go

func main() {
    ...// 省略
    conn, err := connect(uri, header)
    if err != nil {
        log.Fatal("dial failed:", err)
    }
    defer conn.Close()
    connChan := make(chan *websocket.Conn, 1)
    connChan <- conn
    // 发送心跳
    go ping(connChan)
    go func() {
        for {
            m := &model.Message{}
            err = conn.ReadJSON(m)
            if err != nil {
                // server shutdown or socket closed
                if isClosed(err) {
                    // reconnect
                    log.Println("Trying reconnect ...")
                    conn, err = connect(uri, header)
                    if err != nil {
                        log.Fatalln("Reconnect Failed...", err)
                        return
                    }
                    // 发送新的连接对象
                    connChan <- conn
                    continue
                }
                log.Println("Read WS message failed:", err)
                continue
            }
            log.Printf("Received Message %v From %v \n", m.Data, m.FromUserID)
            err = markMsgRead(*m, token)
            if err != nil {
                log.Println("MarkRead failed:", err)
            }
        }
    }()
    ...// 省略
}

测试客户端心跳

分别在2个Terminal启动server和client, 可观察client端日志是定时发送了心跳

第八章节:WebSocket心跳检测 这时候短暂停止server后立即又启动server,观察client端日志发现客户端进行了重新连接并且继续发送ping心跳

第八章节:WebSocket心跳检测 客户端这边心跳、重连机制都没有任何问题

服务端监听心跳

首先修改 redis.go, 添加带TTL的函数

func SetRedisKVWithTTL(k string, v string, ttl time.Duration) error {
    return client.Set(k, v, ttl).Err()
}

修改ConnManager

func (m *ConnManager) AddConn(userID int64, conn *websocket.Conn) {
    // 保存本地
    m.connections.Store(userID, conn)
    // 保存当前连接的服务器信息, 5秒过期
    err := SetRedisKVWithTTL(UserConnKey(userID), model.CurrServer.ServerAddr(), 5*time.Second)
    if err != nil {
        log.Printf("Save conn of %d to redis failed:\n %v\n", userID, err)
    }
}

func (m *ConnManager) SetConnTTL(userID int64) {
    conn := m.FindConn(userID)
    if conn == nil {
        return
    }
    // 保存当前连接的服务器信息, 5秒过期
    err := SetRedisKVWithTTL(UserConnKey(userID), model.CurrServer.ServerAddr(), 5*time.Second)
    if err != nil {
        log.Printf("Save conn of %d to redis failed:\n %v\n", userID, err)
    }
}

gorilla-websocket 提供了处理Ping帧的函数,默认是回复一个Pong帧,这里我们需要自定义PingHandler函数,添加往redis续时的操作, 修改server.go

func ws(c *gin.Context) {
    ... // 省略
    log.Printf("%s connected... \n", user.Name)
    defer OnDisconnect(*user)
    err = Sender.SendUnRead(user.ID, conn)
    if err != nil {
        log.Printf("Send Unread to %s failed... \n", user.Name, err)
    }
    // 设置一个通道用于接收ping消息
    pingChan := make(chan string)
    conn.SetPingHandler(func(ping string) error {
        log.Printf("Received Ping %s ...", string(ping))
        pingChan <- ping
        // 回复Pong
        return conn.WriteMessage(websocket.PongMessage, []byte("Pong"))
    })
    // 监听ping chan
    go func() {
    LOOP:
        for {
            select {
            case <-pingChan:
                // 往redis添加TTL
                ConnManager.SetConnTTL(user.ID)
            case <-time.After(6 * time.Second):
                // 如果6秒内没有收到任何消息,结束
                err = errors.New("ping timeout")
                break LOOP
            }
        }
        if err != nil {
            log.Println("caught an unexpected err:", err)
        }
    }()
    ... // 省略
}

测试

分别在两个Terminal启动server和client,观察server和client的日志可以发现client端正常发送ping帧和聊天消息,server端也能正常收消息和发消息

整理Server代码

但是此时server端代码比较混乱,有多处代码read消息,我们稍微整理下server.go的代码

func ws(c *gin.Context) {
    ... // 省略
    log.Printf("%s connected... \n", user.Name)
    defer OnDisconnect(*user)
    err = Sender.SendUnRead(user.ID, conn)
    if err != nil {
        log.Printf("Send Unread to %s failed... \n", user.Name, err)
    }
    // 设置一个通道用于接收ping消息
    pingChan := make(chan string)
    conn.SetPingHandler(func(ping string) error {
        log.Printf("Received Ping %s ...", string(ping))
        pingChan <- ping
        // 回复Pong
        return conn.WriteMessage(websocket.PongMessage, []byte("Pong"))
    })
    // 用于接收JSON消息
    jsonChan := make(chan *model.Message)
    // 接收ping消息
    go func() {
        defer close(pingChan)
        defer close(jsonChan)
        for {
            // 读取WebSocket消息
            mt, message, err := conn.ReadMessage()
            if err != nil {
                if isConnClosed(err) {
                    return
                }
                log.Println("Read Message Failed", err)
            }
            // 如果收到ping消息,通过通道发送
            switch mt {
            case websocket.BinaryMessage:
                fallthrough
            case websocket.TextMessage:
                v := &model.Message{}
                json.Unmarshal(message, v)
                jsonChan <- v
            case websocket.CloseMessage:
                log.Printf("Read CloseMessage %s", string(message))
                return
            default:
                log.Printf("Read OtherMessage %s, Type: %s", string(message), mt)
                continue
            }
        }
    }()
LOOP:
    for {
        select {
        case v := <-jsonChan:
            err = Sender.Send(v)
            if err != nil {
                log.Println("Send Message Failed", err)
            }
        case <-pingChan:
            // 往redis添加TTL
            ConnManager.SetConnTTL(user.ID)
        case <-time.After(6 * time.Second):
            // 如果6秒内没有收到任何消息,结束
            err = errors.New("ping timeout")
            break LOOP
        }
    }
    if err != nil {
        log.Println("caught an unexpected err:", err)
    }
}

func isConnClosed(err error) bool {
    _, ok := err.(*websocket.CloseError)
    if ok {
        return true
    }
    return strings.Contains(err.Error(), "use of closed network connection")
}

小结

本章节我们通过设置Redis中的数据自动过期,利用客户端每隔3秒发送Ping帧,服务端接收Ping帧时往Redis延续过期时间的方式提高WebSocket连接的可靠性以及Redis中客户端连接信息的可靠性。

转载自:https://juejin.cn/post/7360887387918712895
评论
请登录