likes
comments
collection
share

Go的三种扩展原语之 — Semaphore

作者站长头像
站长
· 阅读数 35

概述

信号量是并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。

  • 每次获取的资源都会将信号量中的计数器减去对应的数值,在释放时重新加回来
  • 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号

Go语言的扩展包中提供了带权重的信号量 semaphore.Weighted,我们可以按照不同的权重管理资源的访问,这种结构体暴露了 4 个方法:

  • semaphore.NewWeighted —— 用于创建新的信号量
  • semaphore.Weighted.Acquire —— 阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待
  • semaphore.Weighted.TryAcquire —— 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回 false
  • semaphore.Weighted.Relesae —— 用于释放指定权重的资源

结构体

semaphore.NewWeighted 方法能提供传入的大量权重创建一个指向 semaphore.Weighted 结构体的指针:

func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.list
}

semaphore.Weighted 结构体中包含一个 waiters 列表,其中存储着等待获取资源的 Goroutine。除此之外,它还包含当前信号量的上限以及一个计数器 cur, 这个计数器的范围就是 [0,size]

信号量中的计数器会随着用户对资源的访问和释放而改变,引入的权重概念能够提供更细粒度的资源访问控制,尽可能满足常见用例。

获取

semaphore.Weighted.Acquire 方法能用于获取指定权重的资源,其中包含 3 中情况:

  • 当信号量中剩余资源大于获取的资源并且没有等待的 Goroutine 时,会直接获取信号量
  • 当需要获取的信号量大于 semaphore.Weighted 的上限时,由于不可能满足条件,因此会直接返回错误
  • 遇到其他情况时,会将当前 Goroutine 加入等待列表,并通过 select 等待调度器唤醒当前 Goroutine,Goroutine 被唤醒后会获取信号量
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    if s.size - s.cur >= n && len(s.waiters) == 0 {
        s.cur += n
        return nil
    }
    
    ...
    
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    select {
    case <-ctx.Done():
        err := ctx.Err()
        select {
        case <-ready:
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        return err
    case <-ready:
        return nil
    }
}

另一个用于获取信号量的方法 semaphore.Weighted.TryAcquire 只会非阻塞地判断当前信号量是否有充足的资源,如果有,会立刻返回 true, 否则会返回 false :

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && len(s.waiters) == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

因为 semaphore.Weighted.TryAcquire 不会等待资源的释放,所以可能更适用于一些对延时敏感、用户需要立刻感知结果的场景

释放

当我们要释放信号量时,semaphore.Weighted.Relesae 方法会从头到尾遍历 waiters 列表中全部的等待者,如果释放资源后的信号量有充足的剩余资源,就会通过 Channel 唤醒指定的 Goroutine:

func (c *Weighted) Relesae(n int64) {
    s.mu.Lock()
    s.cur -= n
    for {
        next := w.waiters.Front()
        if next == nil {
            break
        }
        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            break
        }
        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
    s.mu.Unlock()
}

当然,也可能会出现剩余资源无法唤醒 Channel 的情况,这时当前方法释放锁之后会直接返回。

通过 semaphore.Weighted.Relesae 的分析我们可以发现,如果一个信号量需要占用的资源非常多,它可能会长时间无法获取锁,这也是 semaphore.Weighted.Acquire 引入上下文参数的原因,即为信号量的获取设置超时时间。

小结

带权重的信号量确实有更多的应用场景,,这也是Go语言对外提供的唯一信号量实现,使用过程中需要注意以下几个问题:

  • semaphore.Weighted.Acquire 和 semaphore.Weighted.TryAcquire 都可以适用于获取资源,前者会阻塞获取信号量,后者会非阻塞获取信号量
  • semaphore.Weighted.Relesae 方法会按照先进先出的顺序唤醒可以被唤醒的 Goroutine
  • 如果一个 Goroutine 获取了较多的资源,由于 semaphore.Weighted.Relesae 的释放策略可能会等待较长时间
转载自:https://juejin.cn/post/7257517788193832997
评论
请登录