如何反向代理 WebSocket
前言
本篇文章将介绍如何反向代理 websocket 协议。
我们将从基础理论知识开始,了解反向代理和 websocket 协议;同时我们将通过阅读 Hertz 开源实现的源码来进一步学习如何实现以及相关细节。
- Hertz: 字节跳动开源的 Golang 高性能 HTTP 框架;
- reverseproxy: Hertz 框架的 websocket 反向代理扩展,受 fasthttp-reverse-proxy websocket 反向代理启发;
基础知识
什么是反向代理(reverse proxy)
反向代理是一种代理服务器,位于内部服务器和外部网络之间,用于处理对内部服务器的请求。当客户端发送请求时,它并不直接访问目标服务器,而是通过反向代理服务器。这个代理服务器负责将请求转发给一个或多个目标服务器,并将得到的响应返回给客户端。
反向代理的作用一般如下:
- 隐藏后端服务器:客户端只与反向代理通信,不直接与后端服务器通信,从而隐藏了后端服务器的信息。
- 负载均衡:反向代理可以根据请求的负载情况将请求分发给多个后端服务器,以平衡服务器负载。
- 缓存静态资源:可以缓存静态内容,减少服务器负载,加快响应速度。
- 安全性:可以充当防火墙,过滤恶意请求,提高安全性。
一个反向代理的示意图如下所示,NGINX 是一个经常被使用的反向代理服务器(通常也被称为网关 gateway),用户的请求会打到反向代理服务器上再由反向代理服务器转发给后端的服务器集群。
什么是 websocket
WebSocket 是一种在单个TCP连接上提供全双工通信的协议。与传统的HTTP请求-响应模式不同,WebSocket允许服务器与客户端之间建立持久性连接,双向通信,实现了实时数据传输。它允许服务器主动向客户端推送数据,而不需要客户端发起请求。
- 全双工通信:允许服务器和客户端之间的双向通信,可以实时地发送和接收数据。
- 低延迟:建立在单个TCP连接上,减少了建立连接的开销,降低了通信延迟。
- 减少数据传输开销:相比于传统的HTTP轮询,WebSocket可以降低数据传输的开销,更高效地进行数据传输。
如何实现
在了解完基础知识后,我们就可以开始考虑如何具体实现了。
术语约定
我们先约定几个将会使用的术语,方便我们理解整个过程:
-
客户端(client):用户;
-
代理服务器(proxy server):反向代理服务器,例如前面提到的 NGINX 以及我们这里使用的 Hertz;
-
后端服务器(backend server):实际接收并处理客户端请求的后端服务器,不直接与客户端通信;
基本思路
由于我们是基于 Hertz HTTP 框架实现反向代理服务,所以这里的反向代理服务器就是 Hertz,那么要反向代理 websocket 我们则可以通过提供一个 http.Handler
来完成,这个 Handler 可以将用户建立 websocket 连接的请求转发给后端服务器,然后在代理服务器与后端服务器,客户端与代理服务器之间建立 websocket 连接,代理服务器负责两个连接之间的消息传递。
一个从建立连接到完成 websocket 反向代理的过程如下所示:
- 客户端向代理服务器发起建立 websocket 连接的握手(handshake)请求;
- 代理服务器转发请求到后端服务器;
- 代理服务器与后端服务器建立 websocket 连接;
- 客户端与代理服务器建立 websocket 连接;
至此所有连接建立完毕,websocket 反向代理完成,下面是一个客户端通过 websocket 反向代理发送信息的过程示例:
- 客户端写消息到与代理服务器建立的 websocket 连接;
- 代理服务器从与客户端的 websocket 连接读取消息;
- 代理服务器将读取到的消息写到与后端服务器建立的 websocket 连接;
笔者也提供了一个时序图,图中的编号与上面的过程编号一一对应,读者可以结合图示来理解反向代理 websocket 的整个过程。
具体实现
有了实现的基本思路之后我们就可以开始考虑具体实现了,这里我们通过阅读 Hertz 框架 websocket 反向代理扩展 的代码来理解具体的实现过程。
这个扩展中 websocket 反向代理的代码在以下两个文件中:
- ws_reverse_proxy.go: websocket 反向代理的主要业务逻辑;
- ws_reverse_proxy_option.go: 提供给用户的自定义配置选项;
读者可以把这个库 clone 下来对照着阅读也可以直接看下面的源码解析。
主结构体 WSReverseProxy
type WSReverseProxy struct {
target string
options *Options
}
可以看到 websocket 反向代理的主结构体非常简单,只有两个字段:
- target:反向代理的目标地址,即后端服务器的路径;
- options:配置选项,WSReverseProxy 通过 Functional Option 的形式提供了三个可供用户自定义的配置选项;
构造方法 NewWSReverseProxy
func NewWSReverseProxy(target string, opts ...Option) *WSReverseProxy {
if target == "" {
panic("target string must not be empty")
}
options := newOptions(opts...)
wsrp := &WSReverseProxy{
target: target,
options: options,
}
return wsrp
}
构造方法的逻辑如下:
- 先对位于方法形参的后端服务器路径是否为空进行判断,如果为空则直接 panic;
- 初始化配置选项,如果用户进行了配置则使用用户的配置值,否则使用默认值;
- 创建主结构体并进行赋值;
- 将赋值后的主结构体返回给用户;
核心方法 ServeHTTP
func (w *WSReverseProxy) ServeHTTP(ctx context.Context, c *app.RequestContext)
ServeHTTP
方法是实现 websocket 反向代理的核心方法,也是我们前面提到的 Handler,用户通过使用构造方法返回的主结构体实例来调用 ServeHTTP
方法并注册对应的路由来实现 websocket 反向代理的过程。
由于 ServeHTTP
非常长,这里将 ServeHTTP
方法按照实现 websocket 反向代理的过程拆分为 4 个部分,我们依次对这 4 个部分进行解析。
第一部分:准备转发头
forwardHeader := prepareForwardHeader(ctx, c)
if w.options.Director != nil {
w.options.Director(ctx, c, forwardHeader)
}
这一部分,Handler 将客户端握手请求的请求头通过 prepareForwardHeader
方法进行处理并返回一个 forwardHeader
用于让代理服务器发起与后端服务器建立 websocket 连接的请求。
并且如果用户自定义了 Director
选项,则可以进一步对 forwardHeader
进行对应的处理。
type Director func(ctx context.Context, c *app.RequestContext, forwardHeader http.Header)
接下来我们就一起来看下 prepareForwardHeader
方法:
func prepareForwardHeader(_ context.Context, c *app.RequestContext) http.Header {
forwardHeader := make(http.Header, 4)
if origin := string(c.Request.Header.Peek("Origin")); origin != "" {
forwardHeader.Add("Origin", origin)
}
if proto := string(c.Request.Header.Peek("Sec-Websocket-Protocol")); proto != "" {
forwardHeader.Add("Sec-WebSocket-Protocol", proto)
}
if cookie := string(c.Request.Header.Peek("Cookie")); cookie != "" {
forwardHeader.Add("Cookie", cookie)
}
if host := string(c.Request.Host()); host != "" {
forwardHeader.Set("Host", host)
}
clientIP := c.ClientIP()
if prior := c.Request.Header.Peek("X-Forwarded-For"); prior != nil {
clientIP = string(prior) + ", " + clientIP
}
forwardHeader.Set("X-Forwarded-For", clientIP)
forwardHeader.Set("X-Forwarded-Proto", "http")
if string(c.Request.URI().Scheme()) == "https" {
forwardHeader.Set("X-Forwarded-Proto", "https")
}
return forwardHeader
}
方法逻辑如下:
-
先初始化了一个 4 个大小的
http.Header
(type Header map[string][]string
); -
从客户端的请求头中查看是否有
Origin
,Sec-Websocket-Protocol
,Cookie
和Host
字段(HTTP Header),如果有的话则设置到forwardHeader
中;其中
Sec-Websocket-Protocol
Header 用于指定客户端和服务器在建立 webSocket 连接时所使用的子协议。WebSocket 协议允许在建立连接时定义一个或多个子协议,这些子协议可以描述在该连接上发送和接收的数据类型或消息格式。 -
从客户端请求头中查看是否有
X-Forwarded-For
字段,如果有的话就为现有的X-Forwarded-For
追加上当前客户端的 IP,如果没有的话为forwardHeader
设置X-Forwarded-For
字段为当前客户端的 IP;其中
X-Forwarded-For
通常由代理服务器添加在转发请求时,用于标识原始客户端的 IP 地址。在网络通信中,当请求经过多个代理服务器(比如负载均衡器、反向代理等)转发时,最初发起请求的客户端的真实 IP 地址可能会被隐藏。为了追踪请求的真实来源,代理服务器通常会在 HTTP 请求头部中添加
X-Forwarded-For
字段,其中包含了客户端原始的 IP 地址。一个示例如下:
注意:这里的 client1, proxy1, proxy2 都是真实的 IP 地址 X-Forwarded-For: client1, proxy1, proxy2
-
为
forwardHeader
设置X-Forwarded-Proto
字段并返回;其中
X-Forwarded-Proto
通常由代理服务器添加在转发请求时,用于指示原始请求所使用的协议(HTTP 或 HTTPS)。
注意点:
header.Add
和header.Set
方法的区别;
Add
方法会判断是否已存在 Key,如果不存在就设置对应的 Key 和 Value,如果存在就在已有的 Value 数组后追加当前 Value,Set
方法则不会进行判断,不管是否存在都设置为当前的 Key 和 Value。
结合以下代码可能更方便你进行理解:
header := make(http.Header)
header.Add("Key1", "Value1")
fmt.Println(header)
header.Add("Key1", "Value2")
fmt.Println(header)
header.Set("Key1", "Value3")
fmt.Println(header)
// output:
// map[Key1:[Value1]]
// map[Key1:[Value1 Value2]]
// map[Key1:[Value3]]
- 为什么不为
forwardHeader
设置Connection
,Upgrade
,Sec-WebSocket-Key
等字段;
熟悉 websocket 协议的读者可能知道这些字段都是 websocket 握手(Handshake)请求的必要字段,可以表明发起握手的客户端想要升级为 websocket 协议,如果你在 prepareForwardHeader
方法中加上以下代码,就可以发现其实客户端的请求中是包含这些字段的。
fmt.Println("Upgrade: " + string(c.Request.Header.Peek("Upgrade")))
fmt.Println("Connection: " + string(c.Request.Header.Peek("Connection")))
fmt.Println("Sec-WebSocket-Key: " + string(c.Request.Header.Peek("Sec-Websocket-Key")))
但是为什么这里没有把这些字段加入到 forwardHeader
里是因为正如我们前面提到的,客户端不会和后端服务器之间直接建立 websocket 连接,而是与代理服务器建立 websocket 连接。
那么可能有读者又要问了,forwardHeader
中没有这些必要字段是什么和后端服务器建立起 websocket 连接的,请看接下来的第二部分。
第二部分:代理服务器与后端服务器建立 websocket 连接
connBackend, respBackend, err := w.options.Dialer.Dial(w.target, forwardHeader)
if err != nil {
hlog.CtxErrorf(ctx, "can not dial to remote backend(%v): %v", w.target, err)
if respBackend != nil {
if err = wsCopyResponse(&c.Response, respBackend); err != nil {
hlog.CtxErrorf(ctx, "can not copy response: %v", err)
}
} else {
c.AbortWithMsg(err.Error(), consts.StatusServiceUnavailable)
}
return
}
在第一部分中我们已经准备好了 forwardHeader
,接下来就可以把后端服务器的路径(target)以及 forwardHeader
传递给 Dial
方法了,Dial
方法会向后端服务器发起握手请求,从而让代理服务器与后端服务器先建立起 websocket 连接。
在 Dial
方法中也会为发起的请求添加上在第一部分中提到的必要字段:
req.Header["Upgrade"] = []string{"websocket"}
req.Header["Connection"] = []string{"Upgrade"}
req.Header["Sec-WebSocket-Key"] = []string{challengeKey}
req.Header["Sec-WebSocket-Version"] = []string{"13"}
Dial
方法返回的 connBackend
就是代理服务器与后端服务器建立的 websocket 连接实例。
当然如果 Dial
方法报错,即代理服务器与后端服务器建立 websocket 连接失败则需要将对应的响应返回给客户端,由于不是主要的逻辑部分,这里不做过多叙述。
第三部分:客户端与代理服务器建立 websocket 连接
if err := w.options.Upgrader.Upgrade(c, func(connClient *hzws.Conn) {
defer connClient.Close()
...
}); err != nil {
hlog.CtxErrorf(ctx, "can not upgrade to websocket: %v", err)
}
在第二部分中,我们看到通过 Dial
方法,代理服务器与后端服务器已经成功建立起了 websocket 连接。在第三部分中,我们则通过 Upgrade
方法让客户端与代理服务器建立 websocket 连接。
Upgrade
会在与客户端的握手(Handshake)结束后将建立好的 websocket 连接传递到 connClient
中,也就是说 connClient
就是客户端与代理服务器建立的 websocket 连接实例。
当然如果 Upgrade
方法报错,说明连接建立失败则记录日志,整个 ServeHTTP
方法也随之结束。
第四部分:连接通信
var (
errClientC = make(chan error, 1)
errBackendC = make(chan error, 1)
errMsg string
)
hlog.CtxDebugf(ctx, "upgrade handler working...")
gopool.CtxGo(ctx, func() {
replicateWSRespConn(ctx, connClient, connBackend, errClientC)
})
gopool.CtxGo(ctx, func() {
replicateWSReqConn(ctx, connBackend, connClient, errBackendC)
})
for {
select {
case err = <-errClientC:
errMsg = "copy websocket response err: %v"
case err = <-errBackendC:
errMsg = "copy websocket request err: %v"
}
var ce *websocket.CloseError
var hzce *hzws.CloseError
if !errors.As(err, &ce) || !errors.As(err, &hzce) {
hlog.CtxErrorf(ctx, errMsg, err)
}
}
在第三部分中,客户端与代理服务器已经建立好了 websocket 连接,我们也获取到了连接实例 connClient
,接下来就是使用以下两个连接让客户端可以以反向代理的方式与后端服务器进行通信:
- connClient:客户端与代理服务器的 websocket 连接实例;
- connBackend:代理服务器与后端服务器的 websocket 连接实例;
可以看到我们先准备了两个 channel 用于接收通信过程中的错误:
- errClientC:用于接收后端服务器向客户端发送消息时的错误;
- errBackendC:用于接收客户端向后端服务器发送消息时的错误;
然后我们通过从协程池中取出两个 goroutine 并行执行 replicateWSRespConn
和 replicateWSReqConn
方法,这个方法的命名其实有些奇怪,因为 websocket 是一个全双工的通信协议,请求和响应都是相对的。
但这里统一把后端服务器发送给客户端的消息看做响应,把客户端发送给后端服务器的消息看做请求,是相对客户端来说的。
由于这里比较绕,笔者绘制了以下示意图帮助理解:
我们继续看这两个并行执行的通信方法,由于两个方法的逻辑基本一致,这里我们就看一下 replicateWSRespConn
方法的逻辑:
func replicateWSRespConn(ctx context.Context, dst *hzws.Conn, src *websocket.Conn, errC chan error) {
for {
msgType, msg, err := src.ReadMessage()
if err != nil {
...
errC <- err
if err = dst.WriteMessage(hzws.CloseMessage, msg); err != nil {
hlog.CtxErrorf(ctx, "write message failed when replicate websocket conn: err=%v", err)
}
break
}
err = dst.WriteMessage(msgType, msg)
if err != nil {
hlog.CtxErrorf(ctx, "write message failed when replicating websocket conn: msgType=%v msg=%v err=%v", msgType, msg, err)
errC <- err
break
}
}
}
可以看到方法的逻辑也非常简单,即从 connBackend
的 ReadMessage
方法读取后端服务器发送给代理服务器的消息,再通过 connClient
的 WriteMessage
方法将读取的消息写入客户端与代理服务器的连接。
如果出现错误则传递给 errClientC
在方法外部统一进行处理。
回到方法外部,通过 for-select
的形式从 errClientC
和 errBackendC
接收错误,判断如果不是 CloseError 则进行日志记录。
至此我们完成了核心方法 ServeHTTP
方法逻辑的阅读,而 ServeHTTP
的方法逻辑也与我们在基本思路处列出的 websocket 反向代理过程完全一致,读者可以对照基本思路和具体实现部分进行更好的理解。
使用示例(echo server)
刚刚我们阅读完了 Hertz 框架 websocket 反向代理扩展的核心代码,这里我们将使用这个扩展完成一个 echo server 的使用实例,帮助读者更好的理解应该如何使用这个扩展。
我们将这个例子同样分为三个部分进行讲解,完整代码将在结尾给出。
首先我们对代理服务器和后端服务器的地址和路径进行全局声明:
var (
proxyURL = "ws://127.0.0.1:8080/ws"
backendURL = "ws://127.0.0.1:9090/backend"
proxyAddr = "127.0.0.1:8080"
backendAddr = "127.0.0.1:9090"
)
代理服务器
// proxy server
wsrp := reverseproxy.NewWSReverseProxy(backendURL)
ps := server.Default(server.WithHostPorts(proxyAddr))
ps.GET("/ws", wsrp.ServeHTTP)
go ps.Spin()
使用我们在具体实现部分学习的构造方法 NewWSReverseProxy
创建一个 websocket 反向代理实例,target 参数为 backendURL
,然后将 ServeHTTP
方法注册到 /ws
路由下并通过一个 goroutine 启动代理服务器,注意代理服务器的地址与注册的路由要与 proxyURL
对应。
后端服务器
go func() {
// backend server
bs := server.Default(server.WithHostPorts(backendAddr))
bs.GET("/backend", func(ctx context.Context, c *app.RequestContext) {
upgrader := &websocket.HertzUpgrader{}
if err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
hlog.Errorf("backend read message err: %v", err)
}
err = conn.WriteMessage(msgType, msg)
if err != nil {
hlog.Errorf("backend write message err: %v", err)
}
}
}); err != nil {
hlog.Errorf("upgrade error: %v", err)
return
}
})
bs.Spin()
}()
通过一个 goroutine 启动后端服务器,并注册 /backend
路由,同样需要注意后端服务器的地址与注册的路由要与 backendURL
对应,然后在 handler 中处理到来的 websocket 连接。
先通过 Upgrade
方法将 HTTP 协议升级为 websocket 协议,然后从建立的 websocket 连接读取客户端(这里的客户端实际指的是代理服务器)发送的消息并将消息写回 websocket 连接,即完成了 echo 的逻辑,通过 for 循环一直重复这个过程。
客户端
// client
conn, _, err := reverseproxy.DefaultOptions.Dialer.Dial(proxyURL, make(http.Header))
if err != nil {
hlog.Errorf("client dial err: %v", err)
return
}
time.Sleep(time.Second)
var echoInput string
for {
fmt.Print("send: ")
_, _ = fmt.Scanln(&echoInput)
err = conn.WriteMessage(websocket.TextMessage, []byte(echoInput))
if err != nil {
hlog.Errorf("client write message err: %v", err)
}
_, echoOutput, err := conn.ReadMessage()
if err != nil {
hlog.Errorf("client read message err: %v", err)
}
fmt.Println("receive: " + string(echoOutput))
}
通过 Dial
方法发送握手(Handshake)请求到代理服务器升级为 websocket 协议,获取到返回的连接实例后,就可以通过 Scanln
方法读取用户输入的消息并写到连接中,同时从连接中接收返回的消息并打印出来,并通过 for 循环持续这个过程。
至此我们完成了一个 echo server 的 websocket 反向代理示例。
完整代码
echo server 示例完整代码如下:
package main
import (
"context"
"fmt"
"net/http"
"time"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/hertz-contrib/reverseproxy"
"github.com/hertz-contrib/websocket"
)
var (
proxyURL = "ws://127.0.0.1:8080/ws"
backendURL = "ws://127.0.0.1:9090/backend"
proxyAddr = "127.0.0.1:8080"
backendAddr = "127.0.0.1:9090"
)
func main() {
// proxy server
wsrp := reverseproxy.NewWSReverseProxy(backendURL)
ps := server.Default(server.WithHostPorts(proxyAddr))
ps.GET("/ws", wsrp.ServeHTTP)
go ps.Spin()
time.Sleep(time.Second)
go func() {
// backend server
bs := server.Default(server.WithHostPorts(backendAddr))
bs.GET("/backend", func(ctx context.Context, c *app.RequestContext) {
upgrader := &websocket.HertzUpgrader{}
if err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
hlog.Errorf("backend read message err: %v", err)
}
err = conn.WriteMessage(msgType, msg)
if err != nil {
hlog.Errorf("backend write message err: %v", err)
}
}
}); err != nil {
hlog.Errorf("upgrade error: %v", err)
return
}
})
bs.Spin()
}()
time.Sleep(time.Second)
// client
conn, _, err := reverseproxy.DefaultOptions.Dialer.Dial(proxyURL, make(http.Header))
if err != nil {
hlog.Errorf("client dial err: %v", err)
return
}
time.Sleep(time.Second)
var echoInput string
for {
fmt.Print("send: ")
_, _ = fmt.Scanln(&echoInput)
err = conn.WriteMessage(websocket.TextMessage, []byte(echoInput))
if err != nil {
hlog.Errorf("client write message err: %v", err)
}
_, echoOutput, err := conn.ReadMessage()
if err != nil {
hlog.Errorf("client read message err: %v", err)
}
fmt.Println("receive: " + string(echoOutput))
}
}
总结
以上就是本篇文章的所有内容了,我们从 websocket 反向代理的基础知识出发,到确定实现的基本思路,最后通过阅读 Hertz 框架的 websocket 反向代理扩展的代码并通过一个 echo server 的使用示例来深入理解如何反向代理 websocket。
希望本篇文章可以对读者理解 websocket 反向代理有所帮助,如果哪里写错了或者有问题欢迎评论或者私聊指出,以上。
参考列表
转载自:https://juejin.cn/post/7308571272002387980