Sentinel源码6-流控管理FlowSlot
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
sentinel 触发流控的类为FlowSlot
,在sentinel中触发流控需要经过以下几个步骤:
- 设置流控规则
- 采集调用信息
- 根据调用信息、流控规则决定是否进行限流
1. 前言
我们先来了解下FlowSlot
的类注释,通过注释来认识FlowSlot
类有哪些功能,会做一些什么样的处理:
- 合并了从以前的插槽(
NodeSelectorSlot,ClusterNodeBuilderSlot和StatisticSlot
),FlowSlot
收集的运行时统计信息将使用预设规则来决定是否应阻止传入请求。 - 调用
SphU.entry(resourceName)
方法,如果有任何规则被触发将抛出:CodeFlowException
,用户可以通过捕捉CodeFlowException
自定义自己的逻辑。 - 一个资源可以有多个流规则。
FlowSlot
遍历这些规则,直到触发其中一个规则或遍历了所有规则。 - 每个流控规则主要由以下因素组成:grade、strategy、path。我们可以结合这些因素来达到不同的效果。
2. Rule 配置对象
Sentinel Dashboard添加流控规则操作界面:
FlowRule类图:
属性定义如下:
-
Resource
:资源名称 -
limitApp
:流控规则中的limitApp
字段用于根据调用来源进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:default
:表示不区分调用者,来自任何调用者的请求都将进行限流统计。如果这个资源名的调用总和超过了这条规则定义的阈值,则触发限流。{some_origin_name}
:表示针对特定的调用者,只有来自这个调用者的请求才会进行流量控制。例如 NodeA 配置了一条针对调用者caller1的规则,那么当且仅当来自 caller1 对 NodeA 的请求才会触发流量控制。other
:表示针对除 {some_origin_name} 以外的其余调用方的流量进行流量控制。例如,资源NodeA配置了一条针对调用者 caller1 的限流规则,同时又配置了一条调用者为 other 的规则,那么任意来自非 caller1 对 NodeA 的调用,都不能超过 other 这条规则定义的阈值。
同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default
-
grade
:限流控制的阈值类型(0:线程数,1:QPS
)。 -
count
:限流控制阈值计数。 -
strategy
:限流控制策略,默认为STRATEGY_DIRECT
-
refResource
:在具有相关资源或上下文的流控制中引用资源。 -
controlBehavior
:流量控制后的采取的行为,默认为DefaultController
-
warmUpPeriodSec
:预热时间,如果controlBehavior
设置为预热(warm up
)时,可以配置其预热时间,默认值 10s -
maxQueueingTimeMs
:最大超时时间,如果 controlBehavior 设置为排队等待时
,等待的最大超时时间,默认为500ms -
clusterMode
:是否是集群限流模式 -
clusterConfig
:集群扩容相关配置
3. FlowSlot 详解
FlowSlot类结构非常简单,就持有一个CheckRule对象和简单的几个方法,我们重点是要关FlowRuleChecker类:
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
/**
* Package-private for test.
*
* @param checker flow rule checker
* @since 1.6.1
*/
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// todo 检测并应用流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
// 获取到所有资源的流控规则
// map的key 为资源名称 value为该资源上加载的所有的流控规则
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
// 获取指定资源的流控规则
return flowRules.get(resource);
}
};
}
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#checkFlow:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// todo 获取到指定资源的所有流控规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
// 逐个应用流控规则,若无法通过则抛出异常,后续规则不再应用
for (FlowRule rule : rules) {
// todo
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
FlowRuleChecker#canPassCheck
方法详解:
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 若规则中获取要限定的来源
String limitApp = rule.getLimitApp();
// 若限流的来源为null,则请求直接通过, 如果限流规则没有配置针对来源也就是limitApp为空,则表示不做限制返回true。
// Sentinel默认配置该值为defualt。
if (limitApp == null) {
return true;
}
// 使用规则处理集群流控
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// todo 使用规则处理 单机 流控
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#passLocalCheck:
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 通过规则形成 选择出的规则node, 根据流控策略选择一个合适的Node。如果未找到则返回true。
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
// 若没有选择出node,说明没有规则,则直接返回true,表示通过检测
if (selectedNode == null) {
return true;
}
// 调用Rule规则中配置的限流控制器来判断是否符合限流规则,最终调用的是TrafficShapingController#canPass方法。
// todo 使用规则进行逐项检测
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
Sentinel有多种不同的流量控制策略:
- 基于qps/并发数的流量控制
- 基于调用关系的流量控制 具体情况可以看官方文档这篇文章很好的解释了以上两种流量控制策略,详细可以了解官方文档。
FlowRuleChecker#selectNodeByRequesterAndStrategy
详解
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
// 如果限流规则配置的针对的调用方与当前请求实际调用来源匹配(并且不是 default、other)时的处理逻辑
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
// 如果流控规则针对的调用方(limitApp) 配置的为 default,表示对所有的调用源都生效,其获取实时统计节点(Node)的处理逻辑
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
// 如果流控规则针对的调用方为(other),此时需要判断是否有针对当前的流控规则,只要存在,则这条规则对当前资源“失效”,
// 如果针对该资源没有配置其他额外的流控规则,则获取实时统计节点(Node)的处理逻辑为
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
步骤如下:
-
如果限流规则配置的针对的调用方与当前请求实际调用来源匹配(并且不是 default、other)时的处理逻辑,其实现的要点:
- 如果流控模式为 RuleConstant.STRATEGY_DIRECT(直接),则从 context 中获取源调用方所代表的 Node。
- 如果流控模式为 RuleConstant.STRATEGY_RELATE(关联),则从集群环境中获取对应关联资源所代表的 Node,通过(ClusterBuilderSlot会收集每一个资源的实时统计信息,子集群限流时详细介绍)
- 如果流控模式为 RuleConstant.STRATEGY_CHAIN(调用链),则判断当前调用上下文的入口资源与规则配置的是否一样,如果是,则返回入口资源对应的 Node,否则返回 null,注意:返回空则该条流控规则直接通过。【这部分代码,对应代码中的 selectReferenceNode 方法】
-
如果流控规则针对的调用方(limitApp) 配置的为 default,表示对所有的调用源都生效,其获取实时统计节点(Node)的处理逻辑为:
- 如果流控模式为 RuleConstant.STRATEGY_DIRECT,则直接获取本次调用上下文环境对应的节点的ClusterNode。
- 如果是其他流控模式,与代码@2的获取逻辑一样,都是调用 selectReferenceNode 进行获取。
-
如果流控规则针对的调用方为(other),此时需要判断是否有针对当前的流控规则,只要存在,则这条规则对当前资源“失效”,如果针对该资源没有配置其他额外的流控规则,则获取实时统计节点(Node)的处理逻辑为:
- 如果流控模式为 RuleConstant.STRATEGY_DIRECT(直接),则从 context 中获取源调用方所代表的 Node。
- 如果是其他流控模式,与代码@2的获取逻辑一样,都是调用 selectReferenceNode 进行获取。 从这里可以看出,流控规则针对调用方如果设置为 other,表示针对没有配置流控规则的资源。
根据流控策略选择合适的 Node 的逻辑就介绍到这里,如果没有选择到合适的 Node,则针对该流控规则,默认放行
3.1 Sentinel Controller 流控模式
Sentinel Controller类图:
结合上面FlowSlot代码分析和上边类图可知Sentinel中对应的流控模式主要有以下几种:
DefaultController
快速失败流控RateLimiterController
匀速排队流控WarmUpController
系统预热流控WarmUpRateLimiterController
系统预热+匀速排队流控
Sentenel中如果不做配置的话,默认使用DefaultController
进行流控。
3.1.1 DefaultController 快速失败流控
默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出
FlowException
。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位。核心代码如下:
// 快速失败的流控效果的通过性判断
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// todo 获取当前时间窗口中已经统计的数据
int curCount = avgUsedTokens(node);
// 若 已经统计的数据 与本次请求的数量和 大于 设置的阈值,则返回false,表示没有通过检测
// 若小于 等于阈值,则返回true,表示通过检测
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
// 若没有选择出node,则说明没有做统计工作,直接返回0
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 若阈值类型为线程数,则直接返回当前的线程数量
// todo 若阈值类型为QPS,则直接返回当前的QPS
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
3.1.2 RateLimiterController 匀速排队流控
匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。详细文档可以参考 流量控制 - 匀速器模式。
该方式的作用如下图所示:
这种方式主要用于处理间隔性突发的流量
,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
注意:匀速排队模式暂时不支持 QPS > 1000 的场景。
核心代码如下:
public class RateLimiterController implements TrafficShapingController {
// 最大等待超时时间
private final int maxQueueingTimeMs;
// 限流阈值
private final double count;
// 最新的一次通过时间,这是一个原子变量
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
// 当获取计数小于或等于0时通过。
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
// 当count小于或等于0时拒绝。否则,costTime将为long的最大值,并且waitTime在某些情况下将溢出。
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
// 计算每个请求通过的平均时间,也是说把请求平均分配到1000毫秒上
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
// 此请求的预计通过时间。
long expectedTime = costTime + latestPassedTime.get();
// 判断预计通过时间是否大于当前时间,
// 如果小于当前时间则通过,如果大于当前时间则表明请求频繁,需要进行排队等待
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 计算等待时间,这里的算法是:预计通过时间+加上次请求通过时间-当前时间;
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 判断等待时间是否大于最大超时时间,如果大于则return false 拒绝该请求。
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 更新最新一条请求通过时间并获取。
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 重新计算计算等待时间。
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 判断等待时间是否大于timeout时间,如果大于则还原最新一条请求通过时间,并return false 拒绝该请求。
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 判断是否大于0,排队睡眠。
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
其他的流控算法请看下篇文章。
参考文章
转载自:https://juejin.cn/post/7149336225906688008