关于chatgpt辅助读源码的那些事
背景
chatgpt的出现可以说对程序员这个职业造成了很大的冲击,除了可以自动生成代码外,也可以理解代码,尝试了下用chatgpt去辅助阅读源码
以kratos的limit为例
实现
下面我会将bbr limit 中的每个方法直接复制给chatgpt,让ai进行解析
cpuproc
cpu 使用率
// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
defer func() {
ticker.Stop()
if err := recover(); err != nil {
go cpuproc()
}
}()
// EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
for range ticker.C {
stat := &cpu.Stat{}
cpu.ReadStat(stat)
stat.Usage = min(stat.Usage, 1000)
prevCPU := atomic.LoadInt64(&gCPU)
curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
atomic.StoreInt64(&gCPU, curCPU)
}
}
这段代码定义了一个名为cpuproc
的函数,该函数会定期地读取当前CPU的使用率并计算基于EMA算法的平均值。具体而言,这个函数会:
-
创建一个每500毫秒触发一次的定时器
-
在函数返回前停止定时器并检查是否有panic异常,如果有则重新启动cpuproc
-
在定时器触发时,获取当前CPU的使用率并进行“平滑化”处理。具体而言,将当前CPU使用率与上一时刻的平均使用率按照指数加权移动平均(EMA)算法的方式进行加权平均。这里用到了
atomic
包中的LoadInt64()
和StoreInt64()
函数来保证多个goroutine并发执行时平均使用率的正确性。 -
min(stat.Usage, 1000)
语句的作用是确保CPU使用率不会超过100%(1000为10倍的百分比),因为stat.Usage是一个累积值,可能会大于100%。 -
这个函数会一直循环执行,直到程序被强制退出为止。
Stat
cpu 快照
// Stat contains the metrics snapshot of bbr.
type Stat struct {
CPU int64
InFlight int64
MaxInFlight int64
MinRt int64
MaxPass int64
}
这段代码定义了一个名为Stat
的结构体类型,它包含了对于bbr(基于队列的带宽调节算法)的度量快照。具体而言,这个结构体包含了以下字段:
-
CPU
字段表示当前CPU的使用率,单位是百分比,默认最大值为100。 -
InFlight
字段表示当前正在处理的请求数,也就是当前正在占用带宽的请求数。 -
MaxInFlight
字段表示最大允许处理的请求数,也就是最大允许占用带宽的请求数。 -
MinRt
字段表示当前平均响应时间的最小值,单位是纳秒。 -
MaxPass
字段表示当前允许通过的最大QPS(每秒请求数),单位是请求数。
BBR
限流器结构体
// BBR implements bbr-like limiter.
// It is inspired by sentinel.
// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
type BBR struct {
cpu cpuGetter
passStat window.RollingCounter
rtStat window.RollingCounter
inFlight int64
bucketPerSecond int64
bucketDuration time.Duration
// prevDropTime defines previous start drop since initTime
prevDropTime atomic.Value
maxPASSCache atomic.Value
minRtCache atomic.Value
opts options
}
这段代码定义了一个名为BBR
的结构体类型,表示BBR限流器。该结构体包含以下字段:
-
cpu
字段表示用于获取系统CPU利用率的函数类型。 -
passStat
字段表示用于统计窗口周期内通过的请求数的计数器。 -
rtStat
字段表示用于统计窗口周期内的请求响应时间的计数器。 -
inFlight
字段表示当前正在处理的请求数量,它是原子类型,可以安全地被并发修改。 -
bucketPerSecond
字段表示每秒钟分配的桶的数量。 -
bucketDuration
字段表示每个桶的时间周期,即固定时间内的请求限流,它是一个time.Duration
类型的值。 -
prevDropTime
字段表示上一次限流降级开始的时间。它是atomic.Value
类型,允许线程安全地读写时间值。 -
maxPASSCache
字段表示窗口周期内最大允许通过数的缓存值。它也是一个atomic.Value
类型。 -
minRtCache
字段表示窗口周期内最小允许响应时间的缓存值。它也是一个atomic.Value
类型。 -
opts
字段表示该限流器的选项。它是一个名为options
的结构体类型,包含了各种参数,如窗口大小、桶的数量、CPU利用率阈值等。
maxPASS
窗口周期内最大允许通过数
func (l *BBR) maxPASS() int64 {
passCache := l.maxPASSCache.Load()
if passCache != nil {
ps := passCache.(*counterCache)
if l.timespan(ps.time) < 1 {
return ps.val
}
}
rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {
var result = 1.0
for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
bucket := iterator.Bucket()
count := 0.0
for _, p := range bucket.Points {
count += p
}
result = math.Max(result, count)
}
return result
}))
l.maxPASSCache.Store(&counterCache{
val: rawMaxPass,
time: time.Now(),
})
return rawMaxPass
}
代码首先尝试从缓存中获取 maxPASS,如果缓存之前已经设置过并且时间跨度小于 1,那么就直接返回缓存中的值。如果缓存不存在或距离上次设置时间大于等于 1,则需要重新计算 maxPASS。
为了计算 maxPASS,代码使用了 passStat 变量的 Reduce 方法,该方法接受一个 window.Iterator 对象,并通过迭代器处理一些计算逻辑,最终返回一个 float64 类型的值。在 Reduce 方法中,代码会遍历迭代器中的数据并计算每个 bucket(桶)中的元素值的和,然后取得这些和的最大值。
计算完 maxPASS 之后,代码将它存储到一个 counterCache 结构体中,并将该结构体设置为缓存的值,以备下次读取。最后,方法返回计算出的 rawMaxPass 值
timespan
// timespan returns the passed bucket count
// since lastTime, if it is one bucket duration earlier than
// the last recorded time, it will return the BucketNum.
func (l *BBR) timespan(lastTime time.Time) int {
v := int(time.Since(lastTime) / l.bucketDuration)
if v > -1 {
return v
}
return l.opts.Bucket
}
该方法的作用是计算自最后一次记录时间(即 lastTime)到现在经过了多少个 bucket,其中 bucket 的刻度为 l.bucketDuration。
首先,方法计算了现在与 lastTime 的时间差并除以 l.bucketDuration,这样可以得到穿越的 bucket 数量。然后将这个差值转换成 int 类型,并返回其值。
但是,如果最后一次记录的时间早于当前时间的 l.bucketDuration,即 lastTime 与当前时间相差少于一个 bucket 的时间,则直接返回 l.opts.Bucket,即默认的 bucket 数量。这个判断是为了避免 timespan 方法返回负数
minRT
最小响应时间
func (l *BBR) minRT() int64 {
rtCache := l.minRtCache.Load()
if rtCache != nil {
rc := rtCache.(*counterCache)
if l.timespan(rc.time) < 1 {
return rc.val
}
}
rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
continue
}
total := 0.0
for _, p := range bucket.Points {
total += p
}
avg := total / float64(bucket.Count)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT <= 0 {
rawMinRT = 1
}
l.minRtCache.Store(&counterCache{
val: rawMinRT,
time: time.Now(),
})
return rawMinRT
}
该函数是BBR算法中计算最小RT(最小响应时间)的函数。它首先会从缓存中获取最小响应时间,如果缓存中的数据未过期,则直接返回缓存中的最小响应时间。否则,它会计算最小响应时间,通过遍历滑动窗口中的所有数据,统计每个时间段内的平均响应时间,并选取其中最小的一个。如果最小响应时间小于等于0,则最小响应时间会被设为1。最后,最小响应时间会被保存到缓存中,并返回最小响应时间。
maxInFlight
最大并发数
func (l *BBR) maxInFlight() int64 {
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}
该函数是BBR算法中计算最大并发请求数的函数。它首先调用maxPASS()函数获取最大传输速率,调用minRT()函数获取最小响应时间,再乘以每秒的时间桶数(即bucketPerSecond),再除以1000(单位转换为毫秒),最后向下取整并加上0.5(四舍五入)得到最大并发请求数。具体计算公式为:最大并发请求数 = maxPASS * minRT * bucketPerSecond / 1000。最大并发请求数用于限制当前连接的并发请求数。
shouldDrop
核心方法
func (l *BBR) shouldDrop() bool {
now := time.Duration(time.Now().UnixNano())
if l.cpu() < l.opts.CPUThreshold {
// current cpu payload below the threshold
prevDropTime, _ := l.prevDropTime.Load().(time.Duration)
if prevDropTime == 0 {
// haven't start drop,
// accept current request
return false
}
if time.Duration(now-prevDropTime) <= time.Second {
// just start drop one second ago,
// check current inflight count
inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxInFlight()
}
l.prevDropTime.Store(time.Duration(0))
return false
}
// current cpu payload exceeds the threshold
inFlight := atomic.LoadInt64(&l.inFlight)
drop := inFlight > 1 && inFlight > l.maxInFlight()
if drop {
prevDrop, _ := l.prevDropTime.Load().(time.Duration)
if prevDrop != 0 {
// already started drop, return directly
return drop
}
// store start drop time
l.prevDropTime.Store(now)
}
return drop
}
这是一个 Golang 中的 BBR 拥塞控制算法的实现,根据当前 CPU 负载情况和 inflight count 来判断是否需要进行拥塞控制(即丢弃部分请求)。具体来说,如果当前 CPU 负载低于阈值,且已经有过一次拥塞控制,则需要检查一下 inflight count,如果超过了最大值,则需要进行拥塞控制;如果当前 CPU 负载高于阈值,则需要检查一下 inflight count 是否超过最大值,如果超过了,则需要进行拥塞控制。拥塞控制过程的实现是将 prevDropTime 存储为 0 或当前的时间,如果需要拥塞控制,则将 prevDropTime 设置为当前时间。
Allow
允许通过
// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
if l.shouldDrop() {
return nil, ratelimit.ErrLimitExceed
}
atomic.AddInt64(&l.inFlight, 1)
start := time.Now().UnixNano()
ms := float64(time.Millisecond)
return func(ratelimit.DoneInfo) {
//nolint
rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms)
l.rtStat.Add(rt)
atomic.AddInt64(&l.inFlight, -1)
l.passStat.Add(1)
}, nil
}
这是一个实现拥塞控制的限流器(即 BBR 算法)的 Allow 方法。Allow 方法会判断当前是否需要进行拥塞控制(即丢弃一部分请求),如果需要则直接返回 Error。如果不需要进行拥塞控制,则将 inflight 加 1,并记录当前时间。同时返回一个 done 函数,表示当请求处理完成后,需要执行的操作。done 函数内部会将 inflight 减 1,并计算本次请求的处理时间,并根据处理时间更新 rtStat 和 passStat。其中,rtStat 记录了每个请求的处理时间的分布情况,passStat 记录了总共通过的请求的数量。
总结
- 使用下来很方便,能够准确的理解函数的意思,提高生产力
- 如果对话前指定上下文环境,ai 会更准确,比如开始前 跟ai说 下面的对话是一段限流算法的源码解析
- 存在一些问题:比如一段时间未输入会丢失上下文环境、对于部分代码理解不了
ps: 我们会被AI取代吗
源码地址
ratelimit bbr: github.com/go-kratos/a…
系统自适应限流:github.com/alibaba/Sen…
转载自:https://juejin.cn/post/7217991690701914171