Sentinel全流程分析和网关规则持久化到Nacos
一、背景与目标
- 随着微服务架构的普及,系统之间的交互变得日益复杂,流量控制成为了保障系统稳定运行的关键技术之一。微服务网关作为所有外部请求的入口,可以在这一层进行统一的限流管理。这使得限流策略的配置和维护更加集中和方便,减少了在多个微服务中分别配置限流策略的复杂性。
- 我司之前一直使用的Sentinel企业版AHAS网关限流,但AHAS将在2025年1月5日正式停服,AHAS推荐的方案是MSE微服务治理,但截止2024年5月,MSE暂时不支持API级别的热点参数限流,不满足我司需求。于是需要改造开源Sentinel网关限流,以满足我司需求。
二、改造前流程分析
改造前:
- gateway客户端引入sentinel
- sentinel-dashboard不进行任何修改
1、客户端
- 初始化sentinel配置(com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration#init)
- 上报心跳到sentinel-dashboard,其中包含ip、port、appName、appType等关键信息(com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init)
2、sentinel-dashboard
- 新增、修改、删除:
- 控制台对API或者限流规则进行新增、修改、删除
- 对本地内存中的数据进行增、修改、删除
- 再次查询本地内存中数据,并将其推送到客户端接口(http://clientIp:8719/setRules)
- 列表查询
- 远程调用客户端接口查询API或规则(http://clientIp:8719/gateway/getRules)
- 保存到本地内存InMemoryRuleRepositoryAdapter
- 返回列表数据到后台
- 定时拉取监控指标数据
- 远程调用客户端接口查询API或规则监控指标(http://clientIp:8719/metric)
- 保存到本地内存InMemoryMetricsRepository
3、请求限流
- 时间窗口统计请求量(com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry)
- 获取本地内存中的限流规则,并判断是否达到限流条件(com.alibaba.csp.sentinel.adapter.gateway.common.slot.GatewayFlowSlot#entry)
三、问题分析与方案设计
1、改造前问题分析
- 客户端重启
- 此时刷新列表,会读取到客户端的空数据并将控制台内存中的数据进行清空
- 控制台重启
- 此时刷新列表,会读取到客户端内存中的数据到控制台内存,但如果此时新增规则,会出现数据错误。因为重启后,内存中自增ID会被重置为1,因为新增数据时,会覆盖客户端ID为1的数据。
- 此时新增数据,会将新增的这一条数据保存到内存并推送客户端,从而将客户端原有数据给覆盖
- 因为不管是客户端重启还是控制台重启,都可能导致数据异常,因此API和规则的持久化是必须要做的
2、方案设计
只对sentinel-dashboard后端代码进行改动,不改动客户端和前端代码。(只针对网关限流进行改造,其它原理类似)
- 控制台收到客户端注册时,请求nacos配置中心,并将其配置存入本地内存
- 控制台对API或规则进行增删改时,更新本地内存中的数据,并同时推送当前服务下的所有API或者限流规则到nacos
- 控制台进行列表查询时,只查询本地内存中的数据
- ID生成采用雪花算法,避免重启后出现ID重复数据异常
四、实施方案
1、客户端(无需改造,只需常规配置)
- 引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
- yaml配置参数
spring:
cloud:
sentinel:
enable: true
filter:
enabled: false
eager: true #立即加载
transport:
dashboard: localhost:9999
datasource:
gw-flow:
nacos:
group-id: sentinel_group
namespace: sentinel
data-id: ${spring.application.name}-sentinel-gateway-flow-rules # 在修改的sentinel 源码中定义的规则名
server-addr: 192.168.0.7:8848
username: xxxx
password: xxxx
data-type: json
rule-type: gw-flow
gw-api-group:
nacos:
group-id: sentinel_group
namespace: sentinel
data-id: ${spring.application.name}-sentinel-gateway-api-rules # 在修改的sentinel 源码中定义的规则名
server-addr: 192.168.0.7:8848
username: xxxx
password: xxxx
data-type: json
rule-type: gw-api-group
2、sentinel-dashboard
(1)pom.xml去除sentinel-datasource-nacos的scope
<!-- for Nacos rule publisher sample -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<!-- <scope>test</scope> -->
</dependency>
(2)移动test包下的文件到main包下同级目录
(3)模仿test包下的文件编写网关API和限流规则的Provider和Publisher
@Component("gateWayFlowRulesNacosProvider")
public class GateWayFlowRulesNacosProvider implements DynamicRuleProvider<List<GatewayFlowRuleEntity>> {
@Resource
private ConfigService configService;
@Resource
private Converter<String, List<GatewayFlowRuleEntity>> converter;
@Resource
private NacosConfigProperties nacosConfigProperties;
@Override
public List<GatewayFlowRuleEntity> getRules(String appName) throws Exception {
String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
nacosConfigProperties.getGroup(), 3000);
if (StringUtil.isEmpty(rules)) {
return new ArrayList<>();
}
return converter.convert(rules);
}
}
@Component("gateWayFlowRulesNacosPunlisher")
public class GateWayFlowRulesNacosPunlisher implements DynamicRulePublisher<List<GatewayFlowRuleEntity>> {
@Resource
private ConfigService configService;
@Resource
private Converter<List<GatewayFlowRuleEntity>, String> converter;
@Resource
private NacosConfigProperties nacosConfigProperties;
@Override
public void publish(String app, List<GatewayFlowRuleEntity> rules) throws Exception {
AssertUtil.notEmpty(app, "app name cannot be empty");
if (rules == null) {
return;
}
configService.publishConfig(app + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
nacosConfigProperties.getGroup(), converter.convert(rules));
}
}
@Component("getWayApiNacosProvider")
public class GetWayApiNacosProvider implements DynamicRuleProvider<List<ApiDefinitionEntity>> {
@Resource
private ConfigService configService;
@Resource
private Converter<String , List<ApiDefinitionEntity>> converter;
@Resource
private NacosConfigProperties nacosConfigProperties;
@Override
public List<ApiDefinitionEntity> getRules(String appName) throws Exception {
String rules = configService.getConfig(appName+ NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX
,nacosConfigProperties.getGroup(),3000);
if(StringUtil.isEmpty(rules)){
return new ArrayList<>();
}
return converter.convert(rules);
}
}
@Component("getWayApiNacosPublisher")
public class GetWayApiNacosPublisher implements DynamicRulePublisher<List<ApiDefinitionEntity>> {
@Resource
private ConfigService configService;
@Resource
private Converter<List<ApiDefinitionEntity>, String> converter;
@Resource
private NacosConfigProperties nacosConfigProperties;
@Override
public void publish(String app, List<ApiDefinitionEntity> rules) throws Exception {
AssertUtil.notEmpty(app, "app name cannot be empty");
if (rules == null) {
return;
}
configService.publishConfig(app + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
nacosConfigProperties.getGroup(), converter.convert(rules));
}
}
(4)修改GatewayApiController和GatewayFlowRuleController
- 注入NacosPublisher
- 把之前调用客户端的地方修改为调用Nacos
- list.json接口改为只从本地内存中获取数据
@RestController
@RequestMapping(value = "/gateway/api")
public class GatewayApiController {
private final Logger logger = LoggerFactory.getLogger(GatewayApiController.class);
@Autowired
private InMemApiDefinitionStore repository;
@Autowired
@Qualifier("getWayApiNacosPublisher")
private DynamicRulePublisher<List<ApiDefinitionEntity>> rulePublisher;
@GetMapping("/list.json")
@AuthAction(AuthService.PrivilegeType.READ_RULE)
public Result<List<ApiDefinitionEntity>> queryApis(String app, String ip, Integer port) {
if (StringUtil.isEmpty(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
if (StringUtil.isEmpty(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
try {
// List<ApiDefinitionEntity> apis = sentinelApiClient.fetchApis(app, ip, port).get();
// List<ApiDefinitionEntity> apis = ruleProvider.getRules(app);
//
// repository.saveAll(apis);
List<ApiDefinitionEntity> apis = repository.findAllByApp(app);
return Result.ofSuccess(apis);
} catch (Throwable throwable) {
logger.error("queryApis error:", throwable);
return Result.ofThrowable(-1, throwable);
}
}
@PostMapping("/new.json")
@AuthAction(AuthService.PrivilegeType.WRITE_RULE)
public Result<ApiDefinitionEntity> addApi(HttpServletRequest request, @RequestBody AddApiReqVo reqVo) {
String app = reqVo.getApp();
if (StringUtil.isBlank(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
ApiDefinitionEntity entity = new ApiDefinitionEntity();
entity.setApp(app.trim());
String ip = reqVo.getIp();
if (StringUtil.isBlank(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
entity.setIp(ip.trim());
Integer port = reqVo.getPort();
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
entity.setPort(port);
// API名称
String apiName = reqVo.getApiName();
if (StringUtil.isBlank(apiName)) {
return Result.ofFail(-1, "apiName can't be null or empty");
}
entity.setApiName(apiName.trim());
// 匹配规则列表
List<ApiPredicateItemVo> predicateItems = reqVo.getPredicateItems();
if (CollectionUtils.isEmpty(predicateItems)) {
return Result.ofFail(-1, "predicateItems can't empty");
}
List<ApiPredicateItemEntity> predicateItemEntities = new ArrayList<>();
for (ApiPredicateItemVo predicateItem : predicateItems) {
ApiPredicateItemEntity predicateItemEntity = new ApiPredicateItemEntity();
// 匹配模式
Integer matchStrategy = predicateItem.getMatchStrategy();
if (!Arrays.asList(URL_MATCH_STRATEGY_EXACT, URL_MATCH_STRATEGY_PREFIX, URL_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
}
predicateItemEntity.setMatchStrategy(matchStrategy);
// 匹配串
String pattern = predicateItem.getPattern();
if (StringUtil.isBlank(pattern)) {
return Result.ofFail(-1, "pattern can't be null or empty");
}
predicateItemEntity.setPattern(pattern);
predicateItemEntities.add(predicateItemEntity);
}
entity.setPredicateItems(new LinkedHashSet<>(predicateItemEntities));
// 检查API名称不能重复
List<ApiDefinitionEntity> allApis = repository.findAllByMachine(MachineInfo.of(app.trim(), ip.trim(), port));
if (allApis.stream().map(o -> o.getApiName()).anyMatch(o -> o.equals(apiName.trim()))) {
return Result.ofFail(-1, "apiName exists: " + apiName);
}
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
try {
entity = repository.save(entity);
} catch (Throwable throwable) {
logger.error("add gateway api error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishApis(app, ip, port)) {
// logger.warn("publish gateway apis fail after add");
// }
publishApis(app);
return Result.ofSuccess(entity);
}
@PostMapping("/save.json")
@AuthAction(AuthService.PrivilegeType.WRITE_RULE)
public Result<ApiDefinitionEntity> updateApi(@RequestBody UpdateApiReqVo reqVo) {
String app = reqVo.getApp();
if (StringUtil.isBlank(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
Long id = reqVo.getId();
if (id == null) {
return Result.ofFail(-1, "id can't be null");
}
ApiDefinitionEntity entity = repository.findById(id);
if (entity == null) {
return Result.ofFail(-1, "api does not exist, id=" + id);
}
// 匹配规则列表
List<ApiPredicateItemVo> predicateItems = reqVo.getPredicateItems();
if (CollectionUtils.isEmpty(predicateItems)) {
return Result.ofFail(-1, "predicateItems can't empty");
}
List<ApiPredicateItemEntity> predicateItemEntities = new ArrayList<>();
for (ApiPredicateItemVo predicateItem : predicateItems) {
ApiPredicateItemEntity predicateItemEntity = new ApiPredicateItemEntity();
// 匹配模式
int matchStrategy = predicateItem.getMatchStrategy();
if (!Arrays.asList(URL_MATCH_STRATEGY_EXACT, URL_MATCH_STRATEGY_PREFIX, URL_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
return Result.ofFail(-1, "Invalid matchStrategy: " + matchStrategy);
}
predicateItemEntity.setMatchStrategy(matchStrategy);
// 匹配串
String pattern = predicateItem.getPattern();
if (StringUtil.isBlank(pattern)) {
return Result.ofFail(-1, "pattern can't be null or empty");
}
predicateItemEntity.setPattern(pattern);
predicateItemEntities.add(predicateItemEntity);
}
entity.setPredicateItems(new LinkedHashSet<>(predicateItemEntities));
Date date = new Date();
entity.setGmtModified(date);
try {
entity = repository.save(entity);
} catch (Throwable throwable) {
logger.error("update gateway api error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishApis(app, entity.getIp(), entity.getPort())) {
// logger.warn("publish gateway apis fail after update");
// }
publishApis(app);
return Result.ofSuccess(entity);
}
@PostMapping("/delete.json")
@AuthAction(AuthService.PrivilegeType.DELETE_RULE)
public Result<Long> deleteApi(Long id) {
if (id == null) {
return Result.ofFail(-1, "id can't be null");
}
ApiDefinitionEntity oldEntity = repository.findById(id);
if (oldEntity == null) {
return Result.ofSuccess(null);
}
try {
repository.delete(id);
} catch (Throwable throwable) {
logger.error("delete gateway api error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishApis(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort())) {
// logger.warn("publish gateway apis fail after delete");
// }
publishApis(oldEntity.getApp());
return Result.ofSuccess(id);
}
// private boolean publishApis(String app, String ip, Integer port) {
// List<ApiDefinitionEntity> apis = repository.findAllByMachine(MachineInfo.of(app, ip, port));
// return sentinelApiClient.modifyApis(app, ip, port, apis);
// }
/**
* 把配置推给nacos中
*
* @param app
* @throws Exception
*/
private void publishApis(String app) {
List<ApiDefinitionEntity> rules = repository.findAllByApp(app);
try {
rulePublisher.publish(app, rules);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RestController
@RequestMapping(value = "/gateway/flow")
public class GatewayFlowRuleController {
private final Logger logger = LoggerFactory.getLogger(GatewayFlowRuleController.class);
@Autowired
private InMemGatewayFlowRuleStore repository;
@Autowired
@Qualifier("gateWayFlowRulesNacosPunlisher")
private DynamicRulePublisher<List<GatewayFlowRuleEntity>> rulePublisher;
@GetMapping("/list.json")
@AuthAction(AuthService.PrivilegeType.READ_RULE)
public Result<List<GatewayFlowRuleEntity>> queryFlowRules(String app, String ip, Integer port) {
if (StringUtil.isEmpty(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
if (StringUtil.isEmpty(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
try {
// List<GatewayFlowRuleEntity> rules = sentinelApiClient.fetchGatewayFlowRules(app, ip, port).get();
// List<GatewayFlowRuleEntity> rules = ruleProvider.getRules(app);
//
// repository.saveAll(rules);
List<GatewayFlowRuleEntity> rules = repository.findAllByApp(app);
return Result.ofSuccess(rules);
} catch (Throwable throwable) {
logger.error("query gateway flow rules error:", throwable);
return Result.ofThrowable(-1, throwable);
}
}
@PostMapping("/new.json")
@AuthAction(AuthService.PrivilegeType.WRITE_RULE)
public Result<GatewayFlowRuleEntity> addFlowRule(@RequestBody AddFlowRuleReqVo reqVo) {
String app = reqVo.getApp();
if (StringUtil.isBlank(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
GatewayFlowRuleEntity entity = new GatewayFlowRuleEntity();
entity.setApp(app.trim());
String ip = reqVo.getIp();
if (StringUtil.isBlank(ip)) {
return Result.ofFail(-1, "ip can't be null or empty");
}
entity.setIp(ip.trim());
Integer port = reqVo.getPort();
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
entity.setPort(port);
// API类型, Route ID或API分组
Integer resourceMode = reqVo.getResourceMode();
if (resourceMode == null) {
return Result.ofFail(-1, "resourceMode can't be null");
}
if (!Arrays.asList(RESOURCE_MODE_ROUTE_ID, RESOURCE_MODE_CUSTOM_API_NAME).contains(resourceMode)) {
return Result.ofFail(-1, "invalid resourceMode: " + resourceMode);
}
entity.setResourceMode(resourceMode);
// API名称
String resource = reqVo.getResource();
if (StringUtil.isBlank(resource)) {
return Result.ofFail(-1, "resource can't be null or empty");
}
entity.setResource(resource.trim());
// 针对请求属性
GatewayParamFlowItemVo paramItem = reqVo.getParamItem();
if (paramItem != null) {
GatewayParamFlowItemEntity itemEntity = new GatewayParamFlowItemEntity();
entity.setParamItem(itemEntity);
// 参数属性 0-ClientIP 1-Remote Host 2-Header 3-URL参数 4-Cookie
Integer parseStrategy = paramItem.getParseStrategy();
if (!Arrays.asList(PARAM_PARSE_STRATEGY_CLIENT_IP, PARAM_PARSE_STRATEGY_HOST, PARAM_PARSE_STRATEGY_HEADER
, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
return Result.ofFail(-1, "invalid parseStrategy: " + parseStrategy);
}
itemEntity.setParseStrategy(paramItem.getParseStrategy());
// 当参数属性为2-Header 3-URL参数 4-Cookie时,参数名称必填
if (Arrays.asList(PARAM_PARSE_STRATEGY_HEADER, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
// 参数名称
String fieldName = paramItem.getFieldName();
if (StringUtil.isBlank(fieldName)) {
return Result.ofFail(-1, "fieldName can't be null or empty");
}
itemEntity.setFieldName(paramItem.getFieldName());
}
String pattern = paramItem.getPattern();
// 如果匹配串不为空,验证匹配模式
if (StringUtil.isNotEmpty(pattern)) {
itemEntity.setPattern(pattern);
Integer matchStrategy = paramItem.getMatchStrategy();
if (!Arrays.asList(PARAM_MATCH_STRATEGY_EXACT, PARAM_MATCH_STRATEGY_CONTAINS, PARAM_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
}
itemEntity.setMatchStrategy(matchStrategy);
}
}
// 阈值类型 0-线程数 1-QPS
Integer grade = reqVo.getGrade();
if (grade == null) {
return Result.ofFail(-1, "grade can't be null");
}
if (!Arrays.asList(FLOW_GRADE_THREAD, FLOW_GRADE_QPS).contains(grade)) {
return Result.ofFail(-1, "invalid grade: " + grade);
}
entity.setGrade(grade);
// QPS阈值
Double count = reqVo.getCount();
if (count == null) {
return Result.ofFail(-1, "count can't be null");
}
if (count < 0) {
return Result.ofFail(-1, "count should be at lease zero");
}
entity.setCount(count);
// 间隔
Long interval = reqVo.getInterval();
if (interval == null) {
return Result.ofFail(-1, "interval can't be null");
}
if (interval <= 0) {
return Result.ofFail(-1, "interval should be greater than zero");
}
entity.setInterval(interval);
// 间隔单位
Integer intervalUnit = reqVo.getIntervalUnit();
if (intervalUnit == null) {
return Result.ofFail(-1, "intervalUnit can't be null");
}
if (!Arrays.asList(INTERVAL_UNIT_SECOND, INTERVAL_UNIT_MINUTE, INTERVAL_UNIT_HOUR, INTERVAL_UNIT_DAY).contains(intervalUnit)) {
return Result.ofFail(-1, "Invalid intervalUnit: " + intervalUnit);
}
entity.setIntervalUnit(intervalUnit);
entity.calIntervalSec();
// 流控方式 0-快速失败 2-匀速排队
Integer controlBehavior = reqVo.getControlBehavior();
if (controlBehavior == null) {
return Result.ofFail(-1, "controlBehavior can't be null");
}
if (!Arrays.asList(CONTROL_BEHAVIOR_DEFAULT, CONTROL_BEHAVIOR_RATE_LIMITER).contains(controlBehavior)) {
return Result.ofFail(-1, "invalid controlBehavior: " + controlBehavior);
}
entity.setControlBehavior(controlBehavior);
if (CONTROL_BEHAVIOR_DEFAULT == controlBehavior) {
// 0-快速失败, 则Burst size必填
Integer burst = reqVo.getBurst();
if (burst == null) {
return Result.ofFail(-1, "burst can't be null");
}
if (burst < 0) {
return Result.ofFail(-1, "invalid burst: " + burst);
}
entity.setBurst(burst);
} else if (CONTROL_BEHAVIOR_RATE_LIMITER == controlBehavior) {
// 1-匀速排队, 则超时时间必填
Integer maxQueueingTimeoutMs = reqVo.getMaxQueueingTimeoutMs();
if (maxQueueingTimeoutMs == null) {
return Result.ofFail(-1, "maxQueueingTimeoutMs can't be null");
}
if (maxQueueingTimeoutMs < 0) {
return Result.ofFail(-1, "invalid maxQueueingTimeoutMs: " + maxQueueingTimeoutMs);
}
entity.setMaxQueueingTimeoutMs(maxQueueingTimeoutMs);
}
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
try {
entity = repository.save(entity);
} catch (Throwable throwable) {
logger.error("add gateway flow rule error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishRules(app, ip, port)) {
// logger.warn("publish gateway flow rules fail after add");
// }
publishRules(app);
return Result.ofSuccess(entity);
}
@PostMapping("/save.json")
@AuthAction(AuthService.PrivilegeType.WRITE_RULE)
public Result<GatewayFlowRuleEntity> updateFlowRule(@RequestBody UpdateFlowRuleReqVo reqVo) {
String app = reqVo.getApp();
if (StringUtil.isBlank(app)) {
return Result.ofFail(-1, "app can't be null or empty");
}
Long id = reqVo.getId();
if (id == null) {
return Result.ofFail(-1, "id can't be null");
}
GatewayFlowRuleEntity entity = repository.findById(id);
if (entity == null) {
return Result.ofFail(-1, "gateway flow rule does not exist, id=" + id);
}
// 针对请求属性
GatewayParamFlowItemVo paramItem = reqVo.getParamItem();
if (paramItem != null) {
GatewayParamFlowItemEntity itemEntity = new GatewayParamFlowItemEntity();
entity.setParamItem(itemEntity);
// 参数属性 0-ClientIP 1-Remote Host 2-Header 3-URL参数 4-Cookie
Integer parseStrategy = paramItem.getParseStrategy();
if (!Arrays.asList(PARAM_PARSE_STRATEGY_CLIENT_IP, PARAM_PARSE_STRATEGY_HOST, PARAM_PARSE_STRATEGY_HEADER
, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
return Result.ofFail(-1, "invalid parseStrategy: " + parseStrategy);
}
itemEntity.setParseStrategy(paramItem.getParseStrategy());
// 当参数属性为2-Header 3-URL参数 4-Cookie时,参数名称必填
if (Arrays.asList(PARAM_PARSE_STRATEGY_HEADER, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
// 参数名称
String fieldName = paramItem.getFieldName();
if (StringUtil.isBlank(fieldName)) {
return Result.ofFail(-1, "fieldName can't be null or empty");
}
itemEntity.setFieldName(paramItem.getFieldName());
}
String pattern = paramItem.getPattern();
// 如果匹配串不为空,验证匹配模式
if (StringUtil.isNotEmpty(pattern)) {
itemEntity.setPattern(pattern);
Integer matchStrategy = paramItem.getMatchStrategy();
if (!Arrays.asList(PARAM_MATCH_STRATEGY_EXACT, PARAM_MATCH_STRATEGY_CONTAINS, PARAM_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
}
itemEntity.setMatchStrategy(matchStrategy);
}
} else {
entity.setParamItem(null);
}
// 阈值类型 0-线程数 1-QPS
Integer grade = reqVo.getGrade();
if (grade == null) {
return Result.ofFail(-1, "grade can't be null");
}
if (!Arrays.asList(FLOW_GRADE_THREAD, FLOW_GRADE_QPS).contains(grade)) {
return Result.ofFail(-1, "invalid grade: " + grade);
}
entity.setGrade(grade);
// QPS阈值
Double count = reqVo.getCount();
if (count == null) {
return Result.ofFail(-1, "count can't be null");
}
if (count < 0) {
return Result.ofFail(-1, "count should be at lease zero");
}
entity.setCount(count);
// 间隔
Long interval = reqVo.getInterval();
if (interval == null) {
return Result.ofFail(-1, "interval can't be null");
}
if (interval <= 0) {
return Result.ofFail(-1, "interval should be greater than zero");
}
entity.setInterval(interval);
// 间隔单位
Integer intervalUnit = reqVo.getIntervalUnit();
if (intervalUnit == null) {
return Result.ofFail(-1, "intervalUnit can't be null");
}
if (!Arrays.asList(INTERVAL_UNIT_SECOND, INTERVAL_UNIT_MINUTE, INTERVAL_UNIT_HOUR, INTERVAL_UNIT_DAY).contains(intervalUnit)) {
return Result.ofFail(-1, "Invalid intervalUnit: " + intervalUnit);
}
entity.setIntervalUnit(intervalUnit);
entity.calIntervalSec();
// 流控方式 0-快速失败 2-匀速排队
Integer controlBehavior = reqVo.getControlBehavior();
if (controlBehavior == null) {
return Result.ofFail(-1, "controlBehavior can't be null");
}
if (!Arrays.asList(CONTROL_BEHAVIOR_DEFAULT, CONTROL_BEHAVIOR_RATE_LIMITER).contains(controlBehavior)) {
return Result.ofFail(-1, "invalid controlBehavior: " + controlBehavior);
}
entity.setControlBehavior(controlBehavior);
if (CONTROL_BEHAVIOR_DEFAULT == controlBehavior) {
// 0-快速失败, 则Burst size必填
Integer burst = reqVo.getBurst();
if (burst == null) {
return Result.ofFail(-1, "burst can't be null");
}
if (burst < 0) {
return Result.ofFail(-1, "invalid burst: " + burst);
}
entity.setBurst(burst);
} else if (CONTROL_BEHAVIOR_RATE_LIMITER == controlBehavior) {
// 2-匀速排队, 则超时时间必填
Integer maxQueueingTimeoutMs = reqVo.getMaxQueueingTimeoutMs();
if (maxQueueingTimeoutMs == null) {
return Result.ofFail(-1, "maxQueueingTimeoutMs can't be null");
}
if (maxQueueingTimeoutMs < 0) {
return Result.ofFail(-1, "invalid maxQueueingTimeoutMs: " + maxQueueingTimeoutMs);
}
entity.setMaxQueueingTimeoutMs(maxQueueingTimeoutMs);
}
Date date = new Date();
entity.setGmtModified(date);
try {
entity = repository.save(entity);
} catch (Throwable throwable) {
logger.error("update gateway flow rule error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishRules(app, entity.getIp(), entity.getPort())) {
// logger.warn("publish gateway flow rules fail after update");
// }
publishRules(app);
return Result.ofSuccess(entity);
}
@PostMapping("/delete.json")
@AuthAction(AuthService.PrivilegeType.DELETE_RULE)
public Result<Long> deleteFlowRule(Long id) {
if (id == null) {
return Result.ofFail(-1, "id can't be null");
}
GatewayFlowRuleEntity oldEntity = repository.findById(id);
if (oldEntity == null) {
return Result.ofSuccess(null);
}
try {
repository.delete(id);
} catch (Throwable throwable) {
logger.error("delete gateway flow rule error:", throwable);
return Result.ofThrowable(-1, throwable);
}
// if (!publishRules(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort())) {
// logger.warn("publish gateway flow rules fail after delete");
// }
publishRules(oldEntity.getApp());
return Result.ofSuccess(id);
}
// private boolean publishRules(String app, String ip, Integer port) {
// List<GatewayFlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
// return sentinelApiClient.modifyGatewayFlowRules(app, ip, port, rules);
// }
/**
* 把配置推给nacos中
*
* @param app
* @throws Exception
*/
private boolean publishRules(String app) {
List<GatewayFlowRuleEntity> rules = repository.findAllByApp(app);
try {
rulePublisher.publish(app, rules);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
(5)修改AppManagement应用注册逻辑,如果是第一次注册,从Nacos拉取配置到本地内存
@Component
public class AppManagement implements MachineDiscovery {
private final Logger logger = LoggerFactory.getLogger(AppManagement.class);
@Autowired
private ApplicationContext context;
@Autowired
@Qualifier("gateWayFlowRulesNacosProvider")
private DynamicRuleProvider<List<GatewayFlowRuleEntity>> flowRuleProvider;
@Autowired
private InMemGatewayFlowRuleStore flowRuleRepository;
@Autowired
@Qualifier("getWayApiNacosProvider")
private DynamicRuleProvider<List<ApiDefinitionEntity>> apiProvider;
@Autowired
private InMemApiDefinitionStore apiRepository;
private MachineDiscovery machineDiscovery;
@PostConstruct
public void init() {
machineDiscovery = context.getBean(SimpleMachineDiscovery.class);
}
@Override
public Set<AppInfo> getBriefApps() {
return machineDiscovery.getBriefApps();
}
@Override
public long addMachine(MachineInfo machineInfo) {
// 是网关并且是第一次注册
String app = machineInfo.getApp();
if (isGateway(machineInfo.getAppType()) && getDetailApp(app) == null) {
pullGatewayNacosApi(app);
pullGatewayNacosFlowRules(app);
}
return machineDiscovery.addMachine(machineInfo);
}
@Override
public boolean removeMachine(String app, String ip, int port) {
return machineDiscovery.removeMachine(app, ip, port);
}
@Override
public List<String> getAppNames() {
return machineDiscovery.getAppNames();
}
@Override
public AppInfo getDetailApp(String app) {
return machineDiscovery.getDetailApp(app);
}
@Override
public void removeApp(String app) {
machineDiscovery.removeApp(app);
}
public boolean isValidMachineOfApp(String app, String ip) {
if (StringUtil.isEmpty(app)) {
return false;
}
return Optional.ofNullable(getDetailApp(app))
.flatMap(a -> a.getMachine(ip))
.isPresent();
}
private boolean isGateway(Integer appType) {
return appType == 1;
}
private void pullGatewayNacosFlowRules(String app) {
try {
List<GatewayFlowRuleEntity> rules = flowRuleProvider.getRules(app);
flowRuleRepository.saveAll(rules);
} catch (Exception e) {
logger.error("pull nacos flow rules error, app:{}", app, e);
}
}
private void pullGatewayNacosApi(String app) {
try {
List<ApiDefinitionEntity> api = apiProvider.getRules(app);
apiRepository.saveAll(api);
} catch (Exception e) {
logger.error("pull nacos api error, app:{}", app, e);
}
}
}
(6)修改内存自增ID为雪花算法(需要引入hutool)
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.15</version>
</dependency>
@Component
public class InMemApiDefinitionStore extends InMemoryRuleRepositoryAdapter<ApiDefinitionEntity> {
@Override
protected long nextId() {
return IdUtil.getSnowflakeNextId();
}
}
@Component
public class InMemGatewayFlowRuleStore extends InMemoryRuleRepositoryAdapter<GatewayFlowRuleEntity> {
@Override
protected long nextId() {
return IdUtil.getSnowflakeNextId();
}
}
(7)还有小BUG修复,完整GIT提交日志如下(完整代码在文末)
五、改造后流程分析
1、客户端
- 初始化Sentinel配置(com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration#init)
- 上报心跳到sentinel-dashboard,其中包含ip、port、appName、appType等关键信息(com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init)
- 控制台收到第一次注册时,拉取Nacos配置到本地内存
- 初始化Nacos配置监听器,实时监听nacos配置变化(com.alibaba.csp.sentinel.datasource.nacos.NacosDataSource#initNacosListener)
- 加载Nacos初始化配置到本地内存(com.alibaba.csp.sentinel.datasource.nacos.NacosDataSource#loadInitialConfig)
2、sentinel-dashboard
- 新增、修改、删除:
- 后台页面对API或者限流规则进行新增、修改、删除
- 对本地内存中的数据进行操作
- 再次查询本地内存中数据,并将其推送到nacos
- 列表查询
- 查询本地内存InMemoryRuleRepositoryAdapter
- 返回列表数据到后台
- 定时拉取监控指标数据
- 远程调用客户端接口查询API或规则(http://clientIp:8719/metric)
- 保存到本地内存InMemoryMetricsRepository
3、请求限流(和改造前没有区别)
六、遇到的问题
Q:控制台配置的间隔单位和间隔时间不生效
A:控制台的参数名是interval和intervalUnit,但客户端读取的是intervalSec,需要进行转换并存入Nacos
Q:控制台内存中维护的自增ID,如果控制台重启,自增ID会被重置
A:ID创建修改为雪花算法,ID返回给前端时,格式化为String类型(以防Long传到前端精度丢失)
Q:如果一个链接没有被请求过,控制台修改配置后,客户端会报naco notify-error npe
A:该问题已在1.8.1被修复,可将客户端版本升级到1.8.1及以上,https://github.com/alibaba/Sentinel/pull/1729
Q:控制台是否支持集群部署
A:暂时不支持。因为现在控制台的数据都是存储在本地内存,如果要实现集群部署,需要引入外部数据存储工具,例如:Mysql、Redis
Q:阿里云AHAS在项目中的实际应用
A:网关侧接入AHAS,并将项目中所有需要外部限流的接口路径配置在AHAS中,并根据热点参数对IP、Header、Token进行多维度限流
Q:阿里云MSE微服务治理为什么不能满足需求
A:MSE进行了微服务职责的进一步划分,网关侧只能进行路由级别限流,应用侧才能进行接口级别限流。如果要实现AHAS网关限流同样的功能,需要将所有微服务加入到MSE微服务治理中,对我们来说成本过高
Q:是否还有其它问题
A:还比较多,比如:
1、控制台监控数据未持久化,只能看到最近5分钟数据,并且可能有内存溢出的风险
2、控制台未接入nacos配置中心
3、控制台不支持多账号密码
4、可能有内存和Nacos双写一致性问题
七、总结
我们目前只用到了Sentinel网关限流中热点参数限流,所以只对网关规则进行了持久化改造。如果有其它需求,大致的修改逻辑应该是差不多的。虽然改造后的Sentinel控制台还存在很多问题,但通过上面的流程分析,控制台只是作为规则可视化配置的工具,对性能要求不高,我们也没有监控持久化的需求,所以暂时就不进行其它改造了。
八、参考资料
转载自:https://juejin.cn/post/7366882674184994870