likes
comments
collection
share

Golang singleflight (2)源码剖析

作者站长头像
站长
· 阅读数 40

上一篇文章中我们了解了如何使用 singleflight 来解决缓存击穿的问题。今天这篇我们来看看 singleflight 的源码,了解一下实现机制。

回到我们上一篇文章中提到的例子,singleflight 用起来很简单,声明一个 singleflight.Group 对象,然后直接用 Do 方法作为容器,加上一个 cache key 实现互斥,然后传入一个 func() (interface{}, error) 实现你的回源逻辑即可。如下:

func main() {
 
  var singleSetCache singleflight.Group
 
  getAndSetCache := func (requestID int, cacheKey string) (string, error) {
      //do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
      value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {
          return "VALUE", nil
      })
      return value.(string),nil
  }
 
  cacheKey := "cacheKey"
 
  for i:=1;i<10;i++{//模拟多个协程同时请求
      go func(requestID int) {
         value, _:= getAndSetCache(requestID,cacheKey)
       }(i)
  }
}

Group

我们先来看看 Group 是什么。

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

Group 可以理解为是一个容纳多个回源任务的容器,一把互斥锁非常显眼。这里的 m 代表了各个任务,key 就是我们上面提到的 cacheKey,用于隔离不同类型任务。value 是一个 call 的指针,从使用者的角度无需关心。

call 和 互斥锁 mu 的配合,一定意义上讲,就是 singleflight 机制的核心。

思考设计方向

如果你来实现 singleflight,你会怎么做呢?

使用 互斥锁 其实是很直观的想法。没错,我们想做到的事情很简单:就算并发多个 goroutine 都需要这份数据,我们希望只有一个 goroutine 进去拿到数据。所以,一把 Mutex,保证了回源任务不会被并发触发。但问题并没有解决。

我们希望做到的,不只是只有一个能拿到数据,还希望这个goroutine拿到数据之后,需要通过某种机制跟大家分享。让别的 goroutine 也能【自动地】获取到数据,不去感知这个 Mutex,不用再次自己去抢锁!

这个问题的解决,很自然地就想到了 channel,毕竟 Rob Pike 大神的名言犹在耳边:

Don’t communicate by sharing memory, share memory by communicating.

ok,除此之外,还有什么问题呢?

性能!这一点也很重要,单独一个 Mutex 带来的问题不仅仅是数据无法共享,你需要考虑锁的粒度,这一点同样重要。到底在哪个时机,解锁,然后让 channel 完成通信。这将成为性能的关键。

是否只能等一个 goroutine 拿到了回源的结果,才能解锁呢?

下来我们就来看看标准库对上面这些讨论点,提供的方案。

call

在看 Do 方法的实现之前,我们先来看看 Group 内嵌的 map 中 call 的结构:

// call is an in-flight or completed singleflight.Do call
type call struct {
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val any
	err error

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    any
	Err    error
	Shared bool
}

一次 singleflight.Do 的所有上下文,需要的数据容器都在这个 call 结构里。

终于,我们看到了 chans []chan<- Result 这个 channel。

核心实现

这一节我们仔细看看 singleflight.Do 是怎样利用 Group 和 call 两个结构实现我们想要的效果:

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

上锁时机毋庸置疑,因为多个回源任务并发来调用 Do,而记录各个任务的 m 又是懒加载,我们需要保证这里是唯一goroutine,所以一上来先 g.mu.Lock 上锁,完成懒加载。

回源 goroutine

这里我们姑且把拿到【回源机会】的goroutine称为【回源goroutine】,对于这个协程来说,流程没有非常特殊,因为是他自己访问底层存储,然后返回数据,不用跟别人分享。

当第一个goroutine 进来时,会发现 g.m[key] 是不存在记录的,所以继续往下走:

   c := new(call)
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()

先 new 出来一个 call,然后 WaitGroup 加 1,并把 call 赋值到 map 中,解锁。

下来很直接,一个 doCall 调用:

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	for _, ch := range c.chans {
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}

上来就直接执行了 fn 函数,拿到了回源的结果,一个 interface{},一个 error。

紧接着 WaitGroup 的 Done,说明此时回源结束,数据已经拿到。意图很直接,这一把已经任务完成,等待通过 channel 发给大家数据就 ok,所以,我们需要把 map 中对应的任务清掉,所以这一步,依然需要在锁内。

