likes
comments
collection
share

关于chatgpt辅助读源码的那些事

作者站长头像
站长
· 阅读数 37

背景

chatgpt的出现可以说对程序员这个职业造成了很大的冲击,除了可以自动生成代码外,也可以理解代码,尝试了下用chatgpt去辅助阅读源码

以kratos的limit为例

实现

下面我会将bbr limit 中的每个方法直接复制给chatgpt,让ai进行解析

cpuproc cpu 使用率

// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
   ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
   defer func() {
      ticker.Stop()
      if err := recover(); err != nil {
         go cpuproc()
      }
   }()

   // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
   for range ticker.C {
      stat := &cpu.Stat{}
      cpu.ReadStat(stat)
      stat.Usage = min(stat.Usage, 1000)
      prevCPU := atomic.LoadInt64(&gCPU)
      curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
      atomic.StoreInt64(&gCPU, curCPU)
   }
}

这段代码定义了一个名为cpuproc的函数,该函数会定期地读取当前CPU的使用率并计算基于EMA算法的平均值。具体而言,这个函数会:

  1. 创建一个每500毫秒触发一次的定时器

  2. 在函数返回前停止定时器并检查是否有panic异常,如果有则重新启动cpuproc

  3. 在定时器触发时,获取当前CPU的使用率并进行“平滑化”处理。具体而言,将当前CPU使用率与上一时刻的平均使用率按照指数加权移动平均(EMA)算法的方式进行加权平均。这里用到了atomic包中的LoadInt64()StoreInt64()函数来保证多个goroutine并发执行时平均使用率的正确性。

  4. min(stat.Usage, 1000)语句的作用是确保CPU使用率不会超过100%(1000为10倍的百分比),因为stat.Usage是一个累积值,可能会大于100%。

  5. 这个函数会一直循环执行,直到程序被强制退出为止。

Stat cpu 快照

// Stat contains the metrics snapshot of bbr.
type Stat struct {
   CPU         int64
   InFlight    int64
   MaxInFlight int64
   MinRt       int64
   MaxPass     int64
}

这段代码定义了一个名为Stat的结构体类型,它包含了对于bbr(基于队列的带宽调节算法)的度量快照。具体而言,这个结构体包含了以下字段:

  1. CPU字段表示当前CPU的使用率,单位是百分比,默认最大值为100。

  2. InFlight字段表示当前正在处理的请求数,也就是当前正在占用带宽的请求数。

  3. MaxInFlight字段表示最大允许处理的请求数,也就是最大允许占用带宽的请求数。

  4. MinRt字段表示当前平均响应时间的最小值,单位是纳秒。

  5. MaxPass字段表示当前允许通过的最大QPS(每秒请求数),单位是请求数。

BBR 限流器结构体

// BBR implements bbr-like limiter.
// It is inspired by sentinel.
// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
type BBR struct {
   cpu             cpuGetter
   passStat        window.RollingCounter
   rtStat          window.RollingCounter
   inFlight        int64
   bucketPerSecond int64
   bucketDuration  time.Duration

   // prevDropTime defines previous start drop since initTime
   prevDropTime atomic.Value
   maxPASSCache atomic.Value
   minRtCache   atomic.Value

   opts options
}

这段代码定义了一个名为BBR的结构体类型,表示BBR限流器。该结构体包含以下字段:

  1. cpu字段表示用于获取系统CPU利用率的函数类型。

  2. passStat字段表示用于统计窗口周期内通过的请求数的计数器。

  3. rtStat字段表示用于统计窗口周期内的请求响应时间的计数器。

  4. inFlight字段表示当前正在处理的请求数量,它是原子类型,可以安全地被并发修改。

  5. bucketPerSecond字段表示每秒钟分配的桶的数量。

  6. bucketDuration字段表示每个桶的时间周期,即固定时间内的请求限流,它是一个time.Duration类型的值。

  7. prevDropTime字段表示上一次限流降级开始的时间。它是atomic.Value类型,允许线程安全地读写时间值。

  8. maxPASSCache字段表示窗口周期内最大允许通过数的缓存值。它也是一个atomic.Value类型。

  9. minRtCache字段表示窗口周期内最小允许响应时间的缓存值。它也是一个atomic.Value类型。

  10. opts字段表示该限流器的选项。它是一个名为options的结构体类型,包含了各种参数,如窗口大小、桶的数量、CPU利用率阈值等。

