likes
comments
collection
share

并发编程(六) - Once: 只执行一次的操作

作者站长头像
站长
· 阅读数 9

问题引入

在编程中,我们经常遇到一些场景,有些操作只需要执行一次。例如:加载配置配件,初始化连接池等。在这些场景中,初始化完成后,后续则直接使用。常见的实现方案有:全局变量初始化、init 中执行初始化操作、main 函数中调用初始化操作。在这三种方案中,均可以保证资源只被初始化一次。

 // 1: 变量声明时直接赋值
var config1 = "config"

// 2: init 函数中赋值
var config2 string

func init() {
    config2 = "config"
}

// 3: main 函数启动时调用
var config3 string

func initConfig() {
    config3 = "config"
}

func main() {
    initConfig()
}

有时候我们希望延迟加载,即只有在使用的时候才去加载内容,从而节省一定的资源。这时就需要一个工具类,支持延迟加载,且并发场景下只能执行一次。

使用示例

针对上面的问题,可以使用 go 提供 Once 工具类。它只提供了一个 Do(f func()) 方法,同时保证了 f 在并发场景下只能执行一次。且后续的 Do 调用会阻塞,直到 f 执行完成。

并发编程(六) - Once: 只执行一次的操作 我们可以使用 Once 来完成对 config 的延迟加载。

var config string
var once sync.Once

func getConfig() string {
    once.Do(func() {
       config = "name"
    })
    return config
}

func main() {
    // 第一次调用执行 f
    fmt.Println(getConfig())
    // 第二次调用,不执行 f,直接 return; 如果第一次调用中的 f 没有执行完则阻塞等待
    fmt.Println(getConfig())
}

实现原理

Go 一如既往的秉持大道至简的原则,只暴露一个 Do 方法,方便使用者理解。实现的难点在于并发场景下 f 只执行一次,同时后续的调用需要等待 f 执行完成。下面看一下 Once 结构体:

// done 用于标识 Do(f func()) 是否执行完成,m 用于控制阻塞等待 f 的并发执行
type Once struct {
     // done indicates whether the action has been performed.
     // It is first in the struct because it is used in the hot path.
     // The hot path is inlined at every call site.
     // Placing done first allows more compact instructions on some architectures (amd64/386),
     // and fewer instructions (to calculate offset) on other architectures.
    done uint32
    m    Mutex
}

Do 函数源码。使用原子操作快速判断 f 是否已经执行过。如果没有执行过,或者正在执行,则尝试获取 m。如果 f 未开始执行,获取 m,开始执行 f。如果 f 正在执行中,则阻塞等待 m 释放。获取到 m 后,二次判断 done。这里需要注意的是,在设置 done 是使用了原子操作。是为了保证 fast-path 中的数据竞争问题。

func (o *Once) Do(f func()) {
    // Note: Here is an incorrect implementation of Do:
     //
     // if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
     //    f()
     // }
     //
     // Do guarantees that when it returns, f has finished.
     // This implementation would not implement that guarantee:
     // given two simultaneous calls, the winner of the cas would
     // call f, and the second would return immediately, without
     // waiting for the first's call to f to complete.
     // This is why the slow path falls back to a mutex, and why
     // the atomic.StoreUint32 must be delayed until after f returns.

    if atomic.LoadUint32(&o.done) == 0 {
           // Outlined slow-path to allow inlining of the fast-path.
            o.doSlow(f)
     }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
       defer atomic.StoreUint32(&o.done, 1)
       f()
    }
}

注意事项

不可复制

和 Mutex、RWMutex 一样是不可复制的。

死锁

Once 在第一次执行时执行 f,如果 f 中继续调用 Once 则会导致循环等待而死锁。

func main() {
    var o sync.Once
    o.Do(func() {
       o.Do(func() {
          println("done")
       })
    })
}

f 只能执行一次

Once 保证的是当前变量只会执行一次 f,与 f 的具体值没有关系。

func main() {
    var o sync.Once
    o.Do(func() {
       println("A")
    })
    o.Do(func() {
       println("B")
    })
}

f 发生 panic

Once 并未处理函数中的 panic。因此,需要业务方在定义 f 的时候做好 panic 处理的逻辑。

func main() {
    var o sync.Once
    o.Do(func() {
       panic("err")
    })
}

once 扩展

OnceFunc

Once 解决了开发者常见的延迟加载问题。但也会遇到一些问题。由于传递的 f 并没有返回值,导致开发者很难感知或处理异常情况。例如下面这个 case,如果 f 执行时发生了 panic,后续的调用仍然可以正常执行。

func main() {
    var o sync.Once
    var wg sync.WaitGroup
    f := func() { panic("err") }
    wg.Add(1)
    go func() {
       defer func() {
          if err := recover(); err != nil {
             println("once do f panic:", err)
          }
          wg.Done()
       }()
       o.Do(f)
    }()
    wg.Wait()
    // 可以正常执行, 因为已经执行过 f
    o.Do(f)
}

针对这个场景可以使用 OnceFunc:返回一个函数,保证在并发场景下 f 只会执行一次,如果 f 发生 panic,会保证每次调用都会发生 panic。

func OnceFunc(f func()) func()
func main() {
    var wg sync.WaitGroup
    f := func() { panic("err") }
    onceF := sync.OnceFunc(f)
    wg.Add(1)
    go func() {
       defer func() {
          if err := recover(); err != nil {
             println("once do f panic:", err)
          }
          wg.Done()
       }()
       onceF()
    }()
    wg.Wait()
    // 由于执行 f 时发生 panic,后续调用均会 panic
    onceF()
}

OnceValue

OnceValue 是与 OnceFunc 对应的,用于执行带返回参数的 f。语义和 OnceFunc 一样,如果 f 发生 panic ,则每次调用均会 panic。

func OnceValue[T any](f func() T) func() T
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

使用 OnceValue 使用延迟加载 config。

var configOnceValue = sync.OnceValue[string](func() string {
    return "config"
})

func getConfig() string {
    return configOnceValue()
}

func main() {
    // 第一次调用执行 f
fmt.Println(getConfig())
    // 第二次调用,不执行 f,直接 return f 的结果; 如果第一次调用中的 f 没有执行完则阻塞等待
fmt.Println(getConfig())
}

总结

Once 是一个很简单但却非常实用的工具类。通常用来解决并发场景下 initOnce 的问题。OnceFunc 和 Once 其实是相互补充,核心区别就开如何对 panic 进行处理。

转载自:https://juejin.cn/post/7338591452913696768
评论
请登录