
Sentinel Source Code, Part 8: Cluster Flow Control

Author: 站长 · 81 reads

Welcome to follow github.com/hsfxuebao — I hope it helps. If you find it useful, please give it a Star.

1. Introduction

Why use cluster flow control? Suppose we want to cap a user's total QPS on some API at 50, but the service runs on many machines (say 100). A natural idea is to dedicate one server to tallying the total call count and have every instance ask that server whether a call may proceed. That is cluster flow control in its most basic form.

Cluster flow control also solves the problem of uneven traffic making the overall limit inaccurate. Suppose a cluster has 10 machines, each with a local threshold of 10 QPS; ideally the cluster-wide limit is 100 QPS. In practice traffic is rarely spread evenly, so some machines start rejecting requests before the cluster total is reached. Per-machine limits alone therefore cannot enforce a precise overall limit. Cluster flow control can cap the total call volume of the whole cluster exactly, and combined with per-machine limiting as a fallback, it controls traffic far more effectively.

Sentinel's standalone rate limiting keeps its sliding-window statistics in the local JVM. Cluster rate limiting instead keeps that state on a Token Server, so that all machines in the cluster share one "total call count" and are limited as a whole. Cluster flow control defines two roles:

  • Token Client: the cluster flow control client. It requests tokens from its Token Server; the server's response decides whether the request is rate-limited.
  • Token Server: the cluster flow control server. It handles requests from Token Clients and, based on the configured cluster rules, decides whether to grant a token (i.e. whether the request may pass).

2. Cluster Flow Control Rules

2.1 Rules

FlowRule adds two fields for cluster rate limiting:

private boolean clusterMode; // whether this rule is a cluster rule
private ClusterFlowConfig clusterConfig; // cluster-specific configuration

A dedicated ClusterFlowConfig carries the cluster-specific options, keeping them separate from the existing rule fields:

// (Required) globally unique rule ID, assigned by the cluster flow control console.
private Long flowId;

// Threshold mode: 0 (default) means per-machine average, 1 means global total.
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;

private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;

// Whether to fall back to local rate limiting when the client cannot connect to
// or communicate with the token server
private boolean fallbackToLocalWhenFail = true;

  • flowId is the globally unique rule ID; the cluster flow control server distinguishes rules by this ID, so it must be unique. It is usually assigned by a central console or generated when the rule is written to a DB.

  • thresholdType is the cluster threshold mode. In per-machine average mode, the configured value is the quota a single machine can bear; the token server multiplies it by the number of client connections in the rule's namespace (by default the application name from project.name) to obtain the total (e.g. in standalone mode with 3 clients connected and a per-machine threshold of 10, the computed cluster total is 30). In global mode, the configured value is itself the total threshold for the whole cluster.
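The two threshold modes boil down to a tiny calculation, sketched below with our own illustrative names (the real logic lives in the server's threshold computation, not in this class):

```java
// Hypothetical helper mirroring how the token server derives the cluster-wide
// threshold from a rule's threshold mode. Names are illustrative, not Sentinel's.
public class ThresholdCalc {
    static final int FLOW_THRESHOLD_AVG_LOCAL = 0; // per-machine average mode
    static final int FLOW_THRESHOLD_GLOBAL = 1;    // global total mode

    // count: the threshold configured on the rule
    // connectedClientCount: clients currently attached to the namespace
    static double globalThreshold(int thresholdType, double count, int connectedClientCount) {
        if (thresholdType == FLOW_THRESHOLD_GLOBAL) {
            return count; // the configured value is already the cluster total
        }
        // avg-local mode: total = per-machine quota * number of connected clients
        return count * connectedClientCount;
    }

    public static void main(String[] args) {
        // 3 clients, per-machine quota of 10 -> cluster total of 30
        System.out.println(globalThreshold(FLOW_THRESHOLD_AVG_LOCAL, 10, 3)); // 30.0
        System.out.println(globalThreshold(FLOW_THRESHOLD_GLOBAL, 100, 3));   // 100.0
    }
}
```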

The cluster configuration for ParamFlowRule (hot-parameter rate limiting) is similar to that of FlowRule.

2.2 Configuring Cluster Rules

In cluster flow control scenarios we recommend managing rules through dynamic rule sources.

On the client side, register dynamic rule sources with FlowRuleManager and ParamFlowRuleManager as before, for example:

ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new NacosDataSource<>(remoteAddress, groupId, dataId, parser);
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

For the token server, since the server side has a notion of scope (namespace), we register a PropertySupplier that automatically produces a dynamic rule source per namespace:

// Supplier: takes a namespace and returns the generated dynamic rule source, of type SentinelProperty<List<FlowRule>>
// ClusterFlowRuleManager handles cluster flow rules; ClusterParamFlowRuleManager handles cluster hot-parameter rules and is configured the same way
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
    return new SomeDataSource(namespace).getProperty();
});

Then, whenever the server's namespace set changes, Sentinel automatically creates and subscribes to a dynamic rule source for each newly added namespace, and removes the sources that are no longer needed.

3. Cluster Flow Control Client

To use cluster flow control on the client side, add the client dependency:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
    <version>1.8.5</version>
</dependency>

The current mode can be set to client via the HTTP API:

http://<ip>:<port>/setClusterMode?mode=<xxx>

Or set it explicitly through the ClusterStateManager API:

// Make this instance a Token Client
ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT);

Here mode 0 means client (ClusterStateManager.CLUSTER_CLIENT) and 1 means server. Once set, if the client is already configured, the cluster flow control client starts and connects to the remote token server. Connection logs can be found in ~/logs/csp/sentinel-record.log.

If the client has not been configured yet, basic settings such as the token server address must be provided. An HTTP API is available for this:

http://<ip>:<port>/cluster/client/modifyConfig?data=<config>

Where data is a JSON object with the following fields:

  • serverHost: token server host
  • serverPort: token server port
  • requestTimeout: request timeout (default: 20 ms)
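For example, a request body for modifyConfig might look like the following (the values are illustrative; the port happens to match the demo later in this article):

```json
{
  "serverHost": "127.0.0.1",
  "serverPort": 18730,
  "requestTimeout": 20
}
```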

These settings can also come from a dynamic configuration source. The token client has two kinds of configuration:

  • Assign configuration (ClusterClientAssignConfig): the address of the token server to connect to, etc. Register a dynamic source via ClusterClientConfigManager's registerServerAssignProperty method. The assign configuration is usually derived from a central assignment table; see the embedded-mode demo.
  • Transport configuration (ClusterClientConfig): communication settings such as the request timeout. Register a dynamic source via ClusterClientConfigManager's registerClientConfigProperty method.

The registration logic can live in an InitFunc implementation registered via SPI, so the configuration sources are loaded and watched automatically when Sentinel initializes.

If the client dependency is missing, or the client is not started, cannot connect, or fails to communicate, then for rules with cluster mode enabled:

  • cluster hot-parameter rules pass by default
  • ordinary cluster flow rules degrade to local mode, i.e. the check is performed locally against the per-machine threshold

When the connection between token client and server drops unexpectedly, the client keeps retrying, with the interval between attempts growing as n * 2000 ms.
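A minimal sketch of that linearly growing delay (the constant and method names are ours, not Sentinel's):

```java
// Illustrative model of the client's reconnect backoff: the n-th retry
// waits n * 2000 ms before attempting to reconnect.
public class ReconnectBackoff {
    static final long BASE_INTERVAL_MS = 2000;

    // attempt starts at 1
    static long delayForAttempt(int attempt) {
        return attempt * BASE_INTERVAL_MS;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println("retry " + n + " after " + delayForAttempt(n) + " ms");
        }
        // retry 1 after 2000 ms, retry 2 after 4000 ms, retry 3 after 6000 ms
    }
}
```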

4. Cluster Flow Control Server

To run a cluster flow control server, add the server dependency:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
    <version>1.8.5</version>
</dependency>

4.1 Startup Modes

The Sentinel cluster flow control server can be started in two ways:

  • Standalone mode (Alone): the token server runs as a separate, independently deployed process. Isolation is clean, but extra deployment work is required. Standalone mode suits a Global Rate Limiter serving a whole cluster.

    • If the standalone token server goes down, its token clients degrade to local, per-machine flow control, so this mode requires keeping the token server highly available.
  • Embedded mode (Embedded): the token server runs as a built-in component in the same process as the service. All instances in the cluster are peers, and any of them can switch between token server and client at runtime, so no separate deployment is needed and the setup is flexible. Isolation is weaker, however, and the token server's total QPS must be capped to avoid impacting the application itself. Embedded mode suits flow control within a single application cluster.

    • In embedded mode, if the token server goes down, another token client can be promoted to token server.

5. Source Code Analysis

5.1 Module Structure

The cluster flow control module was introduced in Sentinel 1.4.0 and consists of:

  • sentinel-cluster-common-default: common module with shared interfaces and entities
  • sentinel-cluster-client-default: default cluster flow control client, communicating over Netty, with extension points for the serialization protocol
  • sentinel-cluster-server-default: default cluster flow control server, communicating over Netty, with extension points for the serialization protocol; it also exposes an extension interface for the actual rule check (TokenService), whose default implementation reuses sentinel-core logic

5.2 Source Code Walkthrough

Both the cluster client and the server implement the TokenService interface. On the server side, an embedded token service defaults to DefaultEmbeddedTokenServer, while the client side uses DefaultClusterTokenClient (class diagram omitted here):

import java.util.Collection;

public interface TokenService {

    // Acquire tokens: rule ID, number of tokens requested, whether prioritized
    TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);

    TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params);
}

On the server side, tokens are obtained through **DefaultEmbeddedTokenServer#requestToken**:


public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {

    private final TokenService tokenService = TokenServiceProvider.getService();
    private final ClusterTokenServer server = new SentinelDefaultTokenServer(true);

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (tokenService != null) {
            return tokenService.requestToken(ruleId, acquireCount, prioritized);
        }
        return new TokenResult(TokenResultStatus.FAIL);
    }

    @Override
    public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) {
        if (tokenService != null) {
            return tokenService.requestParamToken(ruleId, acquireCount, params);
        }
        return new TokenResult(TokenResultStatus.FAIL);
    }
}
public class DefaultTokenService implements TokenService {
    // Acquire a token
    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        // Reject malformed requests
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // Look up the FlowRule by rule ID
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // Acquire the token
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }
    // A request is valid only with a positive rule ID and count
    private boolean notValidRequest(Long id, int count) {
        return id == null || id <= 0 || count <= 0;
    }
}
//ClusterFlowChecker.java
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
    Long id = rule.getClusterConfig().getFlowId();
    // Namespace-level guard: resolve the namespace from the rule ID and check
    // whether the namespace's global request limiter still allows this request
    if (!allowProceed(id)) {
        return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
    }

    ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
    if (metric == null) {
        return new TokenResult(TokenResultStatus.FAIL);
    }
    // Sliding-window metric: average rate of passed requests
    double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST);
    // Cluster-wide threshold: either the global total or the per-machine average
    // scaled to a total, multiplied by the allowed exceed ratio
    double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
    // Quota that would remain after granting this request
    double nextRemaining = globalThreshold - latestQps - acquireCount;
    // If >= 0, the request may pass
    if (nextRemaining >= 0) {
        // Record the granted tokens
        metric.add(ClusterFlowEvent.PASS, acquireCount);
        metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
        if (prioritized) {
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
        }
        return new TokenResult(TokenResultStatus.OK)
            .setRemaining((int) nextRemaining)
            .setWaitInMs(0);
    } else {
        // (priority handling omitted here)
        // Otherwise record the block and return failure
        metric.add(ClusterFlowEvent.BLOCK, acquireCount);
        metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
        ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
        ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
        if (prioritized) {
            metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
            ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
        }
        return blockedResult();
    }
}
static boolean allowProceed(long flowId) {
    String namespace = ClusterFlowRuleManager.getNamespace(flowId);
    return GlobalRequestLimiter.tryPass(namespace);
}

public static boolean tryPass(String namespace) {
    if (namespace == null) {
        return false;
    }
    RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
    if (limiter == null) {
        return true;
    }
    return limiter.tryPass();
}

public boolean tryPass() {
    if (canPass()) {
        add(1);
        return true;
    }
    return false;
}
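The server-side decision above can be condensed into a self-contained model (all names here are illustrative; the real implementation is ClusterFlowChecker.acquireClusterToken, which uses sliding-window metrics rather than this plain counter):

```java
// Simplified model of the token server's check:
// grant tokens while (threshold - current pass rate) covers the request.
public class TokenCheckModel {
    double globalThreshold;   // cluster-wide limit (already scaled by exceedCount)
    double passedQps;         // stand-in for the sliding window's passed-request rate

    TokenCheckModel(double globalThreshold) { this.globalThreshold = globalThreshold; }

    // returns the remaining quota if granted, or -1 if blocked
    double tryAcquire(int acquireCount) {
        double nextRemaining = globalThreshold - passedQps - acquireCount;
        if (nextRemaining >= 0) {
            passedQps += acquireCount;   // stand-in for metric.add(PASS, acquireCount)
            return nextRemaining;
        }
        return -1;                        // stand-in for blockedResult()
    }

    public static void main(String[] args) {
        TokenCheckModel m = new TokenCheckModel(10);
        System.out.println(m.tryAcquire(4));  // 6.0 remaining
        System.out.println(m.tryAcquire(4));  // 2.0 remaining
        System.out.println(m.tryAcquire(4));  // -1.0: would exceed the threshold
    }
}
```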

ClusterServerConfigManager.loadGlobalFlowConfig sets up the ServerFlowConfig for each namespace. On the client side, the token request is sent to the server over Netty, and the server performs the check:

@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
    // Reject malformed requests
    if (notValidRequest(flowId, acquireCount)) {
        return badRequest();
    }
    // Build the flow request
    FlowRequestData data = new FlowRequestData().setCount(acquireCount)
        .setFlowId(flowId).setPriority(prioritized);
    ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
    try {
        // Send the request to the token server
        TokenResult result = sendTokenRequest(request);
        logForResult(result);
        return result;
    } catch (Exception ex) {
        ClusterClientStatLogUtil.log(ex.getMessage());
        return new TokenResult(TokenResultStatus.FAIL);
    }
}
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
    if (transportClient == null) {
        RecordLog.warn(
            "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
        return clientFail();
    }
    ClusterResponse response = transportClient.sendRequest(request);
    TokenResult result = new TokenResult(response.getStatus());
    if (response.getData() != null) {
        FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
        result.setRemaining(responseData.getRemainingCount())
            .setWaitInMs(responseData.getWaitInMs());
    }
    return result;
}

During the **FlowSlot** check, a rule with cluster mode enabled takes the cluster path; the appropriate token service is then picked according to the node's role (server or client) and tokens are requested from it.

static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                     boolean prioritized) {
   String limitApp = rule.getLimitApp();
   if (limitApp == null) {
       return true;
   }
   // Cluster mode
   if (rule.isClusterMode()) {
       return passClusterCheck(rule, context, node, acquireCount, prioritized);
   }
   // Local mode
   return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // Pick the token service matching this node's role
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        // Request tokens
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // On failure, degrade to local mode
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}

6. Cluster Flow Control Demo

The official documentation covers cluster flow control in great detail. It includes a demo, but that demo hides many details and depends on Nacos, so I wrote my own.

demo地址: github.com/hsfxuebao/S…

First, start a server instance:

  • Create a FlowRule and enable cluster mode on it
  • Initialize the server (client and server can share this setup)
  • Initialize the client, pointing it at the server address
  • Set this node to server mode and start it

public static void main(String[] args) {
    // Load the flow rule
    initClusterFlowRule();
    // Initialize the server
    initServer();
    // Initialize the cluster client config, pointing at the server IP and port
    initClusterClientRule();
    // Set this node to server mode and start
    setToServer();
    while (true){
        // sleep 100 ms (~10 requests per second)
        try{
            Thread.sleep(100);
        }catch (Exception e){
            e.printStackTrace();
        }
        try{
            // guard resource "a"
            Entry entry=SphU.entry("a");
            System.out.println("pass");
        }catch (Exception e){
            System.out.println("block");
        }
    }
}

When loading the rule we choose cluster vs. standalone mode, set the rule itself, choose the limiting dimension (QPS or threads), and finally load the standalone rules plus the cluster namespace and the rules belonging to that namespace.

private static void initClusterFlowRule(){
   List<FlowRule> flowRules = new ArrayList<FlowRule>();
   FlowRule flowRule = new FlowRule();
   // Resource guarded by this rule
   flowRule.setResource("a");
   // Cluster-specific configuration
   ClusterFlowConfig clusterFlowConfig = new ClusterFlowConfig();
   // Fall back to local mode if the cluster is unavailable
   clusterFlowConfig.setFallbackToLocalWhenFail(true);
   // Globally unique flowId (could e.g. be derived from IP + process id)
   clusterFlowConfig.setFlowId(1L);
   // Threshold mode: global total (vs. per-machine average)
   clusterFlowConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
   flowRule.setClusterConfig(clusterFlowConfig);
   // Enable cluster mode
   flowRule.setClusterMode(true);
   // Default control behavior (see the FlowSlot article)
   flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
   // Total threshold
   flowRule.setCount(10);
   // Direct strategy
   flowRule.setStrategy(RuleConstant.STRATEGY_DIRECT);
   // Limit by QPS (could also be thread count)
   flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
   flowRules.add(flowRule);
   // Load the rules locally
   FlowRuleManager.loadRules(flowRules);
   // Register the namespace
   ClusterFlowRuleManager.registerPropertyIfAbsent("1-name");
   // Load the namespace's rules into cluster mode
   ClusterFlowRuleManager.loadRules("1-name", flowRules);
}

When initializing the server, we set the port the token service listens on and load the namespace set into the cluster configuration.

private static void initServer(){
    // Transport config for the token service (port, etc.)
    ServerTransportConfig serverTransportConfig = new ServerTransportConfig(18730, 600);
    // Load the transport config
    ClusterServerConfigManager.loadGlobalTransportConfig(serverTransportConfig);
    Set<String> nameSpaceSet = new HashSet<String>();
    nameSpaceSet.add("1-name");
    // Namespaces served by this token server
    ClusterServerConfigManager.loadServerNamespaceSet(nameSpaceSet);
    //ClusterServerConfigManager.loadGlobalFlowConfig();
    // would configure the ServerFlowConfig for each namespace
}

Initializing the client means setting the client config and pointing it at the server host and port.

private static void initClusterClientRule(){
    // Client communication config
    ClusterClientConfig clusterClientConfig = new ClusterClientConfig();
    // Timeout for token requests
    clusterClientConfig.setRequestTimeout(1000);
    // Apply the client config
    ClusterClientConfigManager.applyNewConfig(clusterClientConfig);
    // Host and port of the token server
    ClusterClientAssignConfig clusterClientAssignConfig = new ClusterClientAssignConfig("127.0.0.1", 18730);
    // Apply the assign config
    ClusterClientConfigManager.applyNewAssignConfig(clusterClientAssignConfig);
}

Finally, starting the service exposes the server configured above:

private static void setToServer(){
    ClusterStateManager.setToServer();
}

The port above is the server's listening port. Running the server alone, with a limit of 10 QPS and roughly 10 requests generated per second, the output looks like this:

Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:24 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass
Fri Mar 22 17:41:25 CST 2019 pass

Every request passes, which by itself does not prove the limit works. Doubling the request rate from 10 to 20 per second (i.e. changing the sleep from 100 ms to 50 ms), about half the requests pass and half are blocked:

Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:10 CST 2019 pass
Fri Mar 22 17:44:10 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block
Fri Mar 22 17:44:11 CST 2019 pass
Fri Mar 22 17:44:11 CST 2019 block

So per-node limiting works. To simulate a cluster, start a client process; its setup differs from the server's only in the final step, which sets the node to client instead of server.

public static void main(String[] args) {
    initClusterFlowRule();
    initServer();
    initClusterClientRule();
    // Set this node to client mode
    setToClient();
    while (true){
        try{
            Thread.sleep(100);
        }catch (Exception e){
            e.printStackTrace();
        }
        try{
            Entry entry=SphU.entry("a");
            System.out.println("pass");
        }catch (Exception e){
            System.out.println("block");
        }
    }
}

Setting up the client mirrors the server:

private static void setToClient(){
    // Set this node to client mode
    ClusterStateManager.setToClient();
}

Start the server first, then the client. The server log shows:

Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:23 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:24 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:25 CST 2019 pass
三月 22, 2019 5:50:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xa7c7acc4, L:/0:0:0:0:0:0:0:0:18730] READ: [id: 0x4338a7c7, L:/127.0.0.1:18730 - R:/127.0.0.1:56654]
三月 22, 2019 5:50:25 下午 io.netty.bootstrap.AbstractBootstrap setChannelOption
警告: Unknown channel option 'SO_TIMEOUT' for channel '[id: 0x4338a7c7, L:/127.0.0.1:18730 - R:/127.0.0.1:56654]'
三月 22, 2019 5:50:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xa7c7acc4, L:/0:0:0:0:0:0:0:0:18730] READ COMPLETE
Fri Mar 22 17:50:25 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 pass
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:31 CST 2019 block
Fri Mar 22 17:50:32 CST 2019 block
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 pass
Fri Mar 22 17:50:32 CST 2019 block
Fri Mar 22 17:50:32 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 block
Fri Mar 22 17:50:33 CST 2019 pass
Fri Mar 22 17:50:34 CST 2019 pass
Fri Mar 22 17:50:34 CST 2019 pass
Fri Mar 22 17:50:34 CST 2019 pass
Fri Mar 22 17:50:34 CST 2019 pass
Fri Mar 22 17:50:34 CST 2019 block

While the server runs alone it is the only node, so every request passes; once the client connects, the server starts seeing blocked requests as well. Now the client log:

Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 pass
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:26 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 block
Fri Mar 22 17:50:27 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 pass
Fri Mar 22 17:50:28 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 pass
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:29 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block
Fri Mar 22 17:50:30 CST 2019 block

Within any given second, the client's and the server's passed requests add up to the cluster QPS limit:

At second 27: client passes 1, server passes 9
At second 28: client passes 1, server passes 9
At second 29: client passes 8, server passes 2

Cluster rate limiting is working.

References

  • Sentinel 1.8.5 annotated source on GitHub
  • Sentinel source code analysis series
  • Sentinel official site
  • Sentinel in depth, principles: cluster flow control demo
  • Sentinel in depth, principles: cluster flow control internals

Reprinted from: https://juejin.cn/post/7149690407851393032