maxPASS 窗口周期内最大允许通过数

func (l *BBR) maxPASS() int64 {
   passCache := l.maxPASSCache.Load()
   if passCache != nil {
      ps := passCache.(*counterCache)
      if l.timespan(ps.time) < 1 {
         return ps.val
      }
   }
   rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {
      var result = 1.0
      for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
         bucket := iterator.Bucket()
         count := 0.0
         for _, p := range bucket.Points {
            count += p
         }
         result = math.Max(result, count)
      }
      return result
   }))
   l.maxPASSCache.Store(&counterCache{
      val:  rawMaxPass,
      time: time.Now(),
   })
   return rawMaxPass
}

代码首先尝试从缓存中获取 maxPASS,如果缓存之前已经设置过并且时间跨度小于 1,那么就直接返回缓存中的值。如果缓存不存在或距离上次设置时间大于等于 1,则需要重新计算 maxPASS。

为了计算 maxPASS,代码使用了 passStat 变量的 Reduce 方法,该方法接受一个 window.Iterator 对象,并通过迭代器处理一些计算逻辑,最终返回一个 float64 类型的值。在 Reduce 方法中,代码会遍历迭代器中的数据并计算每个 bucket(桶)中的元素值的和,然后取得这些和的最大值。

计算完 maxPASS 之后,代码将它存储到一个 counterCache 结构体中,并将该结构体设置为缓存的值,以备下次读取。最后,方法返回计算出的 rawMaxPass 值

timespan

// timespan returns the passed bucket count
// since lastTime, if it is one bucket duration earlier than
// the last recorded time, it will return the BucketNum.
func (l *BBR) timespan(lastTime time.Time) int {
   v := int(time.Since(lastTime) / l.bucketDuration)
   if v > -1 {
      return v
   }
   return l.opts.Bucket
}

该方法的作用是计算自最后一次记录时间(即 lastTime)到现在经过了多少个 bucket,其中 bucket 的刻度为 l.bucketDuration。

首先,方法计算了现在与 lastTime 的时间差并除以 l.bucketDuration,这样可以得到穿越的 bucket 数量。然后将这个差值转换成 int 类型,并返回其值。

但是,如果最后一次记录的时间早于当前时间的 l.bucketDuration,即 lastTime 与当前时间相差少于一个 bucket 的时间,则直接返回 l.opts.Bucket,即默认的 bucket 数量。这个判断是为了避免 timespan 方法返回负数

minRT 最小响应时间

func (l *BBR) minRT() int64 {
   rtCache := l.minRtCache.Load()
   if rtCache != nil {
      rc := rtCache.(*counterCache)
      if l.timespan(rc.time) < 1 {
         return rc.val
      }
   }
   rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
      var result = math.MaxFloat64
      for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
         bucket := iterator.Bucket()
         if len(bucket.Points) == 0 {
            continue
         }
         total := 0.0
         for _, p := range bucket.Points {
            total += p
         }
         avg := total / float64(bucket.Count)
         result = math.Min(result, avg)
      }
      return result
   })))
   if rawMinRT <= 0 {
      rawMinRT = 1
   }
   l.minRtCache.Store(&counterCache{
      val:  rawMinRT,
      time: time.Now(),
   })
   return rawMinRT
}

该函数是BBR算法中计算最小RT(最小响应时间)的函数。它首先会从缓存中获取最小响应时间,如果缓存中的数据未过期,则直接返回缓存中的最小响应时间。否则,它会计算最小响应时间,通过遍历滑动窗口中的所有数据,统计每个时间段内的平均响应时间,并选取其中最小的一个。如果最小响应时间小于等于0,则最小响应时间会被设为1。最后,最小响应时间会被保存到缓存中,并返回最小响应时间。

