使用Nunu脚手架结合gorilla/websocket库快速开发一个聊天API概述 本篇文章给大家介绍如何开发一个基于
概述
本篇文章给大家介绍如何开发一个基于 websocket 协议的聊天 API。websocket 协议库使用 gorilla/websocket,然后结合一个 Gin 框架脚手架开发。
这款脚手架我选择的是 Nunu 脚手架。下面这是关于它的介绍,它里面内置了笔者很喜欢的依赖注入编程范式:
Nunu 是一个基于 Golang 的应用脚手架,它的名字来自于英雄联盟中的游戏角色,一个骑在雪怪肩膀上的小男孩。和努努一样,该项目也是站在巨人的肩膀上,它是由 Golang 生态中各种非常流行的库整合而成的,它们的组合可以帮助你快速构建一个高效、可靠的应用程序。
Nunu 旨在 Golang 项目开发时提供出色的开发体验。
- 超低学习成本和定制:Nunu 封装了 Gopher 最熟悉的一些流行库。您可以轻松定制应用程序以满足特定需求。
- 模块化和可扩展:Nunu 旨在具有模块化和可扩展性。您可以通过使用第三方库或编写自己的模块轻松添加新功能和功能。
- 文档完善和测试完备:Nunu 文档完善,测试完备。它提供了全面的文档和示例,帮助您快速入门。它还包括一套测试套件,确保您的应用程序按预期工作。
使用了下面这些库,但是我们不一定全用到:
- Gin: github.com/gin-gonic/g…
- Gorm: github.com/go-gorm/gor…
- Wire: github.com/google/wire
- Viper: github.com/spf13/viper
- Zap: github.com/uber-go/zap
- Golang-jwt: github.com/golang-jwt/…
- Go-redis: github.com/go-redis/re…
- Testify: github.com/stretchr/te…
- Sonyflake: github.com/sony/sonyfl…
- Gocron: github.com/go-co-op/go…
- Go-sqlmock: github.com/DATA-DOG/go…
- Gomock: github.com/golang/mock
- Swaggo: github.com/swaggo/swag
- Pitaya: github.com/topfreegame…
- More...
准备工作
安装nunu命令行工具
输入这个命令安装 nunu 命令行工具:
go install github.com/go-nunu/nunu@latest
然后使用 new
命令创建一个模板工程:
nunu new easy-im-basic
? Please select a layout: Basic
git clone https://github.com/go-nunu/nunu-layout-base.git
go mod tidy
go install github.com/google/wire/cmd/wire@latest
_ _
| \ | |_ _ _ __ _ _
| \| | | | | '_ \| | | |
| |\ | |_| | | | | |_| |
|_| \_|\__,_|_| |_|\__,_|
A CLI tool for building go aplication.
🎉 Project easy-im-basic created successfully!
Done. Now run:
› cd easy-im-basic
› nunu run
因为刚开始我们只是写一个简单的接口,并且用不到很多复杂模块,所以使用这个 basic 模板工程即可。这是工程目录结构:
安装websocket库
接下来我们需要安装一下 gorilla/websocket 库,基于这个库来开发 websocket 接口:
go get -u github.com/gorilla/websocket
安装wire命令行工具
Nunu 脚手架使用到了 wire 依赖注入库,得确保你安装了 wire 命令行工具用于生产依赖注入代码:
go install github.com/google/wire/cmd/wire@latest
第一个简单的websocket接口
ChatHandler
下面我们正式来开发第一个简单的 websocket 接口。
在 internal/handler 目录下创建一个 chat.go 文件,声明 ChatHandler
接口类型:
type ChatHandler interface {
Chat(c *gin.Context)
}
然后声明 chatHandler
接口体实现这个接口。这个结构体内嵌了 Handler
类型的指针,Handler
在当前目录的 handler.go 文件中已经声明了,里面包含一些公共的属性,比如说日志对象。
internal/handler/handler.go:
type Handler struct {
logger *log.Logger
}
func NewHandler(logger *log.Logger) *Handler {
return &Handler{
logger: logger,
}
}
internal/handler/chat.go:
type chatHandler struct {
*Handler
upGrader *websocket.Upgrader
}
func NewChatHandler(handler *Handler) ChatHandler {
// upGrader 用于将 http 协议升级为 websocket
upGrader := &websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
return &chatHandler{Handler: handler, upGrader: upGrader}
}
func (w *chatHandler) Chat(c *gin.Context) {
// 将 HTTP 请求升级为 WebSocket
conn, err := w.upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"message": "Failed to set WebSocket upgrade"})
return
}
defer conn.Close()
// 双向通信:读取客户端消息,并回复
for {
// 读取消息
messageType, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("read error:", err)
break
}
w.logger.Info("Received message: %s", zap.String("msg", string(message)))
// 响应客户端消息
response := fmt.Sprintf("Server received: %s", message)
if err = conn.WriteMessage(messageType, []byte(response)); err != nil {
fmt.Println("write error:", err)
break
}
}
}
cmd/server/wire/wire.go:
var HandlerSet = wire.NewSet(
handler.NewHandler,
handler.NewUserHandler,
handler.NewChatHandler,
)
server/http.go:
func NewServerHTTP(
logger *log.Logger,
userHandler *handler.UserHandler,
chatHandler handler.ChatHandler,
) *gin.Engine {
// ...
r.GET("/chat", chatHandler.Chat)
}
最后在 cmd/server/wire 目录下执行 wire
命令生成一下依赖注入代码。
接口测试
打开 ApiPost 新建一个 WebSocket 接口测试:
点击连接,然后访问对应的接口。启动服务之前需要检查一下端口是否对应上,config/local.yml:
可以看到成功地连接到了服务:
然后点击发送消息,服务端也成功返回了消息:
websocket 工作流程
websocket 流程图:
工作流程:
- 建立连接(握手阶段):WebSocket 连接是通过标准的 HTTP/HTTPS 协议发起的,因此首先是由客户端发送一个 HTTP 请求来建立 WebSocket 连接。握手请求的关键步骤如下:
- 客户端发送握手请求:客户端发出 HTTP 请求,要求服务器升级协议。这是一个 GET 请求,包含以下关键头信息:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Upgrade: websocket
:表示客户端请求从 HTTP 升级为 WebSocket。Connection: Upgrade
:指明连接类型为升级。Sec-WebSocket-Key
:客户端生成的随机字符串,用于握手验证。Sec-WebSocket-Version
:指定 WebSocket 协议的版本,当前一般为 13。
- 服务器响应握手:服务器收到客户端的握手请求后,会返回一个 HTTP 101 状态码表示协议升级成功,同时包含关键的响应头信息:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
101 Switching Protocols
:表示协议从 HTTP 升级为 WebSocket。Sec-WebSocket-Accept
:服务器通过客户端提供的Sec-WebSocket-Key
和一个固定字符串进行计算后生成的值,用于验证连接的合法性。一旦服务器返回了 HTTP 101 响应,WebSocket 连接即被建立。
-
数据传输阶段:一旦握手成功,客户端和服务器之间的通信不再通过 HTTP 进行,而是通过 WebSocket 协议直接传输数据。此时,双方都可以随时发送消息,而不需要等待对方先发送数据。
-
数据帧:WebSocket 通过“帧”进行数据传输。每一个消息被分割为一个或多个数据帧,这些帧按顺序传输。WebSocket 支持以下几种帧类型:
- 文本帧:发送文本数据(通常是 UTF-8 编码的字符串)。
- 二进制帧:发送二进制数据(如文件、图片)。
- 控制帧:用于管理连接的控制帧,例如 ping/pong 帧用于维持连接的活动性,关闭帧用于断开连接。
-
帧结构:每一个 WebSocket 帧都有固定的结构,包括:
- FIN 位:表示消息是否结束(是否还有后续帧)。
- 操作码(Opcode):表示帧类型(如文本、二进制、关闭、ping、pong)。
- 掩码(Mask):从客户端发往服务器的所有帧都必须经过掩码处理,服务器发往客户端的帧则不用。
-
-
连接关闭:WebSocket 连接可以通过客户端或服务器发送一个“关闭”控制帧来关闭。这是一个带有操作码
0x8
的帧,表示连接的关闭。关闭帧可以携带关闭原因,供对方了解关闭的原因。
在正常情况下,WebSocket 关闭时,双方都会彼此发送关闭帧确认断开连接。
实现消息点对点收发
我们目前仅实现了客户端-服务端双向通信,现在我们来实现不同的客户端之间点对点发送消息。
ChatRequest
客户端之间发送消息需要用一个消息结构体来进行传输数据。数据的传输方式我们使用管道通信,这样的通信方式是十分高效且并发安全的。
internal/model/chat.go:
// ChatRequest 聊天内容请求体
type ChatRequest struct {
Originator string `json:"originator,omitempty"` // 发起人 ID
Receiver string `json:"receiver,omitempty"` // 接收人 ID
Content string `json:"content,omitempty"` // 消息内容
}
ChatClient
对于客户端,我们需要定义一个结构体类型来描述它的信息。声明 ChatClient
如下:
ChatClient
:
// ChatClient 定义一个 WebSocket 客户端的结构体
type ChatClient struct {
Logger *log.Logger
ID string // 用户ID,或可以是用户名
Conn *websocket.Conn // WebSocket 连接
MsgChan chan []byte // 用于发送消息的通道
}
ReadPump
方法用于读取客户端的消息。这个方法参数是一个 ChatHub
类型的指针。ChatHub
用于管理各个 ChatClient
之间的通信,以及 ChatClient
的上线和下线。ChatHub
的 UnregisterCh
的管道变量用于在客户端断开连接之后注销用户。消息的格式我们使用 JSON 类型,在读取到消息之后要把 JSON 数据转为结构体。这里我们使用了 ChatRequest
结构体接收。最后把消息存入 ChatHub
的 BroadcastCh
管道变量中,用于对客户端进行广播。
在目前,我们不区分私聊和群聊等消息类型,所以直接用广播方式即可。
// ReadPump 读取消息
func (c *ChatClient) ReadPump(hub *ChatHub) {
defer func() {
hub.UnregisterCh <- c
_ = c.Conn.Close()
}()
for {
_, messageBytes, err := c.Conn.ReadMessage()
if err != nil {
c.Logger.Error("read error", zap.String("err", err.Error()))
break
}
c.Logger.Debug("read message complete", zap.ByteString("msg", messageBytes))
// 将消息解析为 Message 结构
var message ChatRequest
if err = json.Unmarshal(messageBytes, &message); err != nil {
c.Logger.Error("unmarshal error", zap.String("err", err.Error()))
continue
}
// 将消息发送到 Hub 以处理
hub.BroadcastCh <- message
}
}
WritePump
方法用于向客户端写入响应的消息。ChatClient
的 MsgChan
管道中如果可以获取到数据的话,那么说明有 ChatClient
发消息给当前 ChatClient
,然后就可以把这个消息写回当前客户端。
// WritePump 客户端写消息的方法
func (c *ChatClient) WritePump() {
defer c.Conn.Close()
for {
select {
case message, ok := <-c.MsgChan:
if !ok {
// 如果通道关闭了,终止连接
_ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 向客户端发送消息
if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
}
}
}
ChatHub
ChatHub
:
type ChatHub struct {
logger *log.Logger
Clients map[string]*ChatClient // 用用户ID作为键来存储连接的客户端
BroadcastCh chan ChatRequest // 广播通道
RegisterCh chan *ChatClient // 注册新客户端
UnregisterCh chan *ChatClient // 注销客户端
mu *sync.Mutex // 互斥锁,保证并发安全
}
func NewChatHub(logger *log.Logger) *ChatHub {
return &ChatHub{
logger: logger,
Clients: make(map[string]*ChatClient, 128),
BroadcastCh: make(chan ChatRequest),
RegisterCh: make(chan *ChatClient),
UnregisterCh: make(chan *ChatClient),
mu: &sync.Mutex{},
}
}
定义 Run
方法用于监听 ChatHandler
发来的消息:
// Run 启动 Hub 以管理所有连接
func (h *ChatHub) Run() {
for {
select {
// 处理新客户端的连接
case client := <-h.RegisterCh:
h.mu.Lock()
h.Clients[client.ID] = client
h.mu.Unlock()
h.logger.Debug("client connected", zap.String("uid", client.ID))
// 处理客户端断开连接
case client := <-h.UnregisterCh:
h.mu.Lock()
if _, ok := h.Clients[client.ID]; ok {
delete(h.Clients, client.ID)
close(client.MsgChan)
h.logger.Debug("client disconnected", zap.String("uid", client.ID))
}
h.mu.Unlock()
// 处理消息发送
case message := <-h.BroadcastCh:
h.mu.Lock()
// 找到消息的接收者
if receiver, ok := h.Clients[message.Receiver]; ok {
// 将消息发送给接收者
select {
case receiver.MsgChan <- []byte(message.Content):
default:
close(receiver.MsgChan)
delete(h.Clients, message.Receiver)
}
}
h.mu.Unlock()
}
}
}
ChatHandler并发处理读写
chatHandler
需要做一点小小的改造。它的内部需要依赖一个 ChatHub
变量,这样才能监听消息。
type chatHandler struct {
*Handler
upGrader *websocket.Upgrader
hub *model.ChatHub
}
func NewChatHandler(handler *Handler) ChatHandler {
upGrader := &websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
hub := model.NewChatHub(handler.logger)
// 开启一个协程运行 Run 方法协调各个客户端的通信
go hub.Run()
return &chatHandler{
Handler: handler,
upGrader: upGrader,
hub: hub,
}
}
Chat
方法需要将读取和写回操作改成并发的操作,而且在请求的时候需要带上查询参数 uid
表示用户的 id,用于创建 ChatClient
对象。
func (w *chatHandler) Chat(c *gin.Context) {
uid := c.Query("uid")
if uid == "" {
c.JSON(http.StatusBadRequest, gin.H{"message": "User id doesn't exist"})
return
}
// 将 HTTP 请求升级为 WebSocket
conn, err := w.upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"message": "Failed to set WebSocket upgrade"})
return
}
// 创建客户端对象
client := &model.ChatClient{
Logger: w.logger,
ID: uid,
Conn: conn,
MsgChan: make(chan []byte),
}
// 把客户端对象注册到 Hub
w.hub.RegisterCh <- client
// 开启两个协程分别处理客户端消息接收和发送
// 中间通过管道通信
go client.ReadPump(w.hub)
go client.WritePump()
}
为什么要改成并发的呢?因为 ReadPump
和 WritePump
方法都是 for-select 模式,不用 Go 协程那么必然会阻塞。
接口测试
开启两个 WebSocket 连接。
然后可以看到打印出了日志:
两个连接都成功发送并接收了消息:
简单的网页版
现在我们来实现一个简单的网页版。首先需要设置静态页面路径:
server/http.go:
r.StaticFile("/index", "./web/index.html")
这样 /index
路径就和 web/index.html 对应起来了。
web/index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Chat - Private Messaging</title>
</head>
<body>
<h1>WebSocket Private Chat</h1>
<input id="userID" type="text" placeholder="Enter your ID..."/>
<button onclick="connect()">Connect</button>
<br>
<input id="receiverID" type="text" placeholder="Receiver ID"/>
<input id="message" type="text" placeholder="Type a message..."/>
<button onclick="sendMessage()">Send Message</button>
<ul id="messages"></ul>
</body>
</html>
<script>
let ws
let userID
function connect() {
userID = document.getElementById('userID').value
ws = new WebSocket(`ws://localhost:8080/chat?uid=${userID}`)
ws.onmessage = function (event) {
const messages = document.getElementById('messages')
const receiverEl = document.getElementById('receiverID')
const message = document.createElement('li')
message.appendChild(document.createTextNode(receiverEl.value + ": " + event.data))
messages.appendChild(message)
}
}
function sendMessage() {
const receiverID = document.getElementById('receiverID').value
const messageContent = document.getElementById('message').value
const messages = document.getElementById('messages')
// 发送消息的格式:JSON 包含发送者ID、接收者ID、内容
const message = {
sender: userID,
receiver: receiverID,
content: messageContent
}
ws.send(JSON.stringify(message))
const li = document.createElement('li')
li.innerText = 'ME: ' + messageContent
messages.appendChild(li)
}
</script>
效果如下:
转载自:https://juejin.cn/post/7414845811555155987