likes
comments
collection
share

Sentinel源码分析-StatisticsSlot统计分析

作者站长头像
站长
· 阅读数 11

统计维度

StatisticsSlot,顾名思义是统计模块,里有两个统计维度,一个是秒级别的,一个是分钟级别的。每个统计周期内统计6种信息。

统计维度分钟
统计周期1min1s
窗口数量60个2个
窗口大小(统计周期/窗口大小)1s500ms

entry方法

之前的文章讲过Sentinel的执行流程,每个Slot的执行入口是entry。我们看下StatisticsSlot方法的entry做了什么。

  1. StatisticsSlot节点是slotChain上最后执行的步骤,从fireEntry出来说明是经过前面那一系列检查的,是被允许的请求。
  2. 在各式各样的分支里面 increaseThreadNum、addPassRequest、increaseBlockQps,分别记载当前时间窗口内通过的线程、请求数和Block数。
  3. 执行一些回调函数,回调函数的注册在Env.sph的静态代码里。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Blocked, set block exception to current entry.
        context.getCurEntry().setBlockError(e);

        // Add block count.
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // Handle block event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);

        throw e;
    }
}

下面这三个方法就是统计的入口:

  1. increaseThreadNum比较简单,curThreadNum 是一个LongAdder 对象,直接加就可以了。
  2. addPass和addBlock,我们可以看到有Second和Minute两个Counter分别进行add(count)验证了上面说的两个统计周期。
public void increaseThreadNum() {
    curThreadNum.increment();
}

public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

public void increaseBlockQps(int count) {
    rollingCounterInSecond.addBlock(count);
    rollingCounterInMinute.addBlock(count);
}
  1. 两种信息的统计最终都是调用的下面的方法,只是传入的MetricEvent不同而已。MetricEvent枚举有6种类型,就是上面说的6种统计信息。
LongAdder[] counters;

public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}
public enum MetricEvent {
    //Normal pass.
    PASS,
    //Normal block.
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    //Passed in future quota (pre-occupied, since 1.5.0).
    OCCUPIED_PASS
}

上面只是粗略的看到了数据最终的add逻辑,它是怎么加到对应的统计周期呢?每个统计周期的滑动窗口是怎样的呢?

看来看一个图:

  1. StatisticsNode就是责任链中的统计节点。
  2. Node里面有两个统计周期,对应两个ArrayMetric,分别是 分钟和秒 的ArrayMetric。
  3. 每个ArrayMetric里面有一个data字段,秒统计周期的data是OccupiableBucketLeapArray,分钟统计周期的data是BucketLeapArray,它们都继承自LeapArray,具体的差异后面再讲。
  4. 在构造ArrarMetric的时候,两个统计周期传入的参数是不同的,也就决定了统计周期及窗口的大小也是不同的。
  5. ArrarMetric构造函数,第一个参数是统计周期内的样本数量即窗口数量,第二个参数是统计周期的大小
  6. ArrarMetric的构造函数最终调的LeapArray的构造函数,各自生成了一个存储时间窗口的数据结构array,array的容量大小就是窗口数量。

Sentinel源码分析-StatisticsSlot统计分析

//秒级统计周期,一个统计周期2个窗口,统计周期为100ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2,1000);
public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

//分钟级统计周期,一个统计周期60个窗口,统计周期为60 * 1000ms 即 1min。
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
    if (enableOccupy) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    } else {
        // 实际调用的这里。
        this.data = new BucketLeapArray(sampleCount, intervalInMs);
    }
}

//LeapArray的构造方法
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;
    //这里是存储时间窗口的数据结构,array的长度就是统计周期的窗口大小。
    this.array = new AtomicReferenceArray<>(sampleCount);
}

addPass

理解了上面的逻辑之后,我们来看一下addPass:

  1. 获取当前的时间窗口。
  2. 当前窗口内的统计数据加上count。
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

currentWindow

windowLengthInMs = 时间周期大小(ms) , array.length()即统计周期内的窗口个数。

  1. 可以认为 TimeUtil.currentTimeMillis() 等于当前时间点。
  2. 获取当前时间对应的窗口索引 idx = (timeMillis / windowLengthInMs) % array.length()
  3. 获取当前时间对应的窗口的开始时间 windowStart = timeMillis - timeMillis % windowLengthInMs;
  4. 根据index查询当前窗口 old = array.get(idx),array就是上面在LeapArray构造方法中生成的。
  5. 如果window 为空,手动构造一个,注意这里可能有多个线程,所以用cas在while中进行替换,cas失败后线程让出CPU使用资源。
  6. 如果windowStart = old.windowStart(),说明就是我们希望获取的窗口。
  7. 如果 windowStart > old.windowStart(),说明当前时间可能已经大于上一个窗口一圈或多圈了。因为array是根据索引来获取数据,是循环利用的。此时需要将窗口重置,秒周期和分钟周期在窗口重置这里逻辑有差异。
  8. 如果 windowStart < old.windowStart(),注意代码里已经写了,代码不可能走到那里。因为过去的时间不可能大于现在的时间,除非时钟错乱了。
  9. 统计周期内的值怎么计算呢?就是把所有窗口的值加起来。
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            return old;
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

上面的第7条,在窗口重置时:分钟级的统计周期,直接把那个过期窗口的数据丢掉即可。 秒级统计周期的这么做合适吗?这个统计周期只有两个窗口,直接丢掉一半,怎么想也不合适。

设想一个场景,一个资源访问的 QPS 稳定是 10,且请求是均匀分布的,在1秒区间,通过了 10 个请求,我们在 1.1 秒的时候,观察到的 QPS 就是 5,因为第一个时间窗口数据被重置了,只有第二个时间窗口有值。

在秒级周期统计里面进行窗口重置的时候,将borrowArray 里面的数据也加了进来,borrowArray 是在创建OccupiableBucketLeapArray的时候生成的。它代表未来可用的资源数。

我理解是这样的:

由于秒级周期的窗口只有2个,重置1个对它的影响很大,于是它在LeapArray上又添加了两个窗口,用于放置未来可通过的请求,这样当第1个窗口重置后,可以将第2、3个窗口作为一个统计周期。

borrowArray的数据添加逻辑追溯到了FlowSlot里面。后面看到它的时候一起补充这里的逻辑。

protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
    // Update the start time and reset value.
    w.resetTo(time);
    MetricBucket borrowBucket = borrowArray.getWindowValue(time);
    if (borrowBucket != null) {
        w.value().reset();
        w.value().addPass((int)borrowBucket.pass());
    } else {
        w.value().reset();
    }

    return w;
}

public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
    // This class is the original "CombinedBucketArray".
    super(sampleCount, intervalInMs);
    this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}

本文到此结束!!!理解不对的欢迎指正。