maxInFlight 最大并发数

func (l *BBR) maxInFlight() int64 {
   return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}

该函数是BBR算法中计算最大并发请求数的函数。它首先调用maxPASS()函数获取最大传输速率,调用minRT()函数获取最小响应时间,再乘以每秒的时间桶数(即bucketPerSecond),再除以1000(单位转换为毫秒),最后向下取整并加上0.5(四舍五入)得到最大并发请求数。具体计算公式为:最大并发请求数 = maxPASS * minRT * bucketPerSecond / 1000。最大并发请求数用于限制当前连接的并发请求数。

shouldDrop 核心方法

func (l *BBR) shouldDrop() bool {
   now := time.Duration(time.Now().UnixNano())
   if l.cpu() < l.opts.CPUThreshold {
      // current cpu payload below the threshold
      prevDropTime, _ := l.prevDropTime.Load().(time.Duration)
      if prevDropTime == 0 {
         // haven't start drop,
         // accept current request
         return false
      }
      if time.Duration(now-prevDropTime) <= time.Second {
         // just start drop one second ago,
         // check current inflight count
         inFlight := atomic.LoadInt64(&l.inFlight)
         return inFlight > 1 && inFlight > l.maxInFlight()
      }
      l.prevDropTime.Store(time.Duration(0))
      return false
   }
   // current cpu payload exceeds the threshold
   inFlight := atomic.LoadInt64(&l.inFlight)
   drop := inFlight > 1 && inFlight > l.maxInFlight()
   if drop {
      prevDrop, _ := l.prevDropTime.Load().(time.Duration)
      if prevDrop != 0 {
         // already started drop, return directly
         return drop
      }
      // store start drop time
      l.prevDropTime.Store(now)
   }
   return drop
}

这是一个 Golang 中的 BBR 拥塞控制算法的实现,根据当前 CPU 负载情况和 inflight count 来判断是否需要进行拥塞控制(即丢弃部分请求)。具体来说,如果当前 CPU 负载低于阈值,且已经有过一次拥塞控制,则需要检查一下 inflight count,如果超过了最大值,则需要进行拥塞控制;如果当前 CPU 负载高于阈值,则需要检查一下 inflight count 是否超过最大值,如果超过了,则需要进行拥塞控制。拥塞控制过程的实现是将 prevDropTime 存储为 0 或当前的时间,如果需要拥塞控制,则将 prevDropTime 设置为当前时间。

Allow 允许通过

// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
   if l.shouldDrop() {
      return nil, ratelimit.ErrLimitExceed
   }
   atomic.AddInt64(&l.inFlight, 1)
   start := time.Now().UnixNano()
   ms := float64(time.Millisecond)
   return func(ratelimit.DoneInfo) {
      //nolint
      rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms)
      l.rtStat.Add(rt)
      atomic.AddInt64(&l.inFlight, -1)
      l.passStat.Add(1)
   }, nil
}

这是一个实现拥塞控制的限流器(即 BBR 算法)的 Allow 方法。Allow 方法会判断当前是否需要进行拥塞控制(即丢弃一部分请求),如果需要则直接返回 Error。如果不需要进行拥塞控制,则将 inflight 加 1,并记录当前时间。同时返回一个 done 函数,表示当请求处理完成后,需要执行的操作。done 函数内部会将 inflight 减 1,并计算本次请求的处理时间,并根据处理时间更新 rtStat 和 passStat。其中,rtStat 记录了每个请求的处理时间的分布情况,passStat 记录了总共通过的请求的数量。

总结

  • 使用下来很方便,能够准确的理解函数的意思,提高生产力
  • 如果对话前指定上下文环境,ai 会更准确,比如开始前 跟ai说 下面的对话是一段限流算法的源码解析
  • 存在一些问题:比如一段时间未输入会丢失上下文环境、对于部分代码理解不了

ps: 我们会被AI取代吗

源码地址

ratelimit bbr: github.com/go-kratos/a…

系统自适应限流:github.com/alibaba/Sen…

转载自:https://juejin.cn/post/7217991690701914171
评论
请登录