likes
comments
collection
share

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

作者站长头像
站长
· 阅读数 38

本章核心内容

  • 什么是全双工协议,与半双工有什么本质区别?
  • 如何实现全双工协议的同步语法调用?
    • 即通过websocket向服务器发送一条消息,然后等待它的结果。如下图所示:

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

文末有Go服务端Web Typescript源码~

建议各位看官先点赞收藏,方便以后查找~

背景介绍

在很多场景下,前端都会使用websocket实现一些长连相关的功能。但是在Web端,WebSocket接口的发送消息与接收消息是在两个不同的方法中,如下:

  • WebSocket.onmessage
    • An event listener to be called when a message is received from the server.
  • WebSocket.send(data)
    • Enqueues data to be transmitted.

摘自【Websocket - Web APIs | MDN

因此如果你要实现一个上面的请求响应的模式,对发送端来说,一种调用方式可能就是这样的:

let conn = new w3cwebsocket(url)
conn.binaryType = "arraybuffer"
conn.onopen = () => {
    console.info("websocket open - readyState:", conn.readyState)
    if (conn.readyState === w3cwebsocket.OPEN) {
        let req = JSON.stringify({ "seq": 1, "msg": "hello world" })
        conn.send(req)  <-- 请求
    }
}

conn.onclose = (e: ICloseEvent) => {
    console.debug("event[onclose] fired")
}

conn.onmessage = (evt: IMessageEvent) => {
    let resp = JSON.parse(<string>evt.data)
    console.info(resp) <-- 响应
}

请求与响应分离的编码逻辑,难受不~~

因此,我们接下来就介绍如何解决这个问题。不过,在编码实现之前,容我介绍下关键点:全双工半双工通信的区别。

全双工

  • 全双工:通信双方可以同时发送信息对对方。
  • 半双工:可以双向通信,但是同一时刻只能有一个方向在传输信息。

理解下基本概念即可,在不同的层级会不同的含义。

我们都知道,Websocket与Http1.x都是基于TCP/IP之上的协议,而TCP也是全双工通信协议。为什么说Websocket就是全双工的通信协议,而Http1.x是半双工协议。而且Websocket还是在Http协议的基础上升级而来。

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

难道Http1.x不是亲生的~

本质的原因HTTP1.x协议是一个请求、响应的模式。在一次请求中,Respose必定是在Request之后发生的,请求包与响应包是不可能同时在网络中传输:

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

这就是为什么说Http1.x是一个半双工协议。

如果从编码的角度来看,代码就很好写,因为请求与返回是有顺序的,一个伪逻辑如下:

function request(req){
    conn.send(req)
    let resp = conn.read()
    return resp
}

但是在一个全双工的通信中,消息之间是没有明确的顺序与关联关系的。如下图:

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

虽然说websocket是一个全双工的通信协议,但是在它的协议中也没有不同消息之间的关联信息,当web端收到一条消息它也区分不出来谁与谁。因此,我们可以定义一个简单的上层业务协议,如下:

属性类型说明
Sequence整形消息序号
Type枚举1:请求/2:响应/3:通知
Messagestring消息内容

如此一来,在逻辑上就可以对每个消息打下标记,还以上图为例:

格式消息
Sequence: 1,Type: 1,Message: helloRequest1
Sequence: 2,Type: 1,Message: worldRequest2
Sequence: 1,Type: 2,Message: okResponse1
Sequence: 2,Type: 2,Message: okResponse2
Sequence: 1,Type: 1,Message: testRequest3
Sequence: 1,Type: 3,Message: testNotify3

如此一来,就可以完成我们想要的逻辑了,只要通过SequenceType两个属性,就可以把一个请求与一个响应关联在一起。

Sequence在客户端生成一条消息时自增即可。

实战

typescript代码

第一步:定义好相关对象:

  • Message:业务协议
  • Request:模拟请求缓存
  • Response:模拟响应
export class Message {
    sequence: number = 0;
    type: number = 1;
    message?: string;
    from?: string; // sender
    constructor(message?: string) {
        this.message = message;
        this.sequence = Seq.Next() <--- 自动生成序号
    }
}

export class Request {
    sendTime: number
    callback: (response: Message) => void
    constructor(callback: (response: Message) => void) {
        this.sendTime = Date.now()
        this.callback = callback
    }
}

export class Response {
    success: boolean = false
    message?: Message
    constructor(success: boolean, message?: Message) {
        this.success = success;
        this.message = message;
    }
}

注意,在Request中会有一个callback回调方法,这是实现同步调用的关键。

第二步:创建WebsocketClient对象,并创建一个名为sendq的用于保存请求的Map,然后实现一个request方法:

export class WebsocketClient {
    private sendq = new Map<number, Request>()      <--- 创建map
    
    async request(data: Message): Promise<Response> {
        return new Promise((resolve, _) => {
            let seq = data.sequence

            // asynchronous wait ack from server
            let callback = (msg: Message) => {     <--- 创建回调
                // remove from sendq
                this.sendq.delete(seq)
                resolve(new Response(true, msg))
            }

            this.sendq.set(seq, new Request(callback))  <--- 暂存 Request

            if (!this.send(JSON.stringify(data))) {   <--- 发送消息
                resolve(new Response(false))
            }
        })
    }
    send(data: string): boolean {
        try {
            if (this.conn == null) {
                return false
            }
            this.conn.send(data)
        } catch (error) {
            return false
        }
        return true
    }

如果说Http的请求与响应是基于先后顺序关联,那么全双工的请求与响应关联的核心就是sendq这个Map对象了,它相当于在客户端缓存着所有等待响应的请求,有点拗口。

在这里request方法中,主要逻辑分三步:

