Golang singleflight (2)源码剖析
上一篇文章中我们了解了如何使用 singleflight 来解决缓存击穿的问题。今天这篇我们来看看 singleflight 的源码,了解一下实现机制。
回到我们上一篇文章中提到的例子,singleflight 用起来很简单,声明一个 singleflight.Group 对象,然后直接用 Do 方法作为容器,加上一个 cache key 实现互斥,然后传入一个 func() (interface{}, error)
实现你的回源逻辑即可。如下:
func main() {
var singleSetCache singleflight.Group
getAndSetCache := func (requestID int, cacheKey string) (string, error) {
//do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {
return "VALUE", nil
})
return value.(string),nil
}
cacheKey := "cacheKey"
for i:=1;i<10;i++{//模拟多个协程同时请求
go func(requestID int) {
value, _:= getAndSetCache(requestID,cacheKey)
}(i)
}
}
Group
我们先来看看 Group 是什么。
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group 可以理解为是一个容纳多个回源任务的容器,一把互斥锁非常显眼。这里的 m 代表了各个任务,key 就是我们上面提到的 cacheKey,用于隔离不同类型任务。value 是一个 call 的指针,从使用者的角度无需关心。
call 和 互斥锁 mu 的配合,一定意义上讲,就是 singleflight 机制的核心。
思考设计方向
如果你来实现 singleflight,你会怎么做呢?
使用 互斥锁 其实是很直观的想法。没错,我们想做到的事情很简单:就算并发多个 goroutine 都需要这份数据,我们希望只有一个 goroutine 进去拿到数据。所以,一把 Mutex,保证了回源任务不会被并发触发。但问题并没有解决。
我们希望做到的,不只是只有一个能拿到数据,还希望这个goroutine拿到数据之后,需要通过某种机制跟大家分享。让别的 goroutine 也能【自动地】获取到数据,不去感知这个 Mutex,不用再次自己去抢锁!
这个问题的解决,很自然地就想到了 channel,毕竟 Rob Pike 大神的名言犹在耳边:
Don’t communicate by sharing memory, share memory by communicating.
ok,除此之外,还有什么问题呢?
性能!这一点也很重要,单独一个 Mutex 带来的问题不仅仅是数据无法共享,你需要考虑锁的粒度,这一点同样重要。到底在哪个时机,解锁,然后让 channel 完成通信。这将成为性能的关键。
是否只能等一个 goroutine 拿到了回源的结果,才能解锁呢?
下来我们就来看看标准库对上面这些讨论点,提供的方案。
call
在看 Do 方法的实现之前,我们先来看看 Group 内嵌的 map 中 call 的结构:
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val any
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val any
Err error
Shared bool
}
一次 singleflight.Do 的所有上下文,需要的数据容器都在这个 call 结构里。
终于,我们看到了 chans []chan<- Result
这个 channel。
核心实现
这一节我们仔细看看 singleflight.Do 是怎样利用 Group 和 call 两个结构实现我们想要的效果:
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
上锁时机毋庸置疑,因为多个回源任务并发来调用 Do,而记录各个任务的 m 又是懒加载,我们需要保证这里是唯一goroutine,所以一上来先 g.mu.Lock
上锁,完成懒加载。
回源 goroutine
这里我们姑且把拿到【回源机会】的goroutine称为【回源goroutine】,对于这个协程来说,流程没有非常特殊,因为是他自己访问底层存储,然后返回数据,不用跟别人分享。
当第一个goroutine 进来时,会发现 g.m[key]
是不存在记录的,所以继续往下走:
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
先 new 出来一个 call,然后 WaitGroup 加 1,并把 call 赋值到 map 中,解锁。
下来很直接,一个 doCall 调用:
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
上来就直接执行了 fn 函数,拿到了回源的结果,一个 interface{},一个 error。
紧接着 WaitGroup 的 Done,说明此时回源结束,数据已经拿到。意图很直接,这一把已经任务完成,等待通过 channel 发给大家数据就 ok,所以,我们需要把 map 中对应的任务清掉,所以这一步,依然需要在锁内。
最后遍历 chans 对象,将获取到的 val 和 err 组装成一个 Result 对象塞入 channel,再解锁即可。
回到主流程,这个时候直接返回上一步赋值的 c.val 和 c.err
return c.val, c.err, c.dups > 0
旁观 goroutine
这些 goroutine 在【回源goroutine】之后进来 Do 方法,已经没有了回源资格,我们来看看他们眼中发生了什么。
首先,关注一下回源goroutine 是什么时候解锁的,这一点很重要。
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
其实是在创建了 call 对象,写入了 map之后,但在执行 fn 函数之前。也就意味着,此时数据还没拿到,但【回源goroutine】已经整装待发,call 结构生成,准备回源拿数据了。
所以,此时旁观 goroutine 会命中这里的逻辑:
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
发现 g.m[key] 已经存在,所以将 call 的 dups++,表明已经有吃瓜群众搬着小板凳在这里等待了,dups 本质上就是排队goroutine的数量。
然后解锁,因为对于 map 中 call 的更新已经结束。下来吃瓜群众就被阻塞在 c.wg.Wait()
这个 WaitGroup 中。
为什么呢?
因为此时 c.val 和 c.err 还没赋值,需要等待人家【回源goroutine】干完活。
回想一下,WaitGroup 是什么时候 Add ?就是在【回源goroutine】准备干活前,这里利用 WaitGroup 表明的语义很简单:干活前立好靶子,有人去干活了,其他人来了,就等着什么时候活干完(Done),然后拿结果就 ok。
DoChan
看到这里你可能很疑惑,我们一开始期望用 channel 实现结果的通信。但标准库其实是直接用了一把锁 + WaitGroup,只要 wg.Done 结束,就去拿 call 中的 val 和 err 两个属性返回即可。
那要 chans 这个 []chan<- Result
干什么呢?
我们能不能把
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
直接就改成下面这样:
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
干掉中间的 c.chans 遍历,并塞 channel 就好?
其实,单纯来看 Do 方法的实现,我们并不依赖 chans 做任何事情,的确正如一开始所说,一把锁 + WaitGroup 就解决问题。
但 singleflight 不仅仅提供了 Do 一种方法,还有 DoChan:
DoChan 和 Do 的方法签名是不同的:
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool)
前者是直接返回了回源结果,以及 error,是否被共享(即是否有其他旁观 goroutine 拿到结果) 后者则是直接返回一个 channel,谁想用结果,谁就来监听这个channel即可。
// DoChan is like Do but returns a channel that will receive the
// results when they are ready. The second result is true if the function
// will eventually be called, false if it will not (because there is
// a pending request with this key).
func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool) {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch, false
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch, true
}
可以看到,对于【回源goroutine】,我们除了要 new 出来一个 call 之外,此处还需要初始化 chans 数组,放入一个长度为 1 的 Result channel,即
ch := make(chan Result, 1)
在执行完 doCall 之后,直接将一开始初始化的 channel 返回回去。
对于【旁观goroutine】,他们会进入到自己的分支,dups++ 用来报到,然后将自己的那个 ch (要获取结果的 channel)塞到 chans 中,然后直接解锁离开。
你可以把旁观goroutine 的那个 ch 当成是个【保温盒】,既然想要吃的,无非两种办法:
- 你就等在窗口,有人把饭拿过来了,你就接着(Do方法的流程,等着 WaitGroup 的 Done);
- 把保温盒上交,不用等,啥时候人家把饭带过来了,会直接放到你的保温盒里,至于保温盒你什么时候拿走那就是你的事情了(DoChan 的流程,把自己的 channel 提交上去,异步等着)
总结
singleflight 其实是一个很利于初学者学习的库,带上所有注释和代码,连 130 行都没超过,建议大家自己再根据源码过一遍。这种 Mutex + WaitGroup 实现同步等待,channel 实现异步等待的方案还是非常精巧和简洁的。
好了,本期就是这些,singleflight 的部分我们就看到这里,感谢阅读!
转载自:https://juejin.cn/post/7123495892572700702