最后遍历 chans 对象,将获取到的 val 和 err 组装成一个 Result 对象塞入 channel,再解锁即可。

回到主流程,这个时候直接返回上一步赋值的 c.val 和 c.err

return c.val, c.err, c.dups > 0

旁观 goroutine

这些 goroutine 在【回源goroutine】之后进来 Do 方法,已经没有了回源资格,我们来看看他们眼中发生了什么。

首先,关注一下回源goroutine 是什么时候解锁的,这一点很重要。

   c := new(call)
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()
   
   g.doCall(c, key, fn)

其实是在创建了 call 对象,写入了 map之后,但在执行 fn 函数之前。也就意味着,此时数据还没拿到,但【回源goroutine】已经整装待发,call 结构生成,准备回源拿数据了。

所以,此时旁观 goroutine 会命中这里的逻辑:

if c, ok := g.m[key]; ok {
      c.dups++
      g.mu.Unlock()
      c.wg.Wait()
      return c.val, c.err, true
}

发现 g.m[key] 已经存在,所以将 call 的 dups++,表明已经有吃瓜群众搬着小板凳在这里等待了,dups 本质上就是排队goroutine的数量。

然后解锁,因为对于 map 中 call 的更新已经结束。下来吃瓜群众就被阻塞在 c.wg.Wait() 这个 WaitGroup 中。

为什么呢?

因为此时 c.val 和 c.err 还没赋值,需要等待人家【回源goroutine】干完活。

回想一下,WaitGroup 是什么时候 Add ?就是在【回源goroutine】准备干活前,这里利用 WaitGroup 表明的语义很简单:干活前立好靶子,有人去干活了,其他人来了,就等着什么时候活干完(Done),然后拿结果就 ok。

DoChan

看到这里你可能很疑惑,我们一开始期望用 channel 实现结果的通信。但标准库其实是直接用了一把锁 + WaitGroup,只要 wg.Done 结束,就去拿 call 中的 val 和 err 两个属性返回即可。

那要 chans 这个 []chan<- Result 干什么呢?

我们能不能把

g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()

直接就改成下面这样:

g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()

干掉中间的 c.chans 遍历,并塞 channel 就好?

其实,单纯来看 Do 方法的实现,我们并不依赖 chans 做任何事情,的确正如一开始所说,一把锁 + WaitGroup 就解决问题。

但 singleflight 不仅仅提供了 Do 一种方法,还有 DoChan:

DoChan 和 Do 的方法签名是不同的:

func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool)

前者是直接返回了回源结果,以及 error,是否被共享(即是否有其他旁观 goroutine 拿到结果) 后者则是直接返回一个 channel,谁想用结果,谁就来监听这个channel即可。

// DoChan is like Do but returns a channel that will receive the
// results when they are ready. The second result is true if the function
// will eventually be called, false if it will not (because there is
// a pending request with this key).
func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool) {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch, false
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch, true
}

可以看到,对于【回源goroutine】,我们除了要 new 出来一个 call 之外,此处还需要初始化 chans 数组,放入一个长度为 1 的 Result channel,即 ch := make(chan Result, 1)

在执行完 doCall 之后,直接将一开始初始化的 channel 返回回去。

对于【旁观goroutine】,他们会进入到自己的分支,dups++ 用来报到,然后将自己的那个 ch (要获取结果的 channel)塞到 chans 中,然后直接解锁离开。

你可以把旁观goroutine 的那个 ch 当成是个【保温盒】,既然想要吃的,无非两种办法:

  1. 你就等在窗口,有人把饭拿过来了,你就接着(Do方法的流程,等着 WaitGroup 的 Done);
  2. 把保温盒上交,不用等,啥时候人家把饭带过来了,会直接放到你的保温盒里,至于保温盒你什么时候拿走那就是你的事情了(DoChan 的流程,把自己的 channel 提交上去,异步等着)

总结

singleflight 其实是一个很利于初学者学习的库,带上所有注释和代码,连 130 行都没超过,建议大家自己再根据源码过一遍。这种 Mutex + WaitGroup 实现同步等待,channel 实现异步等待的方案还是非常精巧和简洁的。

好了,本期就是这些,singleflight 的部分我们就看到这里,感谢阅读!

转载自:https://juejin.cn/post/7123495892572700702
评论
请登录