电商架构技术干货(三)
五、Hystrix 提供系统高可用性
1、运行流程:
2、可以进行资源隔离。
- 线程隔离
- 信号量隔离
线程池:适合绝大多数的场景,对依赖服务有网络请求调用的,可解决 timeout 的问题
访问量:适合不对外部依赖访问的。比如代码里有低效率的代码执行较慢,线程多了就会卡,这种情况可以用访问量的方式解决
线程隔离代码示例:
/**
* @description: 查询商品数据的command
* @author: mmc
* @create: 2019-11-05 23:09
**/
public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {
private Long productId;
public GetProductInfoCommand(Long productId) {
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
this.productId=productId;
}
@Override
protected ProductInfo run() throws Exception {
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
return JSONObject.parseObject(response, ProductInfo.class);
}
}
@RequestMapping("/getProductInfo")
public String getProductInfo(Long productId){
HystrixCommand<ProductInfo> getInfoCommand=new GetProductInfoCommand(productId);
ProductInfo productInfo = getInfoCommand.execute();
log.info("查询到商品数据:"+productInfo);
return "success";
}
3、请求缓存
实现步骤: 1、在 command 中覆写 getCacheKey 方法
@Override
protected String getCacheKey() {
return "product_info_"+productId;
}
2、拦截器
/**
* @description:hystrix请求上下文过滤器
* @author: mmc
* @create: 2019-11-07 22:32
**/
public class HystrixRequestContextFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try{
filterChain.doFilter(servletRequest, servletResponse);
}catch (Exception e){
e.printStackTrace();
}finally {
context.shutdown();
}
}
@Override
public void destroy() {
}
}
3、注册拦截器到 spring 中
@Bean
public FilterRegistrationBean filterRegistrationBean(){
FilterRegistrationBean filterRegistrationBean=new FilterRegistrationBean(new HystrixRequestContextFilter());
filterRegistrationBean.addUrlPatterns("/*");
return filterRegistrationBean;
}
4、fallback 降级机制
1、什么情况会引起降级?
- 调用接口报错
- 资源池、信号量满了
- 调用接口超时
- 短路器发现异常事件超过一定的比例,直接开启短路
2、代码示例
/**
* @description: 获取品牌名称
* @author: mmc
* @create: 2019-11-09 16:13
**/
public class GetBrandNameCommand extends HystrixCommand<String> {
private Long brandId;
public GetBrandNameCommand(Long brandId){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfoCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductInfoPool"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(15)
.withQueueSizeRejectionThreshold(10))
);
}
@Override
protected String run() throws Exception {
int a=10/0;
return "iphone xs";
}
@Override
protected String getFallback() {
return "iphone";
}
}
5、短路器
- 经过短路器的流量超过一定的阈值
- 如果短路器统计到的异常请求超过一定的阈值
- 短路器从 close 到 open 状态
- 短路打开时,所有请求被降级
- 经过一段时间,会 half-open(半开状态),会让一条请求调用接口看通了没有,如果正常了,那么短路器自动恢复
配置:
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(30) //流量阈值
.withCircuitBreakerErrorThresholdPercentage(40) //异常百分比
.withCircuitBreakerSleepWindowInMilliseconds(3000) //多长时间后去恢复短路器
)
6、限流
配置如下(主要是有注释的那几行代码):
public GetProductInfoCommand(Long productId) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetProductInfoService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfoCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductInfoPool"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(10) //设置线程池大小
.withMaxQueueSize(8) //设置队列大小
.withQueueSizeRejectionThreshold(10)) //队列达多少后开始拒绝
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(30)
.withCircuitBreakerErrorThresholdPercentage(40)
.withCircuitBreakerSleepWindowInMilliseconds(3000)
.withExecutionTimeoutInMilliseconds(20000)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(30)
)
);
7、请求合并
请求合并技术针对于访问延迟比较高的请求,可以削减线程池资源耗费。
- 请求合并时,可以设置一个 batch size,以及 elapsed time(控制什么时候触发合并后的 command)
- 有两种合并模式
- request scope 收集一个 request context 里面的请求
- global scope 横跨多个 request context 的
代码示例 command:
/**
* @description: 请求合并的获取商品
* @author: mmc
* @create: 2019-11-12 00:00
**/
public class GetProductInfosCollapserCommand extends HystrixCollapser<List<ProductInfo>,ProductInfo,Long> {
private Long productId;
public GetProductInfosCollapserCommand(Long productId) {
this.productId = productId;
}
@Override
public Long getRequestArgument() {
return productId;
}
@Override
protected String getCacheKey() {
return "product_info_"+productId;
}
@Override
protected HystrixCommand<List<ProductInfo>> createCommand(Collection<CollapsedRequest<ProductInfo, Long>> collapsedRequests) {
return new BatchCommand(collapsedRequests);
}
@Override
protected void mapResponseToRequests(List<ProductInfo> batchResponse, Collection<CollapsedRequest<ProductInfo, Long>> collapsedRequests) {
int count=0;
for (CollapsedRequest request:collapsedRequests){
request.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<ProductInfo>>{
private final Collection<CollapsedRequest<ProductInfo,Long>> requests;
public BatchCommand(Collection<CollapsedRequest<ProductInfo, Long>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfosCollapserBatchCommand")));
this.requests = requests;
}
@Override
protected List<ProductInfo> run() throws Exception {
StringBuilder sb=new StringBuilder();
for (CollapsedRequest request:requests){
sb.append(request.getArgument()+",");
}
String products = sb.substring(0, sb.length()-1);
String url=ServerConfig.INVENTORY_URL+"getProductInfos?productIds="+products;
String result = HttpClientUtils.sendGetRequest(url);
List<ProductInfo> productInfos = JSONArray.parseArray(result, ProductInfo.class);
return productInfos;
}
}
}
控制器:
@RequestMapping("/getProductInfoCollapser")
public String getProductInfoCollapser(String productIds){
List<Future<ProductInfo>> futures=new ArrayList<>();
for (String productId:productIds.split(",")){
GetProductInfosCollapserCommand getProductInfosCollapserCommand=new GetProductInfosCollapserCommand(Long.valueOf(productId));
Future<ProductInfo> future = getProductInfosCollapserCommand.queue();
futures.add(future);
}
for (Future future:futures){
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return "success";
}
8、多级降级
1、在主服务器挂了的时候可以降级到访问备用服务器,备用也挂了的时候访问其他应用服务器 这种多级降级通过嵌套 command 实现
==注意==:做多级降级的时候,要将降级 command 的线程池单独做一个出来,因为主流程的线程池可能是满的,放在一个里面同样会报错
9、手动降级
就是设置一个布尔类型的标识符,当为 true 的时候就降级,再有一个 controller 能修改这个布尔值就行
10、线程池自动扩容,缩容
线程池数量=每秒高峰次数*99%的访问延时+buffer
每秒 30 次的访问,每次请求大概 200ms,buffer 为 4 时 例:30*0.2+4=10
timeout 就取 99.5%的访问时间在加个 50ms
另外一般一个依赖服务线程池不要超过 20 个,如果算出来过大,还不如多启几个服务器来分担
线程池自动扩容,缩容配置:
.withCoreSize(10)
.withMaximumSize(30)
.withAllowMaximumSizeToDivergeFromCoreSize(true) //允许自动扩容
.withKeepAliveTimeMinutes(1) //一定时间内线程少了,再缩容
11、监控
1、安装 metrics stream
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.4.10</version>
</dependency>
@Bean
public ServletRegistrationBean indexServletRegistration() {
ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
registration.addUrlMappings("/hystrix.stream");
return registration;
}
2、安装 gradle
3、下载 hystrix-dashboard 的 war 包
下载后放在 tomcat 的 webapp 下, 然后在/WEB-INF/classes 下放置配置文件
config.properties
turbine.ConfigPropertyBasedDiscovery.default.instances=localhost
turbine.instanceUrlSuffix=:8081/hystrix.stream
4、启动 tomcat
输入地址,点击 monitor Streams。
12、缓存雪崩的解决方案
- 事前解决方案
- 保证缓存服务的高可用性,部署多机房的 redis 集群
- 事中解决方案
- 本地的 echace 缓存提供服务,保住缓存服务有部分数据可以访问
- 对 redis 访问做资源隔离
- 对源服务做限流和资源隔离
- 做一些熔断,降级策略
- 事后解决方案
- redis 如果有备份,就恢复数据
- redis 数据如果太旧,就需要重启,并做缓存预热
- 相关的代码实现:
将原有的获取 redis 缓存数据的 service 用 command 包装起来:
//ProductInfo productInfo = productInfoService.getRedisCache(productId);
GetRedisCacheCommand cacheCommand=new GetRedisCacheCommand(productId);
ProductInfo productInfo = cacheCommand.execute();
GetRedisCacheCommand 代码:
/**
* @description: 获取redis缓存
* @author: mmc
* @create: 2019-11-17 18:32
**/
public class GetRedisCacheCommand extends HystrixCommand<ProductInfo> {
private Integer productId;
public GetRedisCacheCommand(Integer productId) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(CommandKey.REDIS_GROUP)));
this.productId = productId;
}
@Override
protected ProductInfo run() throws Exception {
String key="cache_product_"+productId;
RedisDao redisDao= (RedisDao) SpringUtil.getBean("RedisDao");
String s = redisDao.get(key);
ProductInfo productInfo = JSON.parseObject(s, ProductInfo.class);
return productInfo;
}
@Override
protected ProductInfo getFallback() {
return null;
}
}
定制化的熔断策略:
- 超时时间
- 熔断配置
- 恢复配置
public GetRedisCacheCommand(Integer productId) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(CommandKey.REDIS_GROUP))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1000) //超时
.withCircuitBreakerRequestVolumeThreshold(1000) //滑动窗口中,有多少个qign'q请求,才会触发开启短路
.withCircuitBreakerErrorThresholdPercentage(70) //异常请求百分比达多少时,开启短路
.withCircuitBreakerSleepWindowInMilliseconds(60*1000)) //短路之后,多长时间后尝试恢复
);
this.productId = productId;
}
限流配置:
//限流配置
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(10)
.withMaximumSize(30)
.withAllowMaximumSizeToDivergeFromCoreSize(true)
.withKeepAliveTimeMinutes(1)
.withMaxQueueSize(50)
.withQueueSizeRejectionThreshold(100))
13、缓存穿透
问题描述:访问请求时使用一些数据库不存在的数据,比如访问商品服务,但是传过来的 id=-2,这样就导致数据库肯定查不到值,缓存里也没有值,那么每次的请求都会查询数据库,缓存服务就形同虚设,这就叫缓存穿透
解决:
- 当数据库查不到值时也返回一个默认的值,这样缓存也就有数据了
- 做一个数据更改的异步监控,当一个原本没有值的数据有值了之后要更新到缓存里面去
14、缓存失效保护机制
问题描述:nginx 里的缓存如果失效,所有的请求都会到 redis 上来,导致 redis 压力剧增
解决方案:设置一个随机的失效时间,避免同时失效
cache_ngx:set(shopCacheKey, shopCache, 10 * 60)
改为:
math.randomseed(tostring(os.time()):reverse():sub(1,7))
local expireTime=math.random(600,1200)
cache_ngx:set(shopCacheKey, shopCache,expireTime)
转载自:https://juejin.cn/post/7155507842491875359