likes
comments
collection

【对象池化组件】- bytebufferpool

作者站长头像
站长
· 阅读数 14

1. 针对问题

在编程开发的过程中,我们经常会有创建同类对象的场景,这样的操作可能会对性能产生影响,一个比较常见的做法是使用对象池,需要创建对象的时候,我们先从对象池中查找,如果有空闲对象,则从对象池中移除这个对象并将其返回给调用者使用,只有在池中无空闲对象的时候,才会真正创建一个新对象

另一方面,对于使用完的对象,我们并不会对它进行销毁,而是将它放回到对象池以供后续使用,使用对象池在频繁创建和销毁对象的情况下,能大幅的提升性能,同时为了避免对象池中的对象占用过多的内存,对象池一般还配有特定的清理策略,Go的标准库sync.Pool就是这样一个例子,sync.Pool 中的对象会被垃圾回收清理掉

这类对象中,有一种比较特殊的是字节切片,在做字符串拼接的时候,为了拼接高效,我们通常将中间结果存放在一个字节缓冲中,拼接完之后,再从字节缓冲区生成字符串

Go标准库bytes.Buffer封装字节切片,提供一些使用接口,我们知道切片的容量是有限的,容量不足时需要进行扩容,而频繁的扩容容易造成性能抖动

bytebufferpool实现了自己的Buffer类型,并引入一个简单的算法降低扩容带来的性能损失

2. 使用方法

bytebufferpool的接入很轻量

func main() {
   bf := bytebufferpool.Get()
   bf.WriteString("Hello")
   bf.WriteString(" World!!")
   fmt.Println(bf.String())
}

上面的这种用法使用的是defaultPoolbytebufferpoolPool对象是公开的,也可以自行新建

3. 源码剖析

bytebufferpool是如何做到最大程度减小内存分配和浪费的呢,先宏观的看整个Pool的定义,然后细化到相关的方法,就可以找到答案

bytebufferpoolPool结构体的定义为

type Pool struct {
   calls       [steps]uint64
   calibrating uint64

   defaultSize uint64
   maxSize     uint64

   pool sync.Pool
}

其中calls存储了某一个区间内不同大小对象的个数,calibrating是一个标志位,标志当前Pool是否在重新规划中,defaultSize是元素新建时的默认大小,它的选取逻辑是当前calls中出现次数最多的对象对应的区间最大值,这样可以防止从对象池中捞取之后的频繁扩容,maxSize限制了放入Pool中的最大元素的大小,防止因为一些很大的对象占用过多的内存

bytebufferpool中定义了一些和defaultSizemaxSize计算相关的常量

const (
   minBitSize = 6 // 2**6=64 is a CPU cache line size
   steps      = 20

   minSize = 1 << minBitSize
   maxSize = 1 << (minBitSize + steps - 1)

   calibrateCallsThreshold = 42000
   maxPercentile           = 0.95
)

其中minBitSize表示的是第一个区间对象大小的最大值(2的xx次方-1),在bytebufferpool中,将对象大小分为20个区间,也就是steps,第一个区间为[0, 2^6-1],第二个为[2^6, 2^7-1]...,依此类推

calibrateCallsThreshold表示如果某个区间内对象的数量超过这个阈值,则对Pool中的变量进行重新的计算,maxPercentile用于计算Pool中的maxSize,表示前95%的元素大小

bytebufferpool中的方法也比较少,核心的是GetPut方法

  • Get
func (p *Pool) Get() *ByteBuffer {
   v := p.pool.Get()
   if v != nil {
      return v.(*ByteBuffer)
   }
   return &ByteBuffer{
      B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
   }
}

可以看到,如果对象池中没有对象的话,会申请defaultSize大小的切片返回

  • Put
func (p *Pool) Put(b *ByteBuffer) {
   idx := index(len(b.B))

   if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
      p.calibrate()
   }

   maxSize := int(atomic.LoadUint64(&p.maxSize))
   if maxSize == 0 || cap(b.B) <= maxSize {
      b.Reset()
      p.pool.Put(b)
   }
}

Put方法会比较麻烦,我们分步来看

  1. 计算放入元素在calls数组中的位置
func index(n int) int {
   n--
   n >>= minBitSize
   idx := 0
   for n > 0 {
      n >>= 1
      idx++
   }
   if idx >= steps {
      idx = steps - 1
   }
   return idx
}

这里的逻辑就是先将长度右移minBitSize,如果依然大于0,则每次右移一位,idx加1,最后如果idx超出了总的steps(20),则位置就在最后一个区间

  1. 判断当前区间放入元素的个数是否超过了calibrateCallsThreshold指定的阈值,超过则重新计算Pool中元素的值
func (p *Pool) calibrate() {
   // 如果正在重新计算,则返回,控制多并发
   if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
      return
   }

   // 计算每一段区间中的元素个数 & 元素总个数
   a := make(callSizes, 0, steps)
   var callsSum uint64
   for i := uint64(0); i < steps; i++ {
      calls := atomic.SwapUint64(&p.calls[i], 0)
      callsSum += calls
      a = append(a, callSize{
         calls: calls,
         size:  minSize << i,
      })
   }
   // 按照对象元素的个数从大到小排序
   sort.Sort(a)

   // defaultSize 为内部切片的默认大小,减少扩容次数
   // maxSize 限制放入pool中的最大元素大小
   defaultSize := a[0].size
   maxSize := defaultSize

   // 将前95%元素中的最大size给maxSize
   maxSum := uint64(float64(callsSum) * maxPercentile)
   callsSum = 0
   for i := 0; i < steps; i++ {
      if callsSum > maxSum {
         break
      }
      callsSum += a[i].calls
      size := a[i].size
      if size > maxSize {
         maxSize = size
      }
   }

   // 对defaultSize和maxSize进行赋值
   atomic.StoreUint64(&p.defaultSize, defaultSize)
   atomic.StoreUint64(&p.maxSize, maxSize)

   atomic.StoreUint64(&p.calibrating, 0)
}
  1. 判断当前放入元素的大小是否超过了maxSize,超过则不放入对象池中