Sentinel源码分析-StatisticsSlot统计分析
统计维度
StatisticsSlot,顾名思义是统计模块,里有两个统计维度,一个是秒级别的,一个是分钟级别的。每个统计周期内统计6种信息。
统计维度 | 分钟 | 秒 |
---|---|---|
统计周期 | 1min | 1s |
窗口数量 | 60个 | 2个 |
窗口大小(统计周期/窗口大小) | 1s | 500ms |
entry方法
之前的文章讲过Sentinel的执行流程,每个Slot的执行入口是entry。我们看下StatisticsSlot方法的entry做了什么。
- StatisticsSlot节点是slotChain上最后执行的步骤,从fireEntry出来说明是经过前面那一系列检查的,是被允许的请求。
- 在各式各样的分支里面 increaseThreadNum、addPassRequest、increaseBlockQps,分别记载当前时间窗口内通过的线程、请求数和Block数。
- 执行一些回调函数,回调函数的注册在Env.sph的静态代码里。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
下面这三个方法就是统计的入口:
- increaseThreadNum比较简单,curThreadNum 是一个LongAdder 对象,直接加就可以了。
- addPass和addBlock,我们可以看到有Second和Minute两个Counter分别进行add(count)验证了上面说的两个统计周期。
public void increaseThreadNum() {
curThreadNum.increment();
}
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
public void increaseBlockQps(int count) {
rollingCounterInSecond.addBlock(count);
rollingCounterInMinute.addBlock(count);
}
- 两种信息的统计最终都是调用的下面的方法,只是传入的MetricEvent不同而已。MetricEvent枚举有6种类型,就是上面说的6种统计信息。
LongAdder[] counters;
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public enum MetricEvent {
//Normal pass.
PASS,
//Normal block.
BLOCK,
EXCEPTION,
SUCCESS,
RT,
//Passed in future quota (pre-occupied, since 1.5.0).
OCCUPIED_PASS
}
上面只是粗略的看到了数据最终的add逻辑,它是怎么加到对应的统计周期呢?每个统计周期的滑动窗口是怎样的呢?
看来看一个图:
- StatisticsNode就是责任链中的统计节点。
- Node里面有两个统计周期,对应两个ArrayMetric,分别是 分钟和秒 的ArrayMetric。
- 每个ArrayMetric里面有一个data字段,秒统计周期的data是OccupiableBucketLeapArray,分钟统计周期的data是BucketLeapArray,它们都继承自LeapArray,具体的差异后面再讲。
- 在构造ArrarMetric的时候,两个统计周期传入的参数是不同的,也就决定了统计周期及窗口的大小也是不同的。
- ArrarMetric构造函数,第一个参数是统计周期内的样本数量即窗口数量,第二个参数是统计周期的大小
- ArrarMetric的构造函数最终调的LeapArray的构造函数,各自生成了一个存储时间窗口的数据结构array,array的容量大小就是窗口数量。
//秒级统计周期,一个统计周期2个窗口,统计周期为100ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2,1000);
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
//分钟级统计周期,一个统计周期60个窗口,统计周期为60 * 1000ms 即 1min。
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
// 实际调用的这里。
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
//LeapArray的构造方法
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
//这里是存储时间窗口的数据结构,array的长度就是统计周期的窗口大小。
this.array = new AtomicReferenceArray<>(sampleCount);
}
addPass
理解了上面的逻辑之后,我们来看一下addPass:
- 获取当前的时间窗口。
- 当前窗口内的统计数据加上count。
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
currentWindow
windowLengthInMs = 时间周期大小(ms) , array.length()即统计周期内的窗口个数。
- 可以认为 TimeUtil.currentTimeMillis() 等于当前时间点。
- 获取当前时间对应的窗口索引 idx = (timeMillis / windowLengthInMs) % array.length()
- 获取当前时间对应的窗口的开始时间 windowStart = timeMillis - timeMillis % windowLengthInMs;
- 根据index查询当前窗口 old = array.get(idx),array就是上面在LeapArray构造方法中生成的。
- 如果window 为空,手动构造一个,注意这里可能有多个线程,所以用cas在while中进行替换,cas失败后线程让出CPU使用资源。
- 如果windowStart = old.windowStart(),说明就是我们希望获取的窗口。
- 如果 windowStart > old.windowStart(),说明当前时间可能已经大于上一个窗口一圈或多圈了。因为array是根据索引来获取数据,是循环利用的。此时需要将窗口重置,秒周期和分钟周期在窗口重置这里逻辑有差异。
- 如果 windowStart < old.windowStart(),注意代码里已经写了,代码不可能走到那里。因为过去的时间不可能大于现在的时间,除非时钟错乱了。
- 统计周期内的值怎么计算呢?就是把所有窗口的值加起来。
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
上面的第7条,在窗口重置时:分钟级的统计周期,直接把那个过期窗口的数据丢掉即可。 秒级统计周期的这么做合适吗?这个统计周期只有两个窗口,直接丢掉一半,怎么想也不合适。
设想一个场景,一个资源访问的 QPS 稳定是 10,且请求是均匀分布的,在1秒区间,通过了 10 个请求,我们在 1.1 秒的时候,观察到的 QPS 就是 5,因为第一个时间窗口数据被重置了,只有第二个时间窗口有值。
在秒级周期统计里面进行窗口重置的时候,将borrowArray 里面的数据也加了进来,borrowArray 是在创建OccupiableBucketLeapArray的时候生成的。它代表未来可用的资源数。
我理解是这样的:
由于秒级周期的窗口只有2个,重置1个对它的影响很大,于是它在LeapArray上又添加了两个窗口,用于放置未来可通过的请求,这样当第1个窗口重置后,可以将第2、3个窗口作为一个统计周期。
borrowArray的数据添加逻辑追溯到了FlowSlot里面。后面看到它的时候一起补充这里的逻辑。
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
本文到此结束!!!理解不对的欢迎指正。
转载自:https://juejin.cn/post/7208092574162321466