likes
comments
collection
share

电商架构技术干货(三)

作者站长头像
站长
· 阅读数 59

五、Hystrix 提供系统高可用性

1、运行流程:

电商架构技术干货(三)

2、可以进行资源隔离。

  • 线程隔离
  • 信号量隔离

电商架构技术干货(三)

线程池:适合绝大多数的场景,对依赖服务有网络请求调用的,可解决 timeout 的问题

访问量:适合不对外部依赖访问的。比如代码里有低效率的代码执行较慢,线程多了就会卡,这种情况可以用访问量的方式解决

线程隔离代码示例:


/**
 * @description: 查询商品数据的command
 * @author: mmc
 * @create: 2019-11-05 23:09
 **/

public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {

    private Long productId;

    public GetProductInfoCommand(Long productId) {
        super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
        this.productId=productId;
    }

    @Override
    protected ProductInfo run() throws Exception {
        String url = "http://localhost:8081/getProductInfo?productId=" + productId;
        String response = HttpClientUtils.sendGetRequest(url);
        return JSONObject.parseObject(response, ProductInfo.class);
    }
}
 @RequestMapping("/getProductInfo")
    public String getProductInfo(Long productId){
        HystrixCommand<ProductInfo> getInfoCommand=new GetProductInfoCommand(productId);
        ProductInfo productInfo = getInfoCommand.execute();


        log.info("查询到商品数据:"+productInfo);
        return "success";
    }

3、请求缓存

实现步骤: 1、在 command 中覆写 getCacheKey 方法

@Override
    protected String getCacheKey() {
        return "product_info_"+productId;
    }

2、拦截器


/**
 * @description:hystrix请求上下文过滤器
 * @author: mmc
 * @create: 2019-11-07 22:32
 **/

public class HystrixRequestContextFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try{
            filterChain.doFilter(servletRequest, servletResponse);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            context.shutdown();
        }
    }

    @Override
    public void destroy() {

    }
}

3、注册拦截器到 spring 中

@Bean
	public FilterRegistrationBean filterRegistrationBean(){
		FilterRegistrationBean filterRegistrationBean=new FilterRegistrationBean(new HystrixRequestContextFilter());
		filterRegistrationBean.addUrlPatterns("/*");
		return filterRegistrationBean;

	}

4、fallback 降级机制

1、什么情况会引起降级?

  • 调用接口报错
  • 资源池、信号量满了
  • 调用接口超时
  • 短路器发现异常事件超过一定的比例,直接开启短路

2、代码示例

/**
 * @description: 获取品牌名称
 * @author: mmc
 * @create: 2019-11-09 16:13
 **/

public class GetBrandNameCommand extends HystrixCommand<String> {

    private Long brandId;

    public GetBrandNameCommand(Long brandId){
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfoCommand"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductInfoPool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                .withCoreSize(15)
                .withQueueSizeRejectionThreshold(10))
        );
    }

    @Override
    protected String run() throws Exception {
        int a=10/0;
        return "iphone xs";
    }

    @Override
    protected String getFallback() {
        return "iphone";
    }
}

5、短路器

  1. 经过短路器的流量超过一定的阈值
  2. 如果短路器统计到的异常请求超过一定的阈值
  3. 短路器从 close 到 open 状态
  4. 短路打开时,所有请求被降级
  5. 经过一段时间,会 half-open(半开状态),会让一条请求调用接口看通了没有,如果正常了,那么短路器自动恢复

配置:

  .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
    .withCircuitBreakerRequestVolumeThreshold(30)  //流量阈值
    .withCircuitBreakerErrorThresholdPercentage(40)  //异常百分比
    .withCircuitBreakerSleepWindowInMilliseconds(3000)  //多长时间后去恢复短路器
                )

6、限流

配置如下(主要是有注释的那几行代码):


  public GetProductInfoCommand(Long productId) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetProductInfoService"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfoCommand"))
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductInfoPool"))
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
        .withCoreSize(10)  //设置线程池大小
        .withMaxQueueSize(8)   //设置队列大小
        .withQueueSizeRejectionThreshold(10))   //队列达多少后开始拒绝
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withCircuitBreakerRequestVolumeThreshold(30)
                        .withCircuitBreakerErrorThresholdPercentage(40)
                        .withCircuitBreakerSleepWindowInMilliseconds(3000)
                        .withExecutionTimeoutInMilliseconds(20000)
                        .withFallbackIsolationSemaphoreMaxConcurrentRequests(30)
                )
        );

