likes
comments
collection
share

并发编程(八) - Pool: 池化技术

作者站长头像
站长
· 阅读数 6

问题引入

在 sync 包中,有一个很有意思的工具 pool。它是用来缓存已经分配的对象以便后续继续使用,从而减轻垃圾收集器的压力「避免重复申请内存创建对象,也减少了垃圾回收的开销」。它之所以在 sync 包中,是因为它是并发安全的。

使用示例

pool 暴露的 api 也是非常的简洁。调用方只需要通过 Get 获取一个对象使用,然后通过 Put 将其放回即可。

type Pool
// 返回 put 进去的对象,如果 pool 中没有对象,则调用 new 方法
func (p *Pool) Get() any
// 将对象放回 pool
func (p *Pool) Put(x any)

假设有下面这个场景,设计一个 Log 方法,将日志写入到 Writer 中去,同时加上写入日志的时间戳。实现时候使用 bufPool 用于获取 Buffer 对象,避免每次调用 log 方法都新建一个 Buffer 对象。

 // 定一个全局变量 bufPool 用于 Get 和 Put
var bufPool = sync.Pool{
    // 定义对象的创建方法,应该返回一个指针。这样在 Get 和 Put 的时候不会有值拷贝,而是 pool 的作用失效
    New: func() any {
       return new(bytes.Buffer)
    },
}

func log(writer io.Writer, msg string) {
    // 获取一个 buf
    buf := bufPool.Get().(*bytes.Buffer)
    buf.WriteString(time.Now().UTC().Format(time.RFC3339))
    buf.WriteString(msg)
    _, _ = writer.Write(buf.Bytes())
    buf.Reset()
    bufPool.Put(buf)
}

实现原理

整体结构如下图所示。在实现的时候从 GMP 的调度模型着手,为每一个 P 创建一个 poolLocal,从而保证只有一个 goroutine 会操作这个结构体。如果当前 goroutine 对应的 P 上找不到可用对象则从其他的 P 中窃取。shared 是一个 lockfree 的结构,因此也无需加锁。 这个设计的精髓在于利用 GMP 调度模型,创建一个无需要加锁的对象存储池。

并发编程(八) - Pool: 池化技术

内存回收

在 pool 中的对象也不是无限存储的,需要进行垃圾回收避免内存浪费。从下面的方法可以得知。对于每一个 pool 对象,第一次 GC 时,会将 local 中的对象移动到 victim;第二次 GC 时会将 victim 中的对象删除,同时将 local 赋值给 victim。victim 就类似于一个垃圾中转站,两次循环才会回收没有用到的对象。

var (
    allPoolsMu Mutex
    // 全部的 pool 对象
    allPools []*Pool
    // 上一轮 GC 的 pool 对象
    oldPools []*Pool
)

func poolCleanup() {
    // 删除所有 pool 中的 victim
    for _, p := range oldPools {
       p.victim = nil
       p.victimSize = 0
    }

    // 将 pool 中的 local 放入到 victim 中, 并将 local 置为 nil
    for _, p := range allPools {
       p.victim = p.local
       p.victimSize = p.localSize
       p.local = nil
       p.localSize = 0
    }

    // 重置
    oldPools, allPools = allPools, nil
}

Get

明白了 pool 数据结构之后就很容易理解 get 操作。先将 goroutine 绑定到当前的 P 中,防止 goroutine 被其他的 P 执行。拿到 poolLocal 之后,先看 private 是否有。没有则从队列中取,如果还取不到则从其他 P 中窃取。如果整个 pool 中都没有则调用 New 方法创建。

func (p *Pool) Get() any {
    l, pid := p.pin()
    x := l.private
    l.private = nil
    if x == nil {
    // Try to pop the head of the local shard. We prefer
    // the head over the tail for temporal locality of
    // reuse.
    x, _ = l.shared.popHead()
       if x == nil {
          x = p.getSlow(pid)
       }
    }
    runtime_procUnpin()
    if x == nil && p.New != nil {
       x = p.New()
    }
    return x
}

Put

Put 方法比较简单。直接将对象放到对应的 localpool 中即可。由于 localpool 是与 P 绑定的,则保证了只有一个 goroutine 会对其进行写操作。

func (p *Pool) Put(x any) {
    if x == nil {
       return
    }
    l, _ := p.pin()
    if l.private == nil {
       l.private = x
    } else {
       l.shared.pushHead(x)
    }
    runtime_procUnpin()
}

注意事项

在 pool 的实现中,巧妙的利用 GMP 模型,为每个 P 分配一个 poolLocal「无锁队列」 从而避免锁竞争。然而,在具体的使用中仍需要注意以下问题。

Put 和 Get 不一致

如果在使用中没有设置 new 方法,如果 pool 中没有数据,get 就回返回 nil。通常考虑要定义 new 方法,保证 Get 一定能拿到数据。

type obj struct {
}

var pool = sync.Pool{}

func main() {
    // pool 为空,返回 nil
    fmt.Println(pool.Get() == nil)
    pool.Put(&obj{})
    fmt.Println(pool.Get() != nil)
}

var pool = sync.Pool{
    // 如果 pool 为空,get 是返回 New()
    New: func() any {
       return new(obj)
    },
}

Put 和 Get 操作并没有进行类型校验,为了保证操作的类型相同,可以为每一种类型创建一个 pool 对象。避免创建一个公用的 pool,Get 和 Put 不同的类型。

var pool = sync.Pool{
    New: func() any {
       return new(obj)
    },
}

func main() {
    // 同时放入两种不同的 type
    pool.Put(new(obj))
    pool.Put(new(obj2))
    fmt.Printf("%T", pool.Get()) // obj
    fmt.Printf("%T", pool.Get()) // obj2
}

对象会被回收

Pool 中 put 进去的对象并不会一直存在,如果一直没有使用则会在两个 GC 周期内回收掉。例如:对于数据库连接池,创建连接成本较高,即使空闲连接没有被应用到也不应该回收。这种情况下使用 sync.pool 就不太合适了。

type obj struct {
    i int
}

var pool = sync.Pool{
    New: func() any {
       return new(obj)
    },
}

func main() {
    for i := 0; i < 10; i++ {
       o := &obj{
          i: i,
       }
       runtime.SetFinalizer(o, func(o *obj) {
          fmt.Println("collector obj: ", o.i) // gc 回收打印
       })
       pool.Put(o)
    }
    // 第一次 gc, 将 local 移动到 victim
    runtime.GC()
    // 第二次 gc, 回收 victim
    runtime.GC()
    time.Sleep(time.Second)
}

对于 Put 操作,放进去的应该是指针类型,而不是值类型。如果存放的是值类型,其实存储的是值的 copy。使用 pool 则没有什么意义。

func main() {
    for i := 0; i < 10; i++ {
       o := obj{
          i: i,
       }
       runtime.SetFinalizer(&o, func(o *obj) {
          fmt.Println("collector obj: ", o.i)
       })
       // 存放的是值类型,其实是发生了值 copy。等价于 var t any = o; pool.Put(t)
        pool.Put(o)
    }
    runtime.GC() // 发生 gc 回收
    fmt.Println(pool.Get(), pool.Get()) // 输出的是 copy 的值
    time.Sleep(time.Second * 3)
}

总结

sync.pool 巧妙的利用 GMP 调度模型和无锁队列来实现对象池。需要注意的是对于不使用的对象,在两个 GC 周期后将被回收。池化技术是一种很常见的优化技术。如果发现程序中有一些对象被不断重复的创建和回收,或者有些对象的创建开销特别大「TCP 连接」,这时可以考虑选择合适的池化技术优化系统。