基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~
本章核心内容
- 什么是
全双工
协议,与半双工
有什么本质区别? - 如何实现全双工协议的
同步语法
调用?- 即通过websocket向服务器发送一条消息,然后等待它的结果。如下图所示:
文末有Go服务端
与Web Typescript
源码~
建议各位看官先
点赞
加收藏
,方便以后查找~
背景介绍
在很多场景下,前端都会使用websocket实现一些长连相关的功能。但是在Web端,WebSocket接口的发送消息与接收消息是在两个不同的方法中,如下:
WebSocket.onmessage
- An event listener to be called when a message is received from the server.
WebSocket.send(data)
- Enqueues data to be transmitted.
摘自【Websocket -
Web APIs | MDN
】
因此如果你要实现一个上面的请求
、响应
的模式,对发送端来说,一种调用方式可能就是这样的:
let conn = new w3cwebsocket(url)
conn.binaryType = "arraybuffer"
conn.onopen = () => {
console.info("websocket open - readyState:", conn.readyState)
if (conn.readyState === w3cwebsocket.OPEN) {
let req = JSON.stringify({ "seq": 1, "msg": "hello world" })
conn.send(req) <-- 请求
}
}
conn.onclose = (e: ICloseEvent) => {
console.debug("event[onclose] fired")
}
conn.onmessage = (evt: IMessageEvent) => {
let resp = JSON.parse(<string>evt.data)
console.info(resp) <-- 响应
}
请求与响应
分离
的编码逻辑,难受不~~
因此,我们接下来就介绍如何解决这个问题。不过,在编码实现之前,容我介绍下关键点:全双工
与半双工
通信的区别。
全双工
- 全双工:通信双方可以同时发送信息对对方。
- 半双工:可以双向通信,但是同一时刻只能有一个方向在传输信息。
理解下基本概念即可,在不同的层级会不同的含义。
我们都知道,Websocket与Http1.x都是基于TCP/IP之上的协议,而TCP也是全双工通信协议。为什么说Websocket就是全双工的通信协议,而Http1.x是半双工协议。而且Websocket还是在Http协议的基础上升级而来。
难道Http1.x不是亲生的~
本质的原因HTTP1.x协议是一个请求、响应的模式。在一次请求中,Respose必定是在Request之后发生的
,请求包与响应包是不可能同时在网络中传输:
这就是为什么说Http1.x是一个半双工协议。
如果从编码的角度来看,代码就很好写,因为请求与返回是有顺序的,一个伪逻辑如下:
function request(req){
conn.send(req)
let resp = conn.read()
return resp
}
但是在一个全双工的通信中,消息之间是没有明确的顺序与关联关系的。如下图:
虽然说websocket是一个全双工的通信协议,但是在它的协议中也没有不同消息之间的关联信息
,当web端收到一条消息它也区分不出来谁与谁。因此,我们可以定义一个简单的上层业务协议
,如下:
属性 | 类型 | 说明 |
---|---|---|
Sequence | 整形 | 消息序号 |
Type | 枚举 | 1:请求/2:响应/3:通知 |
Message | string | 消息内容 |
如此一来,在逻辑上就可以对每个消息打下标记,还以上图为例:
格式 | 消息 |
---|---|
Sequence: 1,Type: 1,Message: hello | Request1 |
Sequence: 2,Type: 1,Message: world | Request2 |
Sequence: 1,Type: 2,Message: ok | Response1 |
Sequence: 2,Type: 2,Message: ok | Response2 |
Sequence: 1,Type: 1,Message: test | Request3 |
Sequence: 1,Type: 3,Message: test | Notify3 |
如此一来,就可以完成我们想要的逻辑了,只要通过Sequence
和Type
两个属性,就可以把一个请求与一个响应关联在一起。
Sequence在客户端生成一条消息时
自增
即可。
实战
typescript代码
第一步
:定义好相关对象:
- Message:业务协议
- Request:模拟请求缓存
- Response:模拟响应
export class Message {
sequence: number = 0;
type: number = 1;
message?: string;
from?: string; // sender
constructor(message?: string) {
this.message = message;
this.sequence = Seq.Next() <--- 自动生成序号
}
}
export class Request {
sendTime: number
callback: (response: Message) => void
constructor(callback: (response: Message) => void) {
this.sendTime = Date.now()
this.callback = callback
}
}
export class Response {
success: boolean = false
message?: Message
constructor(success: boolean, message?: Message) {
this.success = success;
this.message = message;
}
}
注意,在Request中会有一个
callback
回调方法,这是实现同步调用
的关键。
第二步
:创建WebsocketClient对象,并创建一个名为sendq
的用于保存请求的Map,然后实现一个request
方法:
export class WebsocketClient {
private sendq = new Map<number, Request>() <--- 创建map
async request(data: Message): Promise<Response> {
return new Promise((resolve, _) => {
let seq = data.sequence
// asynchronous wait ack from server
let callback = (msg: Message) => { <--- 创建回调
// remove from sendq
this.sendq.delete(seq)
resolve(new Response(true, msg))
}
this.sendq.set(seq, new Request(callback)) <--- 暂存 Request
if (!this.send(JSON.stringify(data))) { <--- 发送消息
resolve(new Response(false))
}
})
}
send(data: string): boolean {
try {
if (this.conn == null) {
return false
}
this.conn.send(data)
} catch (error) {
return false
}
return true
}
如果说Http的请求与响应是基于先后顺序关联,那么全双工的请求与响应关联的核心就是sendq
这个Map对象了,它相当于在客户端缓存着所有等待响应的请求
,有点拗口。
在这里request方法中,主要逻辑分三步:
- 创建回调
- 暂存Request
- 发送消息
第三步
:接收消息的处理,它在登录方法中:
async login(): Promise<{ success: boolean }> {
if (this.state == State.CONNECTED) {
return { success: false }
}
this.state = State.CONNECTING
return new Promise((resolve, _) => {
let conn = new w3cwebsocket(this.wsurl)
conn.binaryType = "arraybuffer"
let returned = false
conn.onopen = () => {
console.info("websocket open - readyState:", conn.readyState)
if (conn.readyState === w3cwebsocket.OPEN) {
returned = true
resolve({ success: true })
}
}
// overwrite onmessage
conn.onmessage = (evt: IMessageEvent) => {
try {
let msg = new Message();
Object.assign(msg, JSON.parse(<string>evt.data))
if (msg.type == 2) {
let req = this.sendq.get(msg.sequence) <----读取request
if (req) {
req.callback(msg) <----触发回调
}
} else if (msg.type == 3) {
console.log(msg.message, msg.from)
}
} catch (error) {
console.error(evt.data, error)
}
}
conn.onerror = (error) => {
console.info("websocket error: ", error)
if (returned) {
resolve({ success: false })
}
}
conn.onclose = (e: ICloseEvent) => {
console.debug("event[onclose] fired")
this.onclose(e.reason)
}
this.conn = conn
this.state = State.CONNECTED
})
}
可以看到,在全双工的消息收发中没有统一顺序。因此在这里解析出Message
对象之后,就会判断它的类型,如果是Response消息,就去sendq中找这个消息的请求Request,并调用回调方法。
Go服务端代码逻辑:
在服务端主要实现了一个消息的广播,完成之后就给发送者一个response消息
。主要逻辑在handle
方法中,这里就不详细介绍了,感兴趣的可以直接看源码
。
type Message struct {
Sequence int `json:"sequence,omitempty"`
Type int `json:"type,omitempty"`
Message string `json:"message,omitempty"`
From string `json:"from,omitempty"`
}
func (m *Message) MarshalJSON() []byte {
bts, _ := json.Marshal(m)
return bts
}
func parseMessage(text string) *Message {
var msg Message
_ = json.Unmarshal([]byte(text), &msg)
return &msg
}
// 广播消息
func (s *Server) handle(user string, text string) {
logrus.Infof("recv message %s from %s", text, user)
s.Lock()
defer s.Unlock()
msg := parseMessage(text)
msg.From = user
msg.Type = 3 //notify type
notice := msg.MarshalJSON()
for u, conn := range s.users {
if u == user {
continue
}
logrus.Infof("send to %s : %s", u, text)
err := s.writeText(conn, notice)
if err != nil {
logrus.Errorf("write to %s failed, error: %v", user, err)
}
}
conn := s.users[user]
resp := Message{ <--- 创建响应包
Sequence: msg.Sequence, <--- 序号一定要与请求包中相同
Type: 2, //response type
Message: "ok",
}
_ = s.writeText(conn, resp.MarshalJSON())
}
func (s *Server) writeText(conn net.Conn, message []byte) error {
// 创建文本帧数据
f := ws.NewTextFrame(message)
err := conn.SetWriteDeadline(time.Now().Add(s.options.writewait))
if err != nil {
return err
}
return ws.WriteFrame(conn, f)
}
演示示例
最终得到我们想要的结果:
let resp = await cli.request(req)
// index.ts
const main = async () => {
let cli = new WebsocketClient("ws://localhost:8000", "ccc");
let { success } = await cli.login()
console.log("client login return -- ", success)
let req = new Message("hello")
let resp = await cli.request(req)
console.log("client request", req, "return", resp.message)
await sleep(5)
cli.logout()
}
main()
执行看到输出如下:
$ ts-node index.ts
websocket open - readyState: 1
client login return -- true
client request Message { sequence: 1, type: 1, message: 'hello' } return Message { sequence: 1, type: 2, message: 'ok' }
event[onclose] fired
connection closed due to Normal connection closure
最后总结
本文介绍了全双工与半双工概念与本质区别。同时通过业务协议
与Promise
完成了全双工通信下的请求与响应的同步调用逻辑。
github源码:klintcheng/demo
最后,最多精彩知识尽在我编写的这本小册中 分布式IM原理与实战: 从0到1打造即时通讯云。
看都看到这里了,还不顺手一个
点赞
。
转载自:https://juejin.cn/post/7000140081293950984