7、请求合并

请求合并技术针对于访问延迟比较高的请求,可以削减线程池资源耗费。

  1. 请求合并时,可以设置一个 batch size,以及 elapsed time(控制什么时候触发合并后的 command)
  2. 有两种合并模式
  • request scope 收集一个 request context 里面的请求
  • global scope 横跨多个 request context 的

代码示例 command:


/**
 * @description: 请求合并的获取商品
 * @author: mmc
 * @create: 2019-11-12 00:00
 **/

public class GetProductInfosCollapserCommand extends HystrixCollapser<List<ProductInfo>,ProductInfo,Long> {

    private Long productId;

    public GetProductInfosCollapserCommand(Long productId) {
        this.productId = productId;
    }

    @Override
    public Long getRequestArgument() {
        return productId;
    }

    @Override
    protected String getCacheKey() {
        return "product_info_"+productId;
    }

    @Override
    protected HystrixCommand<List<ProductInfo>> createCommand(Collection<CollapsedRequest<ProductInfo, Long>> collapsedRequests) {

        return new BatchCommand(collapsedRequests);
    }

    @Override
    protected void mapResponseToRequests(List<ProductInfo> batchResponse, Collection<CollapsedRequest<ProductInfo, Long>> collapsedRequests) {
        int count=0;
        for (CollapsedRequest request:collapsedRequests){
            request.setResponse(batchResponse.get(count++));
        }
    }


    private static final class BatchCommand extends HystrixCommand<List<ProductInfo>>{

        private final Collection<CollapsedRequest<ProductInfo,Long>> requests;

        public BatchCommand(Collection<CollapsedRequest<ProductInfo, Long>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("GetProductInfosCollapserBatchCommand")));
            this.requests = requests;
        }

        @Override
        protected List<ProductInfo> run() throws Exception {
            StringBuilder sb=new StringBuilder();
            for (CollapsedRequest request:requests){
                sb.append(request.getArgument()+",");
            }
            String products = sb.substring(0, sb.length()-1);
            String url=ServerConfig.INVENTORY_URL+"getProductInfos?productIds="+products;
            String result = HttpClientUtils.sendGetRequest(url);
            List<ProductInfo> productInfos = JSONArray.parseArray(result, ProductInfo.class);
            return productInfos;
        }
    }
}

