Go 标准库 Context 实战应用:控制并发任务生命周期的利器Go 语言官方提供了一个名为 context 的标准库
前言
在现代编程中,特别是在分布式系统和并发编程中,控制任务的生命周期和上下文传播是非常重要的,同时也是十分困难的。而 Go 语言官方提供了一个名为 context 的标准库包,专门用于在不同的 goroutine 之间传递截止日期、取消信号以及请求范围内的元数据,很好的解决了这类问题。
什么是 Context?
解决的问题
context 提供了一种机制,用于在 API 边界传递信号,以便控制并发操作的生命周期。它解决了以下问题:
- 取消操作:在需要时取消 goroutine。
- 超时控制:自动中断长时间运行的操作。
- 传递元数据:跨 API 边界传递额外的信息(如请求追踪 ID、用户身份信息等元数据)。
简述原理
context 包的主要组成部分:
- 两个接口(Context 、canceler)
- 实现了 Context 接口,被称为 Context;
- 实现了 Context + canceler 接口,被称为可取消的 Context。
- 四个结构体 (emptyCtx、valueCtx、cancelCtx、timerCtx)
- emptyCtx:用于根节点的创建,不可取消,返回 nil;
- valueCtx:用于传值,不可取消,以 key-val 存储对应的键值对,注意只能回溯直系祖先,不能寻找兄弟节点,而且相同的 key 采取就近原则;
- cancelCtx:该类是这个包的核心类,实现了 canceler 接口,是一个可取消的 Context,也是唯一一个实现了非空 Done() 方法的结构体,结构体中字段 children 维护了 canceler 树的结构,用于级联取消孩子节点,完成整个分支的取消操作;而 Context 的取消操作是通过关闭一个只读 channel 进行广播通知的。
- timerCtx:该结构体中内嵌了 cancelCtx,并定义了定时器,用于定时取消。
emptyCtx 一般作为 root 节点使用,valueCtx、cancelCtx、timerCtx 通过不同的函数依据祖先 Context 派生出来,结构体中通过嵌入 Context 的方式拥有对祖先 Context 的回溯机制。
基础用法
这一小节让我们了解 context 包的一些基本用法,包括 ctx 的创建和派生方式。
- 创建基础 Context(俗称 root ctx):一般使用 context.Background() 返回一个空的上下文 emptyCtx,todo 不常用。这是创建派生上下文的起点。
ctx := context.Background()
ctx := context.TODO()
- 派生带取消功能的 Context:创建一个可取消的上下文,并调用 cancel 函数来取消上下文。
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- 派生带超时的 Context:创建一个带有超时功能的上下文。如果在指定时间内不取消,上下文会自动取消。
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- 派生带截止日期的 Context:创建一个带有具体截止日期的上下文。到达指定截止时间,上下文会自动取消。(其实和 WithTimeout 原理一致)
deadline := time.Now().Add(10 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
- 在 Context 中传递值:将键值对存储在上下文中,以便在不同的函数调用中传递元数据。
ctx := context.WithValue(context.Background(), "userID", "12345")
// 从上下文中取出用户ID, ok 为 true 时,可以使用
userID,ok := ctx.Value("userID").(string)
实战应用
传递元数据
传递元数据的功能,比较容易理解,项目中也经常用到,这里以 traceId 为例讲述一下:
- 服务内部:直接使用 ctx = context.WithValue(ctx, "traceId", "1234567") 就能完成 ctx 派生,ctx 总是作为函数或方法的第一个参数传递下去,这样服务内部的 traceId 就可以串接起来了。
- 服务间:上下游之间的 traceID 并不是使用 context 进行传递的,一般都是放在 http.Request.Header 进行传递,通过 request.Header.Get("TraceId") 就可以轻松的完成 traceId 的跨服务传递,这样服务间的 traceID 链路就可以串联起来了。
取消操作和超时控制
取消操作只针对可取消的 ctx,通过调用其派生时产生的 cancel 方法,来关闭一个只读 channel 进行广播通知,此时所有订阅了 <-ctx.Done() 的 goroutine,都能进行中断处理,即 return。
而超时控制其实是自动取消的一种,原理就是内部设置了一个定时器,根据派生 ctx 设置的超时时间或截止时间,运行定时器,等定时器到点之后,自动调用 cancel 方法,针对 ctx 进行取消,和我们程序中主动调用 cancel 方法效果是一致的。(原理不懂的,回看上面推荐的文章哈)
接下来,我们就针对 context 中的超时控制功能,详细讲解一下代码应该怎么写。我们先看一个简单的示例:
func main() {
// 创建一个有取消功能的上下文,1s 后自动取消
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go func(ctx context.Context) {
// 模拟一个任务
select {
case <-time.After(2 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task cancelled or timed out")
}
}(ctx)
// 等待一段时间
time.Sleep(10 * time.Second)
fmt.Println("Main function finished")
}
这个示例展示了如何使用 context 来控制 goroutine 的生命周期,并在超时时取消它。
仔细思考,你会发现这个示例存在以下几个问题:
- 真正的任务该在哪里执行?
- 使用 time.Sleep 来等待 goroutine,是不是不太优雅?
针对这两个问题,我们改进一下代码:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
// 启动一个 goroutine
go func(ctx context.Context) {
defer wg.Done()
select {
case <-time.After(1 * time.Second):
doWork() // 任务执行
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task cancelled or timed out")
}
}(ctx)
wg.Wait()
fmt.Println("Main function finished")
}
func doWork() {
fmt.Println("doWork start")
time.Sleep(5 * time.Second)
fmt.Println("doWork end")
}
这段代码做了如下改进:
- 添加了任务执行 doWork,总算能干自己的活了
- 使用 sync.WaitGroup,优雅地解决了等待问题。
改进后,还存在几个致命的问题:
- 如果 time.After 的时间比 ctx 超时的时间短,即使任务耗时很长,goroutine 中 case <-ctx.Done() 将永远没有机会执行,违背了我们超时控制的初衷。
- 而且总不能每次都 time.After 之后才执行 doWork 吧,逻辑上也说不通啊!
针对这两个问题,我们再改进一下代码:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(2)
go func(ctx context.Context) {
defer wg.Done()
done := make(chan struct{})
go func() {
doWork1() // 任务执行
close(done) // 任务完成 - 通知父 goroutine
}()
select {
case <-done:
fmt.Println("Task1 completed")
case <-ctx.Done():
fmt.Println("Task1 cancelled or timed out")
}
}(ctx)
go func(ctx context.Context) {
defer wg.Done()
done := make(chan struct{})
go func() {
doWork2() // 任务执行
close(done) // 任务完成 - 通知父 goroutine
}()
select {
case <-done:
fmt.Println("Task2 completed")
case <-ctx.Done():
fmt.Println("Task2 cancelled or timed out")
}
}(ctx)
wg.Wait()
fmt.Println("Main function finished")
}
func doWork1() {
fmt.Println("doWork1 start")
time.Sleep(2 * time.Second)
fmt.Println("doWork1 end")
}
func doWork2() {
fmt.Println("doWork2 start")
time.Sleep(5 * time.Second)
fmt.Println("doWork2 end")
}
这段代码做了如下改进:
- 使用一个 channel 来通知父 goroutine 子 goroutine 完成;
- 启动子 goroutine 异步执行任务,任务完成时,关闭 done channel,通知父 goroutine;
- 父 goroutine 中监听子 goroutine 任务执行情况,以及 ctx 的超时取消标志;这样无论是任务执行完成,还是任务超时,都可以被监听到,然后返回,最终实现超时控制。
使用两个 goroutine 进行演示,一个正常执行结束,一个超时取消结束。一般基础的写法也就这样了,想要进阶的话,不妨去看看标准库是怎么干的,接下来我们看个案例。
net/http 包应用案例
当你不会用某种写法时,不妨看看官方包是怎么用的,今天我们就一起探索一下官方 net/http 包,如何使用 context 来控制并发 goroutine 的生命周期。
客户端代码:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequest("GET", "http://localhost:8080/hello", nil)
if err != nil {
fmt.Printf("Error creating request: %v\n", err)
return
}
req = req.WithContext(ctx)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Request error: %v\n", err)
return
}
defer resp.Body.Close()
// 读取响应体
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Error reading response: %v\n", err)
return
}
fmt.Printf("Response body: %s\n", body)
fmt.Printf("Response status: %v\n", resp.Status)
}
主要步骤:
- 创建一个 10s 超时的 ctx,
- 创建一个 http.Request 对象,请求地址 http://localhost:8080/hello
- 将 ctx 塞入 http.Request 对象中,通过 client.Do 发送请求
- 用 http.Response 接收请求响应
服务端代码:
func helloHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
fmt.Println("Handler started")
defer fmt.Println("Handler ended")
var wg sync.WaitGroup
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
done := make(chan struct{})
go func() {
fmt.Println("Handler: working")
time.Sleep(time.Second * 5)
w.Write([]byte("Hello, World!"))
close(done) // 任务完成 - 通知父 goroutine
}()
select {
case <-done:
fmt.Println("Task1 completed")
case <-ctx.Done():
err := ctx.Err()
fmt.Printf("Handler: %v\n", err)
http.Error(w, err.Error(), http.StatusRequestTimeout)
}
}(ctx)
wg.Wait()
}
func main() {
http.HandleFunc("/hello", helloHandler)
fmt.Println("Server is listening on port 8080")
http.ListenAndServe(":8080", nil)
}
主要步骤:
- 使用 Request 的 ctx 做上下文控制,如果被取消,直接终止请求流程
- 主任务执行 5s 后,返回 Hello, World! 给客户端
启动 Server 端,分两次发送不同超时时间的客户端请求,看一下返回结果:
- 客户端超时时间 10s:正常执行,并返回结果
// 客户端
Response body: Hello, World!
Response status: 200 OK
// 服务端
Handler started
Handler: working
Task1 completed
Handler ended
- 客户端超时时间 3s:返回超时错误
// 客户端
Request error: Get "http://localhost:8080/hello": context deadline exceeded
// 服务端
Handler started
Handler: working
Handler: context canceled
Handler ended
哇塞!net/http 官方包本身就已经支持了请求的超时控制,如果我们的服务框架想支持超时功能,只需要往 http.Request 中塞入带超时时间的 ctx 即可,可太方便了。
通过执行上边的代码,我们可以发现客户端设置的超时时间,在服务端也起了作用,上下文好像通过 Request 传递到了服务端,然后实现了上下游服务之间的超时控制。那到底是不是这样的呢?
我们继续往后追,看一下源码(只展示必要片段)。
进入客户端这边的处理链路 client.Do(req) -> c.do(req) -> c.send(req, deadline) -> send(req, c.transport(), deadline) -> rt.RoundTrip(req) -> pconn.roundTrip(treq)
这段链路里面,随处可见 ctx 控制的影子,我们只看最后这块:
func (t *Transport) roundTrip(req *Request) (*Response, error) {
//...
// 与服务端建立连接,使用 socket 通信,此时服务端处于接收请求的状态
//
pconn, err := t.getConn(treq, cm)
//...
}
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
//...
// 返回值,从这读取
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
//...
ch: resc,
//...
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
pcClosed := pc.closech
canceled := false
for {
select {
case err := <-writeErrCh:
//...
case <-pcClosed:
//...
case <-respHeaderTimer:
//...
case re := <-resc:
//...
return re.res, nil
case <-cancelChan:
canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
cancelChan = nil
case <-ctxDoneChan:
canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
源码中通过监听 req.Context().Done() 来完成对一次请求的超时控制,当服务端没有按时返回数据时,就会造成超时,随后会与服务端断开连接,返回 context deadline exceeded 错误。
我们再来看看服务端代码:
处理链路:http.ListenAndServe -> server.ListenAndServe -> srv.Serve(ln) -> for{l.Accept() -> go c.serve(connCtx)}
func (c *conn) serve(ctx context.Context) {
// ...
// 创建可取消的 ctx
ctx, cancelCtx := context.WithCancel(ctx)
// conn 中塞入取消函数
c.cancelCtx = cancelCtx
defer cancelCtx()
for{
// 等待读取一次请求,创建 req,塞入 ctx
w, err := c.readRequest(ctx)
// ...
req := w.req
//...
// 后台读取下一次请求(长连接)
w.conn.r.startBackgroundRead()
// ...
// 处理这一次请求(会跳转到业务代码)-> helloHandler
serverHandler{c.server}.ServeHTTP(w, w.req)
//...
}
}
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
//...
req, err := readRequest(c.bufr)
//...
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
//...
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
//...
}
//...
}
从源码中我们可以看出,ctx 是根据 baseCtx := context.Background() 创建的(具体源码在 Serve(l net.Listener) 中),根本不是从客户端获取的。
那服务端是如何根据客户端设置的超时,一起控制任务的生命周期的呢? 答案就在 w.conn.r.startBackgroundRead() 这行代码中:
func (cr *connReader) startBackgroundRead() {
//...
cr.conn.rwc.SetReadDeadline(time.Time{})
go cr.backgroundRead() // 使用异步处理
}
func (cr *connReader) backgroundRead() {
// 预读取数据,只是为了查看后续还有没有请求,加快处理长连接请求的速度
n, err := cr.conn.rwc.Read(cr.byteBuf[:])
cr.lock()
if n == 1 {
cr.hasByte = true
}
if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
// Ignore this error. It's the expected error from
// another goroutine calling abortPendingRead.
} else if err != nil {
cr.handleReadError(err)
}
cr.aborted = false
cr.inRead = false
cr.unlock()
cr.cond.Broadcast()
}
func (cr *connReader) handleReadError(_ error) {
cr.conn.cancelCtx()
cr.closeNotify()
}
这段源码显示:
- 程序会异步预读取 conn(连接)中的数据(是否还存在未读取的数据),以加快后续处理速度(预读取只是看看后边还有没有数据,并不是真正的读取);
- 如果遇到连接断开的错误,就会对错误进行处理 cr.handleReadError(err) -> cr.conn.cancelCtx() 从而取消上下文,这个上下文是在上面塞入连接 conn 的,因此该连接 conn 下的所有派生的 ctx 都将被取消;
- 刚好我们的业务代码处理器 helloHandler 监听了 req.Ctx.Done(),所以可以处理取消操作,至此客户端和服务端完成了统一的超时控制。
当然,如果服务端不想受客户端的超时约束,还想继续执行下去,那就不监听 req.Ctx.Done() 就可以了!
总结
本篇文章中,详细讲解了一下官方库 context 的实战应用,接下来我们总结一下要点: context 提供了一种机制,用于在 API 边界传递信号,以便控制并发操作的生命周期。它解决了以下问题:
- 取消操作:在需要时取消 goroutine。
- 超时控制:自动中断长时间运行的操作。
- 传递元数据:跨 API 边界传递额外的信息(如请求追踪 ID)。
本文详细讲解了如何使用 context 解决超时控制问题,并用官方库 net/http 包进行了实战和源码讲解,要点如下:
- net/http 官方包支持了 http 请求的超时控制,只需往 http.Request 中塞入带超时时间的 ctx 即可,大部分 http 框架的超时控制都是依赖这个特性实现的。
- 上下游 http 请求的超时控制并不是依赖 http.Request 中的 ctx 传递来完成的,客户端会依据塞入 http.Request 的 ctx 进行取消,取消后会主动断开连接;服务端通过处理断开连接的错误来主动取消本地 ctx,进而完成上下游联动。
- 值得一提的是,服务端可以灵活的选择是否要监听上下文的取消信号,从而根据业务定制自己的处理器,设计得十分的巧妙。
好了,今天就聊到这吧,如果觉得还不错的话欢迎点赞,转发和关注,感谢支持。
转载自:https://juejin.cn/post/7403619185522065408