第八章节:WebSocket心跳检测
前言
上一章节中,我们借助中间件redis实现了一个分布式IM服务,但是存在一个问题——redis中的数据是100%可信的吗?假设当server突然宕机,所有的ws连接随之也会断开,但是从redis删除连接信息的操作可能并没有完成,这时候redis的数据就是脏数据了,所以要给redis中的数据设置过期时间,这样就算server宕机,redis也会因为到了过期时间自动删除连接数据,加大了数据的可信度。
那么添加了过期时间,当连接正常的时候就需要一直往redis中续时,那么续时的时机和机制如何呢?我们知道WebSocket有ping/pong的机制,那么可以让客户端每隔3秒发送一个ping帧,服务端在收到ping帧后往redis续时6秒,同时服务端检查如果客户端连续2个时间窗口没有发送ping帧,可以断言当前的ws连接不健康,那么就会主动断开连接。
通过上述的心跳机制,加大了连接的可靠性和redis数据的可信度。那么下一步我们试着编码实现。
客户端发送心跳
在client.go
中添加开启定时发送心跳的方法
// 注意WS连接可能会重新连接,所以需要从chan中监听是否有变化,重新赋值
func ping(connChan chan *websocket.Conn) {
// 创建一个context用于取消发送ping的定时任务
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 定时发送ping消息
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
conn := <-connChan
for {
select {
// 连接重置时,重新赋值
case conn = <-connChan:
case <-ticker.C:
// 发送ping消息
log.Println("Send ping...")
err := conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Println("Failed to send ping:", err)
}
case <-ctx.Done():
return
}
}
}
在连接上WS后立即开启协程发送心跳, 并且当重新连接时应该往chan中发送新的连接对象, 依然还是修改client.go
func main() {
...// 省略
conn, err := connect(uri, header)
if err != nil {
log.Fatal("dial failed:", err)
}
defer conn.Close()
connChan := make(chan *websocket.Conn, 1)
connChan <- conn
// 发送心跳
go ping(connChan)
go func() {
for {
m := &model.Message{}
err = conn.ReadJSON(m)
if err != nil {
// server shutdown or socket closed
if isClosed(err) {
// reconnect
log.Println("Trying reconnect ...")
conn, err = connect(uri, header)
if err != nil {
log.Fatalln("Reconnect Failed...", err)
return
}
// 发送新的连接对象
connChan <- conn
continue
}
log.Println("Read WS message failed:", err)
continue
}
log.Printf("Received Message %v From %v \n", m.Data, m.FromUserID)
err = markMsgRead(*m, token)
if err != nil {
log.Println("MarkRead failed:", err)
}
}
}()
...// 省略
}
测试客户端心跳
分别在2个Terminal启动server和client, 可观察client端日志是定时发送了心跳
这时候短暂停止server后立即又启动server,观察client端日志发现客户端进行了重新连接并且继续发送ping心跳
客户端这边心跳、重连机制都没有任何问题
服务端监听心跳
首先修改 redis.go
, 添加带TTL的函数
func SetRedisKVWithTTL(k string, v string, ttl time.Duration) error {
return client.Set(k, v, ttl).Err()
}
修改ConnManager
func (m *ConnManager) AddConn(userID int64, conn *websocket.Conn) {
// 保存本地
m.connections.Store(userID, conn)
// 保存当前连接的服务器信息, 5秒过期
err := SetRedisKVWithTTL(UserConnKey(userID), model.CurrServer.ServerAddr(), 5*time.Second)
if err != nil {
log.Printf("Save conn of %d to redis failed:\n %v\n", userID, err)
}
}
func (m *ConnManager) SetConnTTL(userID int64) {
conn := m.FindConn(userID)
if conn == nil {
return
}
// 保存当前连接的服务器信息, 5秒过期
err := SetRedisKVWithTTL(UserConnKey(userID), model.CurrServer.ServerAddr(), 5*time.Second)
if err != nil {
log.Printf("Save conn of %d to redis failed:\n %v\n", userID, err)
}
}
gorilla-websocket
提供了处理Ping帧的函数,默认是回复一个Pong帧,这里我们需要自定义PingHandler
函数,添加往redis续时的操作, 修改server.go
func ws(c *gin.Context) {
... // 省略
log.Printf("%s connected... \n", user.Name)
defer OnDisconnect(*user)
err = Sender.SendUnRead(user.ID, conn)
if err != nil {
log.Printf("Send Unread to %s failed... \n", user.Name, err)
}
// 设置一个通道用于接收ping消息
pingChan := make(chan string)
conn.SetPingHandler(func(ping string) error {
log.Printf("Received Ping %s ...", string(ping))
pingChan <- ping
// 回复Pong
return conn.WriteMessage(websocket.PongMessage, []byte("Pong"))
})
// 监听ping chan
go func() {
LOOP:
for {
select {
case <-pingChan:
// 往redis添加TTL
ConnManager.SetConnTTL(user.ID)
case <-time.After(6 * time.Second):
// 如果6秒内没有收到任何消息,结束
err = errors.New("ping timeout")
break LOOP
}
}
if err != nil {
log.Println("caught an unexpected err:", err)
}
}()
... // 省略
}
测试
分别在两个Terminal启动server和client,观察server和client的日志可以发现client端正常发送ping帧和聊天消息,server端也能正常收消息和发消息
整理Server代码
但是此时server端代码比较混乱,有多处代码read消息,我们稍微整理下server.go
的代码
func ws(c *gin.Context) {
... // 省略
log.Printf("%s connected... \n", user.Name)
defer OnDisconnect(*user)
err = Sender.SendUnRead(user.ID, conn)
if err != nil {
log.Printf("Send Unread to %s failed... \n", user.Name, err)
}
// 设置一个通道用于接收ping消息
pingChan := make(chan string)
conn.SetPingHandler(func(ping string) error {
log.Printf("Received Ping %s ...", string(ping))
pingChan <- ping
// 回复Pong
return conn.WriteMessage(websocket.PongMessage, []byte("Pong"))
})
// 用于接收JSON消息
jsonChan := make(chan *model.Message)
// 接收ping消息
go func() {
defer close(pingChan)
defer close(jsonChan)
for {
// 读取WebSocket消息
mt, message, err := conn.ReadMessage()
if err != nil {
if isConnClosed(err) {
return
}
log.Println("Read Message Failed", err)
}
// 如果收到ping消息,通过通道发送
switch mt {
case websocket.BinaryMessage:
fallthrough
case websocket.TextMessage:
v := &model.Message{}
json.Unmarshal(message, v)
jsonChan <- v
case websocket.CloseMessage:
log.Printf("Read CloseMessage %s", string(message))
return
default:
log.Printf("Read OtherMessage %s, Type: %s", string(message), mt)
continue
}
}
}()
LOOP:
for {
select {
case v := <-jsonChan:
err = Sender.Send(v)
if err != nil {
log.Println("Send Message Failed", err)
}
case <-pingChan:
// 往redis添加TTL
ConnManager.SetConnTTL(user.ID)
case <-time.After(6 * time.Second):
// 如果6秒内没有收到任何消息,结束
err = errors.New("ping timeout")
break LOOP
}
}
if err != nil {
log.Println("caught an unexpected err:", err)
}
}
func isConnClosed(err error) bool {
_, ok := err.(*websocket.CloseError)
if ok {
return true
}
return strings.Contains(err.Error(), "use of closed network connection")
}
小结
本章节我们通过设置Redis中的数据自动过期,利用客户端每隔3秒发送Ping帧,服务端接收Ping帧时往Redis延续过期时间的方式提高WebSocket连接的可靠性以及Redis中客户端连接信息的可靠性。
转载自:https://juejin.cn/post/7360887387918712895