Go的三种扩展原语之 — Semaphore
概述
信号量是并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。
- 每次获取的资源都会将信号量中的计数器减去对应的数值,在释放时重新加回来
- 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号
Go语言的扩展包中提供了带权重的信号量 semaphore.Weighted,我们可以按照不同的权重管理资源的访问,这种结构体暴露了 4 个方法:
- semaphore.NewWeighted —— 用于创建新的信号量
- semaphore.Weighted.Acquire —— 阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待
- semaphore.Weighted.TryAcquire —— 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回 false
- semaphore.Weighted.Relesae —— 用于释放指定权重的资源
结构体
semaphore.NewWeighted 方法能提供传入的大量权重创建一个指向 semaphore.Weighted 结构体的指针:
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.list
}
semaphore.Weighted 结构体中包含一个 waiters 列表,其中存储着等待获取资源的 Goroutine。除此之外,它还包含当前信号量的上限以及一个计数器 cur, 这个计数器的范围就是 [0,size]
信号量中的计数器会随着用户对资源的访问和释放而改变,引入的权重概念能够提供更细粒度的资源访问控制,尽可能满足常见用例。
获取
semaphore.Weighted.Acquire 方法能用于获取指定权重的资源,其中包含 3 中情况:
- 当信号量中剩余资源大于获取的资源并且没有等待的 Goroutine 时,会直接获取信号量
- 当需要获取的信号量大于 semaphore.Weighted 的上限时,由于不可能满足条件,因此会直接返回错误
- 遇到其他情况时,会将当前 Goroutine 加入等待列表,并通过 select 等待调度器唤醒当前 Goroutine,Goroutine 被唤醒后会获取信号量
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
if s.size - s.cur >= n && len(s.waiters) == 0 {
s.cur += n
return nil
}
...
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
select {
case <-ctx.Done():
err := ctx.Err()
select {
case <-ready:
err = nil
default:
s.waiters.Remove(elem)
}
return err
case <-ready:
return nil
}
}
另一个用于获取信号量的方法 semaphore.Weighted.TryAcquire 只会非阻塞地判断当前信号量是否有充足的资源,如果有,会立刻返回 true, 否则会返回 false :
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && len(s.waiters) == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
因为 semaphore.Weighted.TryAcquire 不会等待资源的释放,所以可能更适用于一些对延时敏感、用户需要立刻感知结果的场景
释放
当我们要释放信号量时,semaphore.Weighted.Relesae 方法会从头到尾遍历 waiters 列表中全部的等待者,如果释放资源后的信号量有充足的剩余资源,就会通过 Channel 唤醒指定的 Goroutine:
func (c *Weighted) Relesae(n int64) {
s.mu.Lock()
s.cur -= n
for {
next := w.waiters.Front()
if next == nil {
break
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
s.mu.Unlock()
}
当然,也可能会出现剩余资源无法唤醒 Channel 的情况,这时当前方法释放锁之后会直接返回。
通过 semaphore.Weighted.Relesae 的分析我们可以发现,如果一个信号量需要占用的资源非常多,它可能会长时间无法获取锁,这也是 semaphore.Weighted.Acquire 引入上下文参数的原因,即为信号量的获取设置超时时间。
小结
带权重的信号量确实有更多的应用场景,,这也是Go语言对外提供的唯一信号量实现,使用过程中需要注意以下几个问题:
- semaphore.Weighted.Acquire 和 semaphore.Weighted.TryAcquire 都可以适用于获取资源,前者会阻塞获取信号量,后者会非阻塞获取信号量
- semaphore.Weighted.Relesae 方法会按照先进先出的顺序唤醒可以被唤醒的 Goroutine
- 如果一个 Goroutine 获取了较多的资源,由于 semaphore.Weighted.Relesae 的释放策略可能会等待较长时间
转载自:https://juejin.cn/post/7257517788193832997