  1. 创建回调
  2. 暂存Request
  3. 发送消息

第三步:接收消息的处理,它在登录方法中:

async login(): Promise<{ success: boolean }> {
    if (this.state == State.CONNECTED) {
        return { success: false }
    }
    this.state = State.CONNECTING
    return new Promise((resolve, _) => {
        let conn = new w3cwebsocket(this.wsurl)
        conn.binaryType = "arraybuffer"
        let returned = false
        conn.onopen = () => {
            console.info("websocket open - readyState:", conn.readyState)
            if (conn.readyState === w3cwebsocket.OPEN) {
                returned = true
                resolve({ success: true })
            }
        }

        // overwrite onmessage
        conn.onmessage = (evt: IMessageEvent) => {
            try {
                let msg = new Message();
                Object.assign(msg, JSON.parse(<string>evt.data))
                if (msg.type == 2) {
                    let req = this.sendq.get(msg.sequence)       <----读取request
                    if (req) {
                        req.callback(msg)                        <----触发回调 
                    }
                } else if (msg.type == 3) {
                    console.log(msg.message, msg.from)
                }
            } catch (error) {
                console.error(evt.data, error)
            }
        }

        conn.onerror = (error) => {
            console.info("websocket error: ", error)
            if (returned) {
                resolve({ success: false })
            }
        }

        conn.onclose = (e: ICloseEvent) => {
            console.debug("event[onclose] fired")
            this.onclose(e.reason)
        }
        this.conn = conn
        this.state = State.CONNECTED
    })
}

可以看到,在全双工的消息收发中没有统一顺序。因此在这里解析出Message对象之后,就会判断它的类型,如果是Response消息,就去sendq中找这个消息的请求Request,并调用回调方法。

Go服务端代码逻辑:

在服务端主要实现了一个消息的广播,完成之后就给发送者一个response消息。主要逻辑在handle方法中,这里就不详细介绍了,感兴趣的可以直接看源码

type Message struct {
	Sequence int    `json:"sequence,omitempty"`
	Type     int    `json:"type,omitempty"`
	Message  string `json:"message,omitempty"`
	From     string `json:"from,omitempty"`
}

func (m *Message) MarshalJSON() []byte {
	bts, _ := json.Marshal(m)
	return bts
}

func parseMessage(text string) *Message {
	var msg Message
	_ = json.Unmarshal([]byte(text), &msg)
	return &msg
}

// 广播消息
func (s *Server) handle(user string, text string) {
	logrus.Infof("recv message %s from %s", text, user)
	s.Lock()
	defer s.Unlock()
	msg := parseMessage(text)
	msg.From = user
	msg.Type = 3 //notify type
	notice := msg.MarshalJSON()
	for u, conn := range s.users {
		if u == user {
			continue
		}
		logrus.Infof("send to %s : %s", u, text)
		err := s.writeText(conn, notice)
		if err != nil {
			logrus.Errorf("write to %s failed, error: %v", user, err)
		}
	}

	conn := s.users[user]
        resp := Message{                 <--- 创建响应包
		Sequence: msg.Sequence,  <--- 序号一定要与请求包中相同
		Type:     2, //response type
		Message:  "ok",
	}
	_ = s.writeText(conn, resp.MarshalJSON())
}

func (s *Server) writeText(conn net.Conn, message []byte) error {
	// 创建文本帧数据
	f := ws.NewTextFrame(message)
	err := conn.SetWriteDeadline(time.Now().Add(s.options.writewait))
	if err != nil {
		return err
	}
	return ws.WriteFrame(conn, f)
}

演示示例

最终得到我们想要的结果:

let resp = await cli.request(req)

// index.ts
const main = async () => {
    let cli = new WebsocketClient("ws://localhost:8000", "ccc");
    let { success } = await cli.login()
    console.log("client login return -- ", success)

    let req = new Message("hello")
    let resp = await cli.request(req)
    console.log("client request", req, "return", resp.message)

    await sleep(5)
    cli.logout()
}

main()

执行看到输出如下:

$ ts-node index.ts
websocket open - readyState: 1
client login return --  true
client request Message { sequence: 1, type: 1, message: 'hello' } return Message { sequence: 1, type: 2, message: 'ok' }
event[onclose] fired
connection closed due to Normal connection closure

最后总结

本文介绍了全双工与半双工概念本质区别。同时通过业务协议Promise完成了全双工通信下的请求与响应的同步调用逻辑。

github源码:klintcheng/demo

最后,最多精彩知识尽在我编写的这本小册中 分布式IM原理与实战: 从0到1打造即时通讯云

基于websocket实现《全双工协议》的TS同步调用之Request/Response/Notify 原理与实战~

看都看到这里了,还不顺手一个点赞

转载自:https://juejin.cn/post/7000140081293950984
评论
请登录