Sentinel源码9-系统保护SystemSlot和权限控制AuthoritySlot
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
1. SystemSlot
Sentinel 系统自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡
,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
Sentinel系统自适应限流参考了TCP BBR
实现,根据系统能处理的请求和允许进来的请求来做一个平衡,而不是通过一个系统load来做限流,它最终的目的是在系统不被拖垮的情况下,提高系统吞吐量,而不是load一定要低于某个阈值
1.1 系统规则
系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS 和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统保护规则是应用整体维度
的,而不是资源维度的,并且仅对入口流量生效
。入口流量指的是进入应用的流量(EntryType.IN
),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。
指标 | 说明 |
---|---|
Load 自适应(仅对 Linux/Unix-like 机器生效) | 系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt 估算得出。设定参考值一般是 CPU cores * 2.5。 |
CPU usage(1.5.0+ 版本) | 当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。 |
平均 RT | 当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。 |
并发线程数 | 当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。 |
入口 QPS | 当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。 |
1.2 原理
我们把系统处理请求的过程想象为一个水管,到来的请求是往这个水管灌水,当系统处理顺畅的时候,请求不需要排队,直接从水管中穿过,这个请求的RT是最短的;反之,当请求堆积的时候,那么处理请求的时间则会变为:
排队时间 + 最短处理时间
。
如果我们能够保证水管里的水量,能够让水顺畅的流动,则不会增加排队的请求;也就是说,这个时候的系统负载不会进一步恶化。
我们用 T
来表示(水管内部的水量),用RT
来表示请求的处理时间,用P来表示进来的请求数,那么一个请求从进入水管道到从水管出来,这个水管会存在 P * RT
个请求。换一句话来说,当 T ≈ QPS * Avg(RT)
的时候,我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,系统的负载不会进一步恶化。
接下来的问题是,水管的水位是可以达到了一个平衡点,但是这个平衡点只能保证水管的水位不再继续增高,但是还面临一个问题,就是在达到平衡点之前,这个水管里已经堆积了多少水。如果之前水管的水已经在一个量级了,那么这个时候系统允许通过的水量可能只能缓慢通过,RT会大,之前堆积在水管里的水会滞留;反之,如果之前的水管水位偏低,那么又会浪费了系统的处理能力。
当保持入口的流量是水管出来的流量的最大的值的时候,可以最大利用水管的处理能力。
1.3 自适应限流使用例子
public class SystemGuardDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
tick();
//初始化规则参数
initSystemRule();
//开启线程执行流量调用
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
entry = SphU.entry("methodA", EntryType.IN);
pass.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (BlockException e1) {
block.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
private static void initSystemRule() {
List<SystemRule> rules = new ArrayList<SystemRule>();
SystemRule rule = new SystemRule();
// max load is 3
rule.setHighestSystemLoad(3.0);
// max cpu usage is 60%
rule.setHighestCpuUsage(0.6);
// max avg rt of all request is 10 ms
rule.setAvgRt(10);
// max total qps is 20
rule.setQps(20);
// max parallel working thread is 10
rule.setMaxThread(10);
rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
+ oneSecondTotal + ", pass:"
+ oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
System.exit(0);
}
}
}
1.4 源码分析
1.4.1 自适应限流入口 SystemSlot
@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// todo entry方法中调用SystemRuleManager.checkSystem方法,这里是自适应限流的关键点
SystemRuleManager.checkSystem(resourceWrapper, count);
// 在职责链上继续调用下一个slot节点。
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
1.4.2 SystemRuleManager
1.4.2.1 类图
sentinel自适应限流通过SystemRuleManager类来实现,它里面封装了BBR算法的实现,以及系统指标的采集,接下来我们看下它的类图以及核心属性。
我们来看下SystemRuleManager
的一些属性:
//系统最大负载
private static volatile double highestSystemLoad = Double.MAX_VALUE;
/**
* cpu usage, between [0, 1]
*/
//CPU使用率,介于[0,1]之间
private static volatile double highestCpuUsage = Double.MAX_VALUE;
private static volatile double qps = Double.MAX_VALUE;
//最大延迟
private static volatile long maxRt = Long.MAX_VALUE;
//最大线程数
private static volatile long maxThread = Long.MAX_VALUE;
//采集系统cpu load、cpu使用率的实现。
private static SystemStatusListener statusListener = null;
1.4.2.2 核心方法源码分析
-
checkSystem
:public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException { // 检查资源是否为空,如果为空直接返回 if (resourceWrapper == null) { return; } // Ensure the checking switch is on. // 判断系统自适应限流是否开启,未开启直接返回。 if (!checkSystemStatus.get()) { return; } // for inbound traffic only // 判断资源的流量是否为入口流量,Sentinel系统自适应限流只对入口流量生效 if (resourceWrapper.getEntryType() != EntryType.IN) { return; } // total qps // 从Constants.ENTRY_NODE获取当前qps,如果当前qps大于SystemRule配置的阈值,直接抛SystemBlockException异常 double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.passQps(); if (currentQps + count > qps) { throw new SystemBlockException(resourceWrapper.getName(), "qps"); } // total thread // 从Constants.ENTRY_NODE获取当前thread,如果当前thread大于SystemRule配置的阈值,直接抛SystemBlockException 异常 int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum(); if (currentThread > maxThread) { throw new SystemBlockException(resourceWrapper.getName(), "thread"); } // 从Constants.ENTRY_NODE获取当前avgRT,如果当前avgRT大于SystemRule配置的阈值,直接抛SystemBlockException异常 double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt(); if (rt > maxRt) { throw new SystemBlockException(resourceWrapper.getName(), "rt"); } // load. BBR algorithm. // 进行bbr算法校验 // 校验系统负载开关是否打开,当前系统load是否大于配置的系统load,如果都满足则继续校验 if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) { // 调用checkBbr方法,之前我们有说过系统通过流量:T ≈ QPS * Avg(RT)时 // 我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,所以checkBbr方法计算的公式以秒为单位:T=QPS*RT/1000。 // 如果当前线程数大于T,则进行拦截 if (!checkBbr(currentThread)) { throw new SystemBlockException(resourceWrapper.getName(), "load"); } } // cpu usage // 判断当前CPU使用率是否大于SystemRule配置的阈值,如果是则抛出SystemBlockException异常 if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) { throw new SystemBlockException(resourceWrapper.getName(), "cpu"); } } //bbr算法 private static boolean checkBbr(int currentThread) { if (currentThread > 1 && currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) { return false; } return true; }
-
Constants.ENTRY_NODE
- 自适应限流使用的是全局的ClusterNode节点,这就是说自适应限流的维度是整个系统。
public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);
-
系统指标采集运行
@SuppressWarnings("PMD.ThreadPoolCreationRule") private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-system-status-record-task", true)); static { checkSystemStatus.set(false); statusListener = new SystemStatusListener(); scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS); currentProperty.addListener(listener); }
SystemRuleManager
类中定义了ScheduledExecutorService
线程池,在静态块里面触发SystemStatusListener
类的运行,运行时间是1秒钟一次,这表示Sentinel
的自适应保护信息采集为1秒钟采集系统load、cpu
信息。
1.4.3 系统指标采集源码分析
SystemStatusListener
类图:
该类实现了runnable
接口,他通过一个线程每隔一秒执行一次load、cpu usage
信息的采集。
- 源码分析
public class SystemStatusListener implements Runnable {
volatile double currentLoad = -1;
volatile double currentCpuUsage = -1;
volatile String reason = StringUtil.EMPTY;
volatile long processCpuTime = 0;
volatile long processUpTime = 0;
public double getSystemAverageLoad() {
return currentLoad;
}
public double getCpuUsage() {
return currentCpuUsage;
}
@Override
public void run() {
try {
// Sentinel通过jdk:ManagementFactory类获取系统load、cpu等信息。
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
// 获取当前系统load。
currentLoad = osBean.getSystemLoadAverage();
/*
* Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
* Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
* A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
* of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
* observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
* system. If the system recent cpu usage is not available, the method returns a negative value.
*/
// 获取当前系统cpu usage
double systemCpuUsage = osBean.getSystemCpuLoad();
// calculate process cpu usage to support application running in container environment
RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
// 获取系统cpu运行时间
long newProcessCpuTime = osBean.getProcessCpuTime();
// 获取当前jvm cpu运行时间
long newProcessUpTime = runtimeBean.getUptime();
// 获取系统cpu核心数
int cpuCores = osBean.getAvailableProcessors();
long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
.toMillis(newProcessCpuTime - processCpuTime);
long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
// 计算CPU使用率
double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
processCpuTime = newProcessCpuTime;
processUpTime = newProcessUpTime;
currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);
if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
writeSystemStatusLog();
}
} catch (Throwable e) {
RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
}
}
private void writeSystemStatusLog() {
StringBuilder sb = new StringBuilder();
sb.append("Load exceeds the threshold: ");
sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");
sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");
sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");
sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");
sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");
sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");
sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");
sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");
RecordLog.info(sb.toString());
}
}
1.5 Sentinel后台配置
配置系统保护规则:
那么,后台配置是怎么映射进入的呢?
在 SystemRuleManager
中,有一个静态方法(loadRules(List<SystemRule> rules)
)去初始化SystemRule配置。
public static void loadRules(List<SystemRule> rules) {
currentProperty.updateValue(rules);
}
而在更新currentProperty
的时候,实质是通知观察者去更新,这里使用的是观察者模式
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
//观察者
protected Set<PropertyListener<T>> listeners = Collections.synchronizedSet(new HashSet<PropertyListener<T>>());
//更新值
@Override
public boolean updateValue(T newValue) {
//如果两个值一样,则返回false,不修改
if (isEqual(value, newValue)) {
return false;
}
value = newValue;
//通知各个观察者
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
//判断两个对象是否一致
private boolean isEqual(T oldValue, T newValue) {
if (oldValue == null && newValue == null) {
return true;
}
if (oldValue == null) {
return false;
}
return oldValue.equals(newValue);
}
public void close() {
listeners.clear();
}
}
这里的观察者为**SystemPropertyListener
**,在SystemRuleManager
的静态方法区已经添加进去。
public class SystemRuleManager {
//观察者,当systemRule配置发生变更时,会通知该Listener
private final static SystemPropertyListener listener = new SystemPropertyListener();
static {
checkSystemStatus.set(false);
statusListener = new SystemStatusListener();
scheduler.scheduleAtFixedRate(statusListener, 5, 1, TimeUnit.SECONDS);
//添加观察者
currentProperty.addListener(listener);
}
}
而在SystemPropertyListener
更新的时候,会先关闭系统检查,当配置修改完成之后,再启用。
static class SystemPropertyListener extends SimplePropertyListener<List<SystemRule>> {
@Override
public void configUpdate(List<SystemRule> rules) {
//恢复到默认状态
restoreSetting();
if (rules != null && rules.size() >= 1) {
for (SystemRule rule : rules) {
//加载配置
loadSystemConf(rule);
}
} else {
checkSystemStatus.set(false);
}
}
//重置配置信息
protected void restoreSetting() {
checkSystemStatus.set(false);
// should restore changes
highestSystemLoad = Double.MAX_VALUE;
highestCpuUsage = Double.MAX_VALUE;
maxRt = Long.MAX_VALUE;
maxThread = Long.MAX_VALUE;
qps = Double.MAX_VALUE;
highestSystemLoadIsSet = false;
maxRtIsSet = false;
maxThreadIsSet = false;
qpsIsSet = false;
}
}
修改则判断是否小于默认配置,由于之前已经重新初始化过了,所以如果有修改,肯定会比默认的值小。
public static void loadSystemConf(SystemRule rule) {
boolean checkStatus = false;
if (rule.getHighestSystemLoad() >= 0) {
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
checkStatus = true;
}
if (rule.getHighestCpuUsage() >= 0) {
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
if (rule.getAvgRt() >= 0) {
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
if (rule.getMaxThread() >= 0) {
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
if (rule.getQps() >= 0) {
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}
配置加载完毕,如果有SystemRule
配置,则将checkSystemStatus
改为true
。
1.6 总结
经过以上源码分析,我们可以得出几点结论:
- Sentinel自适应限流原理采用了BBR算法
- Sentinel自适应限流维度是整个系统,当系统负载过大时会触发限流。
- Sentinel自适应限流信息采集指标为1秒钟一次。
- Sentinel采集的指标是操作系统级别的,所以要使用自适应限流的话,建议应用服务器不要部署其他负载很高的服务。
2. AuthoritySlot
AuthorizationSlot
则根据黑白名单,来做黑白名单控制;如果该resource配置了AuthorityRule
,则根据策略判断该资源请求的请求来源(origin)是否在配置规则LimitApp
中( (,)隔开
)和策略判断,是否检查通过。
-
如果是白名单
- 判断origin是否在
limitApp
中,如果在,则返回true,否则返回false
- 判断origin是否在
-
如果为黑名单
- 判断origin是否在
limitApp
中,如果在,则返回false,否则返回true
- 判断origin是否在
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
//检查黑白名单
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
//获取认证的规则
Map<String, List<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
//根据resourceName获取该资源下对应的规则
List<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
//认证检查
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
检查逻辑在AuthorityRuleChecker
:
final class AuthorityRuleChecker {
static boolean passCheck(AuthorityRule rule, Context context) {
String requester = context.getOrigin();
// 获取orgin请求来源,如果为请求来源为null或者limitApp为null则直接返回通过
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
//判断limitApp是否含有origin
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
if (contain) {
boolean exactlyMatch = false;
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
//根据策略处理是否包含,判断是否通过
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
private AuthorityRuleChecker() {}
}
AuthorityRule
的配置更新和SystemSlot
一样,更新依赖于AuthorityRuleManager
的loadRules
方法。
2.1 Sentinel后台配置
在Sentinel的1.8版本的源码中,由于在创建Context的时候orion默认是空串,如果我们想验证权限控制,需要将这个改成 具体的orion来源,代码如下:
com.alibaba.csp.sentinel.CtSph.InternalContextUtil#internalEnter(java.lang.String)
:
参考文章
Sentinel1.8.5源码github地址(注释) Sentinel源码解析 Sentinel官网 Setinel源码阅读:系统自适应保护 深入浅出之原理篇AuthoritySlot
转载自:https://juejin.cn/post/7150074488195907615