控制器:

 @RequestMapping("/getProductInfoCollapser")
    public String getProductInfoCollapser(String productIds){
        List<Future<ProductInfo>> futures=new ArrayList<>();
        for (String productId:productIds.split(",")){
            GetProductInfosCollapserCommand getProductInfosCollapserCommand=new GetProductInfosCollapserCommand(Long.valueOf(productId));
            Future<ProductInfo> future = getProductInfosCollapserCommand.queue();
            futures.add(future);
        }
        for (Future future:futures){
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return "success";
    }

8、多级降级

1、在主服务器挂了的时候可以降级到访问备用服务器,备用也挂了的时候访问其他应用服务器 这种多级降级通过嵌套 command 实现

==注意==:做多级降级的时候,要将降级 command 的线程池单独做一个出来,因为主流程的线程池可能是满的,放在一个里面同样会报错

9、手动降级

就是设置一个布尔类型的标识符,当为 true 的时候就降级,再有一个 controller 能修改这个布尔值就行

10、线程池自动扩容,缩容

线程池数量=每秒高峰次数*99%的访问延时+buffer

每秒 30 次的访问,每次请求大概 200ms,buffer 为 4 时 例:30*0.2+4=10

timeout 就取 99.5%的访问时间在加个 50ms

另外一般一个依赖服务线程池不要超过 20 个,如果算出来过大,还不如多启几个服务器来分担

线程池自动扩容,缩容配置:

.withCoreSize(10)
        .withMaximumSize(30)
        .withAllowMaximumSizeToDivergeFromCoreSize(true)    //允许自动扩容
        .withKeepAliveTimeMinutes(1)   //一定时间内线程少了,再缩容

11、监控

1、安装 metrics stream

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.4.10</version>
</dependency>
@Bean
public ServletRegistrationBean indexServletRegistration() {
    ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
    registration.addUrlMappings("/hystrix.stream");
    return registration;
}

2、安装 gradle

3、下载 hystrix-dashboard 的 war 包

下载后放在 tomcat 的 webapp 下, 然后在/WEB-INF/classes 下放置配置文件

config.properties

turbine.ConfigPropertyBasedDiscovery.default.instances=localhost
turbine.instanceUrlSuffix=:8081/hystrix.stream

4、启动 tomcat

电商架构技术干货(三)

输入地址,点击 monitor Streams。

12、缓存雪崩的解决方案

  1. 事前解决方案
  • 保证缓存服务的高可用性,部署多机房的 redis 集群
  1. 事中解决方案
  • 本地的 echace 缓存提供服务,保住缓存服务有部分数据可以访问
  • 对 redis 访问做资源隔离
  • 对源服务做限流和资源隔离
  • 做一些熔断,降级策略
  1. 事后解决方案
  • redis 如果有备份,就恢复数据
  • redis 数据如果太旧,就需要重启,并做缓存预热
  1. 相关的代码实现:

将原有的获取 redis 缓存数据的 service 用 command 包装起来:

  //ProductInfo productInfo = productInfoService.getRedisCache(productId);
        GetRedisCacheCommand cacheCommand=new GetRedisCacheCommand(productId);
        ProductInfo productInfo = cacheCommand.execute();

GetRedisCacheCommand 代码:

/**
 * @description: 获取redis缓存
 * @author: mmc
 * @create: 2019-11-17 18:32
 **/

public class GetRedisCacheCommand extends HystrixCommand<ProductInfo> {

    private Integer productId;



    public GetRedisCacheCommand(Integer productId) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(CommandKey.REDIS_GROUP)));
        this.productId = productId;
    }

    @Override
    protected ProductInfo run() throws Exception {
        String key="cache_product_"+productId;
        RedisDao redisDao= (RedisDao) SpringUtil.getBean("RedisDao");
        String s = redisDao.get(key);
        ProductInfo productInfo = JSON.parseObject(s, ProductInfo.class);
        return productInfo;
    }

    @Override
    protected ProductInfo getFallback() {
        return null;
    }
}

定制化的熔断策略:

  • 超时时间
  • 熔断配置
  • 恢复配置
 public GetRedisCacheCommand(Integer productId) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(CommandKey.REDIS_GROUP))
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(1000)  //超时
                .withCircuitBreakerRequestVolumeThreshold(1000)    //滑动窗口中,有多少个qign'q请求,才会触发开启短路
                .withCircuitBreakerErrorThresholdPercentage(70)   //异常请求百分比达多少时,开启短路
                .withCircuitBreakerSleepWindowInMilliseconds(60*1000))   //短路之后,多长时间后尝试恢复
        );
        this.productId = productId;
    }

限流配置:

 //限流配置
            .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                .withCoreSize(10)
                .withMaximumSize(30)
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                .withKeepAliveTimeMinutes(1)
                .withMaxQueueSize(50)
                .withQueueSizeRejectionThreshold(100))

13、缓存穿透

问题描述:访问请求时使用一些数据库不存在的数据,比如访问商品服务,但是传过来的 id=-2,这样就导致数据库肯定查不到值,缓存里也没有值,那么每次的请求都会查询数据库,缓存服务就形同虚设,这就叫缓存穿透

解决:

  1. 当数据库查不到值时也返回一个默认的值,这样缓存也就有数据了
  2. 做一个数据更改的异步监控,当一个原本没有值的数据有值了之后要更新到缓存里面去

14、缓存失效保护机制

问题描述:nginx 里的缓存如果失效,所有的请求都会到 redis 上来,导致 redis 压力剧增

解决方案:设置一个随机的失效时间,避免同时失效

 cache_ngx:set(shopCacheKey, shopCache, 10 * 60)

改为:

math.randomseed(tostring(os.time()):reverse():sub(1,7))
        local expireTime=math.random(600,1200)
        cache_ngx:set(shopCacheKey, shopCache,expireTime)
转载自:https://juejin.cn/post/7155507842491875359
评论
请登录