Sentinel源码5-滑动时间窗口(统计数据StatisticSlot)
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
Sentinel 的核心骨架,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。核心结构:
上图中的右上角就是滑动窗口的示意图,是 StatisticSlot
的具体实现。StatisticSlot
是 Sentinel
的核心功能插槽之一,用于统计实时的调用数据
。
1. 数据结构和核心类
Sentinel
是基于滑动窗口实现的实时指标数据收集统计
,底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据
,可以很好地支撑写多于读的高并发场景
。核心数据结构:
ArrayMetric
:滑动窗口核心实现类。LeapArray
:滑动窗口顶层数据结构,包含一个一个的窗口数据。WindowWrap
:每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。MetricBucket
:指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。MetricEvent
:指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。
1.1 ArrayMetric
滑动窗口的入口类为 ArrayMetric
,实现了 Metric
指标收集核心接口,该接口主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。
// 这是一个使用数组保存数据的计量器类
public class ArrayMetric implements Metric {
// 数据就保存在data中
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
/**
* @param sampleCount 在一个采集间隔中抽样的个数,默认为 2,即一个采集间隔中会包含两个相等的区间,一个区间就是一个窗口。
* @param intervalInMs 表示一个采集的时间间隔,即滑动窗口的总时间,例如 1 分钟。
* @param enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量。
*/
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
}
int intervalInMs
:表示一个采集的时间间隔,即滑动窗口的总时间,例如 1 分钟。int sampleCount
:在一个采集间隔中抽样的个数,默认为 2,即一个采集间隔中会包含两个相等的区间,一个区间就是一个窗口。boolean enableOccupy
:是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量。
1.2 LeapArray
LeapArray
用来承载滑动窗口,即成员变量 array,类型为 AtomicReferenceArray<WindowWrap<T>>
,保证创建窗口的原子性(CAS)。
public abstract class LeapArray<T> {
// 样本窗口长度,每一个窗口的时间间隔,单位为毫秒
protected int windowLengthInMs;
// 抽样个数,一个时间窗中包含的样本窗数量
protected int sampleCount;
// 时间窗长度,单位毫秒
protected int intervalInMs;
// 时间窗长度,单位秒
private double intervalInSecond;
// 滑动窗口的数组,滑动窗口类型WindowWrap<MetricBucket>
// 数组,元素为WindowWrap 样本窗口
// 注意,泛型T为 MetricBucket
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
1.3 MetricBucket
Sentinel
使用 MetricBucket
统计一个窗口时间内的各项指标数据,这些指标数据包括请求总数、成功总数、异常总数、总耗时、最小耗时、最大耗时等,而一个 Bucket 可以是记录一秒内的数据,也可以是 10 毫秒内的数据,这个时间长度称为窗口时间。
// 统计数据的封装类
public class MetricBucket {
// 统计的数据存放在这里,数组
// 统计的数据为多维度的,这些维度的类型在MetricEvent枚举中,比如异常总数、请求总数等
private final LongAdder[] counters;
// 这段事件内的最小耗时
private volatile long minRt;
}
Bucket
记录一段时间内的各项指标数据用的是一个 LongAdder
数组,数组的每个元素分别记录一个时间窗口内的请求总数、异常数、总耗时。也就是说:MetricBucket
包含一个 LongAdder
数组,数组的每个元素代表一类 MetricEvent
。
LongAdder
保证了数据修改的原子性,并且性能比 AtomicLong
表现更好。
// 数据统计的维度
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
当需要获取 Bucket
记录总的成功请求数或者异常总数、总的请求处理耗时,可根据事件类型 (MetricEvent
) 从 Bucket
的 LongAdder
数组中获取对应的 LongAdder
,并调用 sum
方法获取总数。
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
当需要 Bucket
记录一个成功请求或者一个异常请求、处理请求的耗时,可根据事件类型(MetricEvent
)从 LongAdder
数组中获取对应的 LongAdder
,并调用其 add
方法。
public void add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
}
1.4 WindowWrap
因为 Bucket
自身并不保存时间窗口信息,所以 Sentinel 给 Bucket
加了一个包装类 WindowWrap
。
Bucket
用于统计各项指标数据,WindowWrap
用于记录 Bucket
的时间窗口信息(窗口的开始时间、窗口的大小),WindowWrap
数组就是一个滑动窗口。
// 样本窗口实例,泛型T为 MetricBucket
public class WindowWrap<T> {
/**
* Time length of a single window bucket in milliseconds.
*/
// 样本窗口的长度
private final long windowLengthInMs;
/**
* Start timestamp of the window in milliseconds.
*/
// 样本窗口的起始时间戳
private long windowStart;
/**
* Statistic data.
*/
// 当前样本窗口中的统计数据,其类型为MetricBucket
private T value;
}
总的来说:
WindowWrap
用于包装Bucket
,随着Bucket
一起创建。WindowWrap
数组实现滑动窗口,Bucket
只负责统计各项指标数据,WindowWrap
用于记录Bucket
的时间窗口信息。- 定位
Bucket
实际上是定位WindowWrap
,拿到WindowWrap
就能拿到Bucket
。
2. 滑动窗口实现
如果我们希望能够知道某个接口的每秒处理成功请求数(成功 QPS)、每秒处理失败请求数(失败 QPS),以及处理每个成功请求的平均耗时(avg RT),注意这里我们只需要控制 Bucket 统计一秒钟的指标数据即可,但如何才能确保 Bucket
存储的就是精确到 1 秒内的数据呢?
Sentinel 是这样实现的:定义一个 Bucket
数组,根据时间戳来定位到数组的下标。
由于只需要保存最近一分钟的数据。那么 Bucket
数组的大小就可以设置为 60
,每个 Bucket
的 windowLengthInMs
(窗口时间)大小就是 1 秒
。内存资源是有限的,而这个数组可以循环使用,并且永远只保存最近 1 分钟
的数据,这样可以避免频繁的创建 Bucket
,减少内存资源的占用
。
那如何定位 Bucket 呢
?我们只需要将当前时间戳减去毫秒部分,得到当前的秒数,再将得到的秒数与数组长度取余数,就能得到当前时间窗口的 Bucket 在数组中的位置(索引)。
calculateTimeIdx
方法中,取余数就是实现循环利用数组。如果想要获取连续的一分钟的 Bucket 数据,就不能简单的从头开始遍历数组,而是指定一个开始时间和结束时间,从开始时间戳开始计算 Bucket 存放在数组中的下标,然后循环每次将开始时间戳加上 1 秒,直到开始时间等于结束时间。
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
由于循环使用的问题,当前时间戳与一分钟之前的时间戳和一分钟之后的时间戳都会映射到数组中的同一个 Bucket
,因此,必须要能够判断取得的 Bucket
是否是统计当前时间窗口内的指标数据,这便要数组每个元素都存储 Bucket
时间窗口的开始时间戳。
- 比如当前时间戳是 1577017626812,
Bucket
统计一秒的数据,将时间戳的毫秒部分全部替换为 0,就能得到Bucket
时间窗口的开始时间戳为 1577017626000。
//计算时间窗口开始时间戳
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
//判断时间戳是否在当前时间窗口内
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
2.1 通过时间戳定位 Bucket
当接收到一个请求时,可根据接收到请求的时间戳计算出一个数组索引,从滑动窗口(WindowWrap
数组)中获取一个 WindowWrap
,从而获取 WindowWrap
包装的 Bucket
,调用 Bucket
的 add
方法记录相应的事件。
/**
* 根据时间戳获取bucket
* @param timeMillis 时间戳 毫秒
* @return 如果时间有效,则在提供的时间戳处显示当前存储桶,如果时间无效,则为空
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// todo 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// todo 计算当前样本窗口的开始时间点
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
// 从数组中死循环查找当前的时间窗口,因为可能多个线程都在获取当前时间窗口
while (true) {
// 获取到当前时间所在的样本窗口
WindowWrap<T> old = array.get(idx);
// 一般是项目启动时,时间未到达一个周期,数组还没有存储满,没有到复用阶段,所以数组元素可能为空
// 若当前时间所在样本窗口为null,则创建一个
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
// 创建时间窗, 创建新的 bucket,并创建一个 bucket 包装器
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 通过CAS方式将新建窗口放入到array,确保线程安全,期望数组下标的元素是空的,否则就不写入,而是复用
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 若当前样本窗口的起始时间 与 计算出的样本窗口起始时间点相同,则就是我们想要的bucket
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
// 若当前样本窗口的起始时间 大于 计算出的样本窗口起始时间点
// 说明计算出的样本窗口已经过时了
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
// todo 替换掉老的样本窗口 重置 bucket,并指定 bucket 的新时间窗口的开始时间
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 若当前样本窗口的起始时间 大于 计算出的样本窗口起始时间点
// 这种情况一般不会出现,因为时间不会倒流,除 非人为修改了系统时钟
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
上面代码的实现是:通过当前时间戳计算出当前时间窗口的 (new) Bucket 在数组中的索引,通过索引从数组中取得 (old) Bucket;并计算 (new) Bucket 时间窗口的开始时间,与 (old) Bucket 时间窗口的开始时间作对比:
- 如果旧的
bucket
不存在,那么我们在windowStart
处创建一个新的bucket
,然后尝试通过CAS
操作更新环形数组。只有一个线程可以成功更新,保证原子性。 - 如果当前
windowStart
等于旧桶的开始时间戳,表示时间在桶内,所以直接返回桶。 - 如果旧桶的开始时间戳落后于所提供的时间,这意味着桶已弃用,我们可以复用该桶,并将桶重置为当前的
windowStart
。注意重置和清理操作很难是原子的,所以我们需要一个更新锁来保证桶更新的正确性。只有当bucket
已弃用才会上锁,所以在大多数情况下它不会导致性能损失。 - 不应该到这里,因为提供的时间已经落后了,一般是时钟回拨导致的。
3. StatisticSlot
3.1 entry()
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
// todo 调用SlotChain 中后续的所有Slot,完成所有的规则校验
// 其在执行过程中可能会抛出异常,例如,规则校验未通过,抛出BlockException
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
// 代码能走到这里,说明前面所有的规则校验全部通过,此时可以将该请求统计到响应的数据中
// 增加线程数量
node.increaseThreadNum();
// 增加线程的请求数
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
调用SlotChain 中后续的所有Slot,完成所有的规则校验。
我们来看node.addPassRequest(count);
方法:
@Override
public void addPassRequest(int count) {
// 增加当前入口的DefaultNode中的统计数据
super.addPassRequest(count);
// 增加当前资源ClusterNode的全局统计数据
this.clusterNode.addPassRequest(count);
}
com.alibaba.csp.sentinel.node.StatisticNode#addPassRequest
@Override
public void addPassRequest(int count) {
// 滑动计数器增加本次访问的数据
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addPass
@Override
public void addPass(int count) {
// 获取当前时间点所在的样本窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 将当前请求的计数量 添加到当前样本窗口的统计数据中
wrap.value().addPass(count);
}
其中data.currentWindow();
在前面2.1小结已经分析过了。我们来看一下MetricBucket#addPassaddPass
方法:
public void addPass(int n) {
// 向pass 维度的增加统计数据
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
LongAdder
定义一个LongAdder类型的成员变量counter数组来进行窗口内数据的统计。JDK1.8时,java.util.concurrent.atomic
包中提供了一个新的原子类:LongAdder
。
根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间:
我们知道,AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。
LongAdder的基本思路就是分散热点
,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
这种做法有没有似曾相识的感觉?没错,ConcurrentHashMap中的“分段锁”其实就是类似的思路。
3.2 exit()
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
// Calculate response time (use completeStatTime as the time of completion).
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
Throwable error = context.getCurEntry().getError();
// Record response time and success count.
recordCompleteFor(node, count, rt, error);
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
// 集群node添加rt与减少并发线程数
if (resourceWrapper.getEntryType() == EntryType.IN) {
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
}
}
// Handle exit event with registered exit callback handlers.
// 回调
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
// fix bug https://github.com/alibaba/Sentinel/issues/2374
fireExit(context, resourceWrapper, count, args);
}
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
if (node == null) {
return;
}
// 记录rt(响应时间)
node.addRtAndSuccess(rt, batchCount);
// 减少并发线程数
node.decreaseThreadNum();
// 阻塞数+1
if (error != null && !(error instanceof BlockException)) {
node.increaseExceptionQps(batchCount);
}
}
在该方法中,会计算平响,减少线程数等操作。
参考文章
转载自:https://juejin.cn/post/7149036319635669028