likes
comments
collection
share

singleFlight

作者站长头像
站长
· 阅读数 18

引言

本文包括以下几个要点:

  • singleFlight解决什么问题?
  • 如何使用singleFlight?
  • 使用singleFlight的注意事项?
  • 从设计者的角度思考,如何设计singleFlight?

一、解决什么问题?

singleFlight的功能是合并请求,那么只要多个请求返回值是一样的的,就可以使用singleFlight。比如说,对第三方服务的调用,对缓存系统的调用;

未使用:

singleFlight

使用:

singleFlight

上述两张图形象的表达了singleFlight合并请求的功能;下面我们看下如何使用它;

二、如何使用?

package singleflight // import "internal/singleflight"

type Group struct {
	// Has unexported fields.
}
    Group represents a class of work and forms a namespace in which units of
    work can be executed with duplicate suppression.

func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result
func (g *Group) Forget(key string)

这个包的API非常简单,只有上述三个:

  • Do:同步调用接口;
  • DoChan:异步调用接口;
  • Forget:忘记过去的调用,也就是删除对应的key;

可以看到,上面三个接口都有关键词key,这个key表示某一类请求,也就是合并请求的维度,看几个例子:

func TestSingleFlight(t *testing.T) {
	// 创建一个 SingleFlight Group
	var group singleflight.Group

	// 创建一个等待组,以便等待所有并发请求完成
	var wg sync.WaitGroup

	// 模拟并发请求
	for i := 1; i <= 5; i++ {
		wg.Add(1)

		go func(id int) {
			defer wg.Done()

			// 调用 Do 方法,确保只有一个 goroutine 执行这个函数
			val, _, _ := group.Do("example-key", func() (interface{}, error) {
				var res string

				// 执行业务逻辑操作
				fmt.Println("exec")

				// 拿到的结果
				res = "i am doing"
				// 模拟一些耗时的操作
				time.Sleep(1 * time.Second)
				return res, nil
			})

			fmt.Println(val)
		}(i)
	}

	// 等待所有并发请求完成
	wg.Wait()
}

上述代码表达的点是:

  • exec只打印一次,说明5个请求合并为一个请求;
  • 每个协程都阻塞等待结果返回结果;

三、注意事项

执行函数panic

我们丢到singleFlight中的方法可能会出现panic的请求,那么对于同步与异步的场景,我们需要了解其中的差异:

  • 异步场景:无法cover住panic,进程直接奔溃;
  • 同步场景:能够cover住panic;

可以看docall函数对两种请求差异化的处理:

if e, ok := c.err.(*panicError); ok {
    // In order to prevent the waiting channels from being blocked forever,
    // needs to ensure that this panic cannot be recovered.
    if len(c.chans) > 0 {
        go panic(e)
        select {} // Keep this goroutine around so that it will appear in the crash dump.
    } else {
        panic(e)
    }
} else if c.err == errGoexit {
    // Already in the process of goexit, no need to call again
} else {
    // Normal return
    for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
    }
}

执行任务hang住

丢到singleFlight中的函数,可能长时间阻塞住了,那么这个时候所有对这个key的调用都会阻塞住,这肯定不是我们想要的。针对这种问题,可以使用DoChain + select + time.After或具有超时的Context解决,具体代码如下:

func TestSingleHan(t *testing.T) {
	var g singleflight.Group
	result := g.DoChan("example", func() (interface{}, error) {

		time.Sleep(20 * time.Second)
		fmt.Println("20")
		return "are you ok", nil
	})

	select {
	case val, err := <-result:
		fmt.Println(val, err)
	case <-time.After(3 * time.Second):
		fmt.Println("timeout")
		g.Forget("example")
	}

	g.Do("example", func() (interface{}, error) {
		fmt.Println("nonono")

		return nil, nil
	})

	time.Sleep(40 * time.Second)
	fmt.Println("end")
}

上述代码想表达以下几个点:

  • select超时之后,需要执行Forget操作,不然后续的DoChain操作还是无法进行;

singleFlight相关的坑,都围绕着Forget执行时机进行,只要合理的执行Forget即可;

线上问题

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	if fn == nil {
		return nil, errors.New("fn is nil")
	}
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

go引用包有两种的方式,一种是将包Copy到自己代码库,一种是通过import。上面是我们公司Copy singleFlight包的一段代码,由于是复制的方式,所以一直没有享受到开源社区迭代的福利,还维持2019年的版本。

  • 问题:只要fnpanic,所有对这个key的请求全部阻塞住了。
  • 原因:fn后面的delete的操作没有执行;
  • 改进方案1:修改fn函数,使其不会panic;
  • 改进方案2:使用defer,或者引用最新的single包;

四、从设计者的角度考虑如何实现singleFlight?

  • 目标:同一个时间段对某系统的多个请求,合并为一个请求,最后多个请求,共享请求结果;
  • 一次请求应该具有哪些属性?
  • 请求与请求之间的关系应该如何处理?
  • 如何标识一类请求?
  • 如何管理多类请求?

对于单个请求来说:

type call struct {
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val interface{}
	err error

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

这个结构,我觉得设计的非常有意思。通过这个结构,我们来思考下一次调用应该具有哪些特征?我们需要知道这个请求的具体功能,所以有了fn字段,我们需要请求所获得的结果,所有有val,err字段,如下:

  • fn:执行的函数handler;
  • val, err:请求锁获取的结果;

另外,我们需要在这个对象中,处理多个请求之间的关系,比如,一个请求拿到数据以后,需要告诉所有等待的请求,这个可以通过sync.WaitGroup实现。对应异步的场景,在go中都是通过chan实现。因此,有:

  • wg:通知-等待功能;
  • chan:异步获取结果;
  • dups:表示是否有多个请求共享结果;

对于同一类请求:

type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

使用map来管理同一类请求,同时需要使用sync.Mutex处理并发问题;

对于包的API:

singleFlight包的目标是:同一个时间段对某系统的多个请求,合并为一个请求,最后多个请求,共享请求结果;围绕这个目标,我们设计下包对外部暴露的API:

  • Do:同步调用接口;
  • DoChan:异步调用接口;
  • Forget:忘记过去的调用,也就是删除对应的key;

【注】虽然有点马后炮的意思,当对于学习完每个包之后,再从设计者的角度思考一遍,对于后续解决其他问题很有帮助;

参考

转载自:https://juejin.cn/post/7303366710693756965
评论
请登录