第七章节: 分布式WebSocket Server方案
前言
上一章节中我们在Client端实现了断线重连的功能,这一章节我们将继续探讨高可用方案。当前我们的Server端还是单机模式的,所有的连接保存在本地。在我们开发web应用程序时,我们一般是通过redis来保存用户的登录信息,完成服务端无状态化。
同样,在这个场景下我们也可借助redis完成用户连接信息的记录。当server启动时,记录当前的ip和port,然后当有客户端连接时,在绑定连接的同时将用户ID作为key、服务器的ip、port作为value保存在redis。当server端收到用户A发送消息给用户B时,先在server本地搜寻用户B的连接,如果本地没有,从redis获取用户B的连接信息,如果存在表示用户B在线,然后调用 ip:host/send
通知另外一个服务器去发送消息。客户端断开连接时将redis的数据删掉。
连接注册
消息通讯
下面我们将编码实现上述的方案思路,以及调试验证其可行性
集成Redis
添加go-redis依赖
go get -u github.com/go-redis/redis/v8
编写操作Redis的工具方法, 在conn文件夹下创建redis.go
package conn
import (
"log"
"github.com/go-redis/redis"
)
var client *redis.Client
func init() {
// 创建一个新的Redis客户端
client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// 检查连接
pong, err := client.Ping().Result()
if err != nil {
panic(err)
}
log.Printf("connected to redis and ping get %v", pong)
}
// set
func SetRedisKV(k string, v string) error {
return client.Set(k, v, 0).Err()
}
// get
func GetRedisVal(k string) (string, error) {
return client.Get(k).Result()
}
// del
func DelRedisKey(k string) error {
return client.Del(k).Err()
}
服务启动保存服务器信息
当server服务启动时获取当前服务的host和port,先在models.go
中定一个struct
type ServerInfo struct {
Host string
Port int
}
var CurrServer ServerInfo = ServerInfo{}
func (s *ServerInfo) ServerAddr() string {
return fmt.Sprintf("%s:%d", s.Host, s.Port)
}
修改server.go
, 当启动时将服务���信息保存
func main() {
args := os.Args[1:]
port, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
log.Fatal("Parse ServerPort Failed:", err)
}
// 本地测试,当部署到生产环境时可以通过容器的环境变量获取IP地址
model.CurrServer.Host = "localhost"
model.CurrServer.Port = int(port)
server := gin.Default()
server.GET("/ws", ws).POST("/markRead", markRead)
server.Run(model.CurrServer.ServerAddr())
}
修改ConnManager
的AddConn和DelConn方法,添加相应的redis操作
func (m *ConnManager) AddConn(userID int64, conn *websocket.Conn) {
// 保存本地
m.connections.Store(userID, conn)
// 保存当前连接的服务器信息
err := SetRedisKV(UserConnKey(userID), model.CurrServer.ServerAddr())
if err != nil {
log.Printf("Save conn of %d to redis failed:\n %v\n", userID, err)
}
}
func (m *ConnManager) DelConn(userID int64) {
// 删除本地记录
m.connections.Delete(userID)
// 删除Redis中的信息
err := DelRedisKey(UserConnKey(userID))
if err != nil {
log.Printf("Del conn of %d to redis failed:\n %v\n", userID, err)
}
}
func UserConnKey(userID int64) string {
return fmt.Sprintf("conn:%d", userID)
}
Server间互相调用
在server.go
中添加send接口,供其他server调用
// 推送消息接口
func send(ctx *gin.Context) {
// TODO 签名校验, 确保只能server间调用
var message model.Message
// 将请求的JSON绑定到user结构体中
if err := ctx.ShouldBindJSON(&message); err != nil {
ctx.AbortWithError(http.StatusBadRequest, err)
return
}
// 获取接收方的连接
conn := ConnManager.FindConn(message.ToUserID)
if conn == nil {
msg := fmt.Sprintf("%d does not online", message.ToUserID)
ctx.JSON(http.StatusOK, gin.H{"message": msg})
return
}
// 推送消息
err := conn.WriteJSON(message)
if err != nil {
ctx.AbortWithError(http.StatusInternalServerError, err)
return
}
ctx.JSON(http.StatusOK, gin.H{"message": "send success"})
}
func main() {
... // 省略
server := gin.Default()
server.GET("/ws", ws).POST("/markRead", markRead).POST("/send", send)
server.Run(model.CurrServer.ServerAddr())
}
修改Sender
代码, 发送消息时如果接收方不在本地那么从redis获取服务器信息,如果在线那么调用服务的send接口
func (sender *Sender) Send(msg *model.Message) error {
// 保存数据库
repo.MessageRepoOps.Create(msg)
conn := GetConnManager().FindConn(msg.ToUserID)
if conn == nil {
// 本地没有,查询redis
address, err := GetRedisVal(UserConnKey(msg.ToUserID))
if err != nil {
return nil
}
// call remote send by http
if len(address) > 0 {
return sender.callRemoteSend(msg, address)
}
return fmt.Errorf("%v does not online", msg.ToUserID)
}
return conn.WriteJSON(msg)
}
func (Sender) callRemoteSend(msg *model.Message, remoteAddr string) (err error) {
var (
bodyBytes []byte
req *http.Request
resp *http.Response
)
bodyBytes, err = json.Marshal(msg)
if err != nil {
return err
}
// 创建一个请求体,这里使用的是JSON格式的数据
body := bytes.NewBufferString(string(bodyBytes))
url := fmt.Sprintf("http://%s/send", remoteAddr)
log.Printf("call POST %s", url)
req, err = http.NewRequest("POST", url, body)
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
// 使用http.DefaultClient.Do方法来发送请求
resp, err = http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
// 读取响应体
bodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
return
}
log.Printf("call remote send resp: %s \n", string(bodyBytes))
return
}
调试
先分别在两个Terminal中启动两个server, 模拟多实例
go run src/main/server.go 8898
go run src/main/server.go 8848
然后分别在两个Terminal中用2个用户启动两个client(记得修改一下server端的port), 这样用户1001和用户1002分别连接了不同的server, 分别给对方发送消息,查看控制台日志
Server端日志
Client端日志
提供日志我们可以看到两个用户分别连接了不同的server,但是依然可以互相通信, 这样我们就实现了一个最基础的分布式IM服务
小结
本章节中我们通过使用中间件redis来共享客户端的连接信息,可以通过redis来获取每个用户真实WebSocket连接的服务器地址,有了服务器地址后就可以互相调用通知对方发送聊天内容。但是既然使用了中间件,就得考虑redis中数据的可靠性,如果一个server宕机了,这样它所有的ws连接就会断开,但是redis中保存的连接信息还没来得及删除,其他服务获取的连接信息就不准确了,所以需要还需要考虑如何保障server的连接和redis中的连接信息一致性,将会在下一个章节探讨这个问题的解决方案。
转载自:https://juejin.cn/post/7360528073630613515