Sentinel Source Code (8): Integrating with Other Frameworks

Author: 站长

Preface

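This chapter walks through the source code of the Sentinel modules that adapt it to other frameworks:

  1. AspectJ: invoking the Sentinel core API through AOP;
  2. SpringMVC: intercepting Controller methods for traffic protection;
  3. Dubbo: intercepting service interfaces and methods (on both the consumer and the provider side) for traffic protection;
  4. SpringCloud: Sentinel's adaptation of RestTemplate and Feign.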

I. AspectJ

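Sentinel's core API always wraps a resource method and acts before and after it runs: collecting traffic statistics, checking rules, recording exceptions, and so on.

The sentinel-annotation-aspectj module uses AspectJ to give Sentinel AOP support.

Usage:

Step 1: register the aspect.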

@Configuration
public class AopConfiguration {

    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }
}

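Step 2: put the SentinelResource annotation on the target resource method. SphU.entry then runs before the target method, Entry.exit runs after it, and any Exception is routed to the corresponding handler method.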

@Override
@SentinelResource(value = "hello", fallback = "helloFallback")
public String hello(long s) {
    if (s < 0) {
        throw new IllegalArgumentException("invalid arg");
    }
    return String.format("Hello at %d", s);
}

1. The SentinelResource annotation

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {
    // Resource name
    String value() default "";
    // Traffic direction: IN (inbound) or OUT (outbound); defaults to OUT. Remember to switch to IN if system rules should apply.
    EntryType entryType() default EntryType.OUT;
    // Resource type: 0 = common, 1 = web (HTTP), 2 = RPC, 3 = gateway
    int resourceType() default 0;

    // Handling of BlockException
    String blockHandler() default "";
    Class<?>[] blockHandlerClass() default {};

    // Handling of non-Block exceptions
    String fallback() default "";
    String defaultFallback() default "";
    Class<?>[] fallbackClass() default {};
    // Exceptions to record on the Entry; they count toward the error ratio and error count used by degrade rules
    Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};
    // The opposite of exceptionsToTrace
    Class<? extends Throwable>[] exceptionsToIgnore() default {};
}

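The attributes of SentinelResource fall into three groups:

  1. Passed to SphU.entry: value is the resource name; entryType is the traffic direction; resourceType is the resource type.
  2. BlockException handling: blockHandler names the method that handles BlockException. By default that method must live in the same class as the resource; to place it in another class, set blockHandlerClass, in which case the handler must be a static method.
  3. Non-BlockException handling: exceptionsToTrace lists the exceptions to handle; exceptionsToIgnore lists the exceptions to skip; fallback names the fallback method; defaultFallback names the default fallback method. By default fallback methods must live in the same class as the resource; to place them in another class, set fallbackClass, in which case they must be static methods.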

2. SentinelResourceAspect

SentinelResourceAspect defines a Pointcut that matches every method annotated with SentinelResource.

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        // Resolve the resource name (special handling when value is empty)
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            // entry
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            // Handle the Block exception
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            // Handle other exceptions
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            throw ex;
        } finally {
            // Exit
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

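A few points to note in the around advice invokeResourceWithSentinel:

  1. getResourceName resolves the resource name;
  2. handleBlockException handles BlockException;
  3. other exceptions are handled separately.

The resource name is taken from the annotation's value attribute first; if the user did not set it, the class name plus the method signature is used instead,

for example com.alibaba.csp.sentinel.util.MethodUtil:resolveMethodName(java.lang.reflect.Method).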

// AbstractSentinelAspectSupport.java
protected String getResourceName(String resourceName, Method method) {
    if (StringUtil.isNotBlank(resourceName)) {
        return resourceName;
    }
    return MethodUtil.resolveMethodName(method);
}
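
The name resolution above can be sketched with plain reflection, without any Sentinel classes. This is only an approximation of what MethodUtil.resolveMethodName produces, inferred from the example name shown earlier; the class ResourceNameSketch, its method names, and the comma separator between parameter types are assumptions made for illustration.

```java
import java.lang.reflect.Method;

final class ResourceNameSketch {

    // Builds "<declaring class>:<method>(<parameter types>)", the shape of the example name.
    // The "," separator between parameter types is an assumption of this sketch.
    static String defaultResourceName(Method method) {
        StringBuilder sb = new StringBuilder();
        sb.append(method.getDeclaringClass().getName())
          .append(':').append(method.getName()).append('(');
        Class<?>[] params = method.getParameterTypes();
        for (int i = 0; i < params.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(params[i].getCanonicalName());
        }
        return sb.append(')').toString();
    }

    // Mirrors getResourceName: an explicit annotation value wins,
    // otherwise the name is derived from the method signature.
    static String resourceName(String annotationValue, Method method) {
        if (annotationValue != null && !annotationValue.trim().isEmpty()) {
            return annotationValue;
        }
        return defaultResourceName(method);
    }

    // Demo helper: looks up String.indexOf(String, int) without leaking a checked exception.
    static Method sampleMethod() {
        try {
            return String.class.getMethod("indexOf", String.class, int.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resourceName("hello", sampleMethod()));
        System.out.println(resourceName("", sampleMethod()));
    }
}
```

With an empty annotation value the sketch derives the name from the signature of String.indexOf(String, int); a non-empty value such as "hello" is returned unchanged.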

The handleBlockException method handles BlockException.

// AbstractSentinelAspectSupport.java
protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
    throws Throwable {
    // 1. Find the handler method
    Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
        annotation.blockHandlerClass());
    if (blockHandlerMethod != null) {
        // 2. Append the BlockException to the method arguments
        Object[] originArgs = pjp.getArgs();
        Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
        args[args.length - 1] = ex;
        try {
            // 3. Invoke the method
            if (isStatic(blockHandlerMethod)) {
                return blockHandlerMethod.invoke(null, args);
            }
            return blockHandlerMethod.invoke(pjp.getTarget(), args);
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

    // 4. If no blockHandler was found, use the fallback method
    return handleFallback(pjp, annotation, ex);
}
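
The argument handling in handleBlockException implies that a blockHandler method takes the original parameters plus one trailing exception parameter. A minimal reflection sketch of that convention follows; FakeBlockException, HelloService and helloBlocked are hypothetical stand-ins invented here, not Sentinel types.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

final class BlockHandlerSketch {

    // Hypothetical stand-in for Sentinel's BlockException.
    public static class FakeBlockException extends Exception {
        public FakeBlockException(String message) {
            super(message);
        }
    }

    // Hypothetical resource class. The handler signature is the original
    // argument list (long s) followed by the exception.
    public static class HelloService {
        public String helloBlocked(long s, FakeBlockException ex) {
            return "blocked(" + s + "): " + ex.getMessage();
        }
    }

    // Same shape as steps 2 and 3 above: copy the args, append the exception,
    // invoke statically or on the target, and unwrap InvocationTargetException.
    static Object invokeHandler(Object target, Method handler, Object[] originArgs, Throwable ex)
            throws Throwable {
        Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
        args[args.length - 1] = ex;
        try {
            Object receiver = Modifier.isStatic(handler.getModifiers()) ? null : target;
            return handler.invoke(receiver, args);
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

    static String demo() {
        try {
            Method handler = HelloService.class.getMethod(
                    "helloBlocked", long.class, FakeBlockException.class);
            return (String) invokeHandler(new HelloService(), handler,
                    new Object[] {42L}, new FakeBlockException("flow"));
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A handler whose parameter list does not end with the exception type would not match the appended argument array, which is why the signature convention matters.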

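It first looks up the BlockException handler method from the annotation's blockHandler and blockHandlerClass. If no blockHandler is found, no exception is thrown at that point; the fallback method runs as a last resort.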

// AbstractSentinelAspectSupport.java
private Method extractBlockHandlerMethod(ProceedingJoinPoint pjp, String name, Class<?>[] locationClass) {
    if (StringUtil.isBlank(name)) {
        return null;
    }
    boolean mustStatic = locationClass != null && locationClass.length >= 1;
    Class<?> clazz;
    if (mustStatic) {
        // If blockHandlerClass is configured, look for the static method in that class first
        clazz = locationClass[0];
    } else {
        // Otherwise use the blockHandler in the current class
        clazz = pjp.getTarget().getClass();
    }
    // ResourceMetadataRegistry caches blockHandler lookups
    MethodWrapper m = ResourceMetadataRegistry.lookupBlockHandler(clazz, name);
    if (m == null) {
        // Resolve the target handler, searching parent classes recursively
        Method method = resolveBlockHandlerInternal(pjp, name, clazz, mustStatic);
        // Put it in the cache
        ResourceMetadataRegistry.updateBlockHandlerFor(clazz, name, method);
        return method;
    }
    if (!m.isPresent()) {
        return null;
    }
    return m.getMethod();
}

For a non-BlockException, the annotation's exceptionsToIgnore and exceptionsToTrace attributes decide whether the Throwable is handled. By default every exception is handled.

// SentinelResourceAspect.java
// invokeResourceWithSentinel method
// Handle other exceptions
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
      throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
      traceException(ex);
      return handleFallback(pjp, annotation, ex);
}
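
The precedence between the two attributes can be sketched as a small decision function. The names ExceptionFilterSketch, belongsTo and decide are made up for this illustration, and the subclass-aware match via isAssignableFrom is an assumption about how exceptionBelongsTo behaves rather than a copy of it.

```java
final class ExceptionFilterSketch {

    // True when ex is an instance of any of the listed types (subclasses included).
    static boolean belongsTo(Throwable ex, Class<?>... types) {
        for (Class<?> type : types) {
            if (type.isAssignableFrom(ex.getClass())) {
                return true;
            }
        }
        return false;
    }

    // Same order as the snippet above: the ignore list is checked first,
    // then the trace list; anything left over is rethrown untouched.
    static String decide(Throwable ex, Class<?>[] toIgnore, Class<?>[] toTrace) {
        if (toIgnore.length > 0 && belongsTo(ex, toIgnore)) {
            return "rethrow";
        }
        if (belongsTo(ex, toTrace)) {
            return "trace+fallback";
        }
        return "rethrow";
    }

    public static void main(String[] args) {
        Class<?>[] ignore = {IllegalArgumentException.class};
        Class<?>[] trace = {Throwable.class};
        System.out.println(decide(new NumberFormatException("bad"), ignore, trace));
        System.out.println(decide(new IllegalStateException("boom"), ignore, trace));
    }
}
```

The ignore list wins even when the trace list is the default Throwable.class, so an ignored exception is neither recorded nor sent to a fallback.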

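traceException records the exception on the current Entry so that, on exit, StatisticSlot can count errors (used by system rules) and DegradeSlot can compute the error ratio and error count (used by degrade rules).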

// AbstractSentinelAspectSupport.java
protected void traceException(Throwable ex) {
  Tracer.trace(ex);
}

handleFallback prefers the fallback method named in the annotation, then the defaultFallback method; if neither is found, it rethrows the original exception.

// AbstractSentinelAspectSupport.java
protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                Class<?>[] fallbackClass, Throwable ex) throws Throwable {
    Object[] originArgs = pjp.getArgs();
    // LEVEL 1: the custom fallback
    Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);
    if (fallbackMethod != null) {
        // Append the exception to the argument list when the fallback declares the extra parameter
        int paramCount = fallbackMethod.getParameterTypes().length;
        Object[] args;
        if (paramCount == originArgs.length) {
            args = originArgs;
        } else {
            args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;
        }
        // Invoke
        try {
            if (isStatic(fallbackMethod)) {
                return fallbackMethod.invoke(null, args);
            }
            return fallbackMethod.invoke(pjp.getTarget(), args);
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }
    // LEVEL 2: the default fallback; if that does not exist either, the original exception is rethrown
    return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
}

II. SpringMVC

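Sentinel integrates with SpringMVC through SpringMVC's HandlerInterceptor extension point: during DispatcherServlet's doDispatch, every Controller method is intercepted.

The official Sentinel demo class InterceptorConfig registers the SentinelWebInterceptor to adapt SpringMVC.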

@Configuration
public class InterceptorConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        addSpringMvcInterceptor(registry);
    }

    private void addSpringMvcInterceptor(InterceptorRegistry registry) {
        SentinelWebMvcConfig config = new SentinelWebMvcConfig();
        // Handler for BlockException
        config.setBlockExceptionHandler(new DefaultBlockExceptionHandler());
        // Whether to prefix the resource name with the HTTP method
        config.setHttpMethodSpecify(true);
        // Whether all requests share one context; if false, each resource gets its own context
        config.setWebContextUnify(true);
        // Parse the request origin
        config.setOriginParser(request -> request.getHeader("S-user"));
        // Intercept all requests
        registry.addInterceptor(new SentinelWebInterceptor(config)).addPathPatterns("/**");
    }
}

AbstractSentinelInterceptor implements all of the interception logic.

public abstract class AbstractSentinelInterceptor implements HandlerInterceptor {
}

preHandle runs before the Controller method executes.

// AbstractSentinelInterceptor.java
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
    throws Exception {
    try {
        // 1. The subclass resolves the resource name
        String resourceName = getResourceName(request);
        if (StringUtil.isEmpty(resourceName)) {
            return true;
        }
        // 2. Skip SpringMVC forward requests
        if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
            return true;
        }
        // 3. Resolve the origin
        String origin = parseOrigin(request);
        // 4. Resolve the context name, sentinel_spring_web_context by default
        String contextName = getContextName(request);
        // 5. Enter the context
        ContextUtil.enter(contextName, origin);
        // 6. entry: rule checking
        Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
        // 7. Pass the Entry along as a request attribute
        request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
        return true;
    } catch (BlockException e) {
        // 8. Handle the BlockException
        try {
            handleBlockException(request, response, e);
        } finally {
            ContextUtil.exit();
        }
        return false;
    }
}

1. Resolving the resource name from the request

This step differs between subclasses.

By default SentinelWebInterceptor uses the URL pattern from the Spring MVC request mapping as the resource.

public class SentinelWebInterceptor extends AbstractSentinelInterceptor {

    private final SentinelWebMvcConfig config;

    @Override
    protected String getResourceName(HttpServletRequest request) {
        // Get the URL pattern
        Object resourceNameObject = request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
        if (resourceNameObject == null || !(resourceNameObject instanceof String)) {
            return null;
        }
        String resourceName = (String) resourceNameObject;
        // User extension point for post-processing the URL
        UrlCleaner urlCleaner = config.getUrlCleaner();
        if (urlCleaner != null) {
            resourceName = urlCleaner.clean(resourceName);
        }
        // If configured to distinguish HTTP methods, return e.g. GET:/hello
        if (StringUtil.isNotEmpty(resourceName) && config.isHttpMethodSpecify()) {
            resourceName = request.getMethod().toUpperCase() + ":" + resourceName;
        }
        // Otherwise return the Spring Web URL as-is, e.g. /hello
        return resourceName;
    }
}

Users can also implement the UrlCleaner interface to post-process the URL, for example turning /hello/{id} into /hello.

public interface UrlCleaner {
    String clean(String originUrl);
}

If the same resource accepts several HTTP methods, say both POST and GET, setting httpMethodSpecify tells them apart by request method and yields resource names such as GET:/hello.
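
As a rough illustration of how a UrlCleaner and the HTTP-method prefix combine, here is a standalone sketch. The UrlCleaner interface is redeclared locally so the example compiles on its own; DROP_PATH_VARIABLES and resourceName are hypothetical names, and the regex-based cleaner is just one possible implementation.

```java
final class UrlCleanerSketch {

    // Local copy of the single-method interface shown above.
    interface UrlCleaner {
        String clean(String originUrl);
    }

    // Hypothetical cleaner: drops every "/{variable}" segment from a URL pattern.
    static final UrlCleaner DROP_PATH_VARIABLES =
            url -> url.replaceAll("/\\{[^/]+\\}", "");

    // Follows the order used by SentinelWebInterceptor.getResourceName:
    // clean first, then optionally prefix with the upper-cased HTTP method.
    static String resourceName(String pattern, String httpMethod,
                               UrlCleaner cleaner, boolean httpMethodSpecify) {
        String name = pattern;
        if (cleaner != null) {
            name = cleaner.clean(name);
        }
        if (name != null && !name.isEmpty() && httpMethodSpecify) {
            name = httpMethod.toUpperCase() + ":" + name;
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(resourceName("/hello/{id}", "get", DROP_PATH_VARIABLES, true));
        System.out.println(resourceName("/hello", "GET", null, false));
    }
}
```

Collapsing path variables this way makes /hello/{id} and /hello share one resource, which may or may not be desirable depending on how the rules are meant to be scoped.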

The SentinelWebTotalInterceptor implementation takes its resource name from static configuration. It is a global interceptor for Spring Web, so the resource name is fixed.

public class SentinelWebTotalInterceptor extends AbstractSentinelInterceptor {

    private final SentinelWebMvcTotalConfig config;

    @Override
    protected String getResourceName(HttpServletRequest request) {
        return config.getTotalResourceName();
    }
}

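2. Letting forward requests through

To account for Spring MVC's forward feature, Sentinel applies rule checking and traffic statistics only to the first pass of a request.

increaseReferece checks whether the same client HttpServletRequest has passed through AbstractSentinelInterceptor more than once. A repeated pass is not processed at all and is simply let through.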

// AbstractSentinelInterceptor.java
private Integer increaseReferece(HttpServletRequest request, String rcKey, int step) {
    Object obj = request.getAttribute(rcKey);
    if (obj == null) {
        obj = Integer.valueOf(0);
    }
    Integer newRc = (Integer)obj + step;
    request.setAttribute(rcKey, newRc);
    return newRc;
}
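
The counter can be traced through one forwarded request with a plain Map standing in for the request attributes. This is a simplified model, not the servlet API: the attribute key "ref" and the class ForwardRefCountSketch are assumptions, and the -1 steps mirror the way afterCompletion calls the same method on the way out.

```java
import java.util.HashMap;
import java.util.Map;

final class ForwardRefCountSketch {

    // Same arithmetic as increaseReferece, with a Map in place of request attributes.
    static int increase(Map<String, Object> attrs, String key, int step) {
        Object obj = attrs.get(key);
        int current = (obj == null) ? 0 : (Integer) obj;
        int next = current + step;
        attrs.put(key, next);
        return next;
    }

    // One request that is forwarded once: the interceptor is entered twice
    // and completed twice, but only the outermost pass touches the Entry.
    static String simulateForward() {
        Map<String, Object> attrs = new HashMap<>();
        String key = "ref"; // hypothetical attribute name
        StringBuilder trace = new StringBuilder();
        // preHandle of the original request: counter becomes 1, so entry is created
        trace.append(increase(attrs, key, 1) == 1 ? "enter" : "skip");
        // preHandle of the forwarded request: counter becomes 2, so it is skipped
        trace.append(',').append(increase(attrs, key, 1) == 1 ? "enter" : "skip");
        // afterCompletion of the forwarded request: counter back to 1, still skipped
        trace.append(',').append(increase(attrs, key, -1) == 0 ? "exit" : "skip");
        // afterCompletion of the original request: counter reaches 0, entry exits
        trace.append(',').append(increase(attrs, key, -1) == 0 ? "exit" : "skip");
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(simulateForward()); // enter,skip,skip,exit
    }
}
```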

3. Resolving the origin

The origin requires the user to implement and configure a RequestOriginParser; by default the origin is the empty string.

// AbstractSentinelInterceptor.java
protected String parseOrigin(HttpServletRequest request) {
    String origin = EMPTY_ORIGIN;
    if (baseWebMvcConfig.getOriginParser() != null) {
        origin = baseWebMvcConfig.getOriginParser().parseOrigin(request);
        if (StringUtil.isEmpty(origin)) {
            return EMPTY_ORIGIN;
        }
    }
    return origin;
}

RequestOriginParser's parseOrigin method extracts the origin from the HttpServletRequest.

public interface RequestOriginParser {
    String parseOrigin(HttpServletRequest request);
}

The official demo reads the origin from a request header.

config.setOriginParser(request -> request.getHeader("S-user"));

4. Resolving the context name

By default the context name is sentinel_spring_web_context.

// AbstractSentinelInterceptor.java
protected String getContextName(HttpServletRequest request) {
    // sentinel_spring_web_context
    return SENTINEL_SPRING_WEB_CONTEXT_NAME;
}

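With SentinelWebInterceptor, setting webContextUnify=false makes the resource name serve as the context name.

This setting is what makes flow rules with the chain (link) strategy work; it serves no other purpose.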

// SentinelWebInterceptor.java
@Override
protected String getContextName(HttpServletRequest request) {
    // webContextUnify=true by default: use sentinel_spring_web_context
    if (config.isWebContextUnify()) {
        return super.getContextName(request);
    }

    // With webContextUnify=false, the resource name becomes the context name
    return getResourceName(request);
}

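Steps 5 to 7 that follow are unremarkable.

5. Handling BlockException

Users can implement and configure a BlockExceptionHandler to handle BlockException. Otherwise the BlockException is rethrown and has to be handled by Spring's global exception handling.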

// AbstractSentinelInterceptor.java
protected void handleBlockException(HttpServletRequest request, HttpServletResponse response, BlockException e)
    throws Exception {
    if (baseWebMvcConfig.getBlockExceptionHandler() != null) {
        baseWebMvcConfig.getBlockExceptionHandler().handle(request, response, e);
    } else {
        throw e;
    }
}

public interface BlockExceptionHandler {
    void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}

An official default BlockExceptionHandler is provided; it returns HTTP status 429 with the response body Blocked by Sentinel (flow limiting).

public class DefaultBlockExceptionHandler implements BlockExceptionHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
        response.setStatus(429);
        PrintWriter out = response.getWriter();
        out.print("Blocked by Sentinel (flow limiting)");
        out.flush();
        out.close();
    }
}

6. Exit

Once the Controller method has finished, whether or not it threw, afterCompletion runs.

// AbstractSentinelInterceptor.java
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                            Object handler, Exception ex) throws Exception {
    // 1. Skip SpringMVC forward requests
    if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), -1) != 0) {
        return;
    }

    // 2. Fetch the Entry from the request attributes
    Entry entry = getEntryInRequest(request, baseWebMvcConfig.getRequestAttributeName());
    if (entry == null) {
        // should not happen
        RecordLog.warn("[{}] No entry found in request, key: {}",
                getClass().getSimpleName(), baseWebMvcConfig.getRequestAttributeName());
        return;
    }

    // 3. Record the exception and exit the entry
    traceExceptionAndExit(entry, ex);
    // 4. Remove the Entry from the request attributes
    removeEntryInRequest(request);
    // 5. Exit the context
    ContextUtil.exit();
}

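The rest is straightforward. In step 3, Tracer.traceEntry records the exception so that degrade rules can compute the error ratio and error count, and then the current Entry is exited.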

// AbstractSentinelInterceptor.java
protected void traceExceptionAndExit(Entry entry, Exception ex) {
    if (entry != null) {
        if (ex != null) {
            Tracer.traceEntry(ex, entry);
        }
        entry.exit();
    }
}

III. Dubbo

1. Usage

Adding the sentinel-apache-dubbo-adapter dependency is all it takes to hook Sentinel into Dubbo.

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-apache-dubbo-adapter</artifactId>
    <version>x.y.z</version>
</dependency>

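Dubbo service interfaces and methods (on both the consumer and the provider side) automatically become Sentinel resources.

For an interface, the resource is the fully qualified interface name, e.g. com.alibaba.csp.sentinel.demo.apache.dubbo.FooService;

for a method, the resource is the fully qualified interface name plus the method signature, e.g. com.alibaba.csp.sentinel.demo.apache.dubbo.FooService:sayHello(java.lang.String).

For a Provider, QPS-based flow control is recommended, to keep the service provider from being overwhelmed by a traffic surge.

For a Consumer, thread-count-based flow control is recommended, so that it is not dragged down by unstable services.

Under the hood, sentinel-apache-dubbo-adapter loads three Filters through Dubbo SPI; individual Sentinel features can be disabled through configuration.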

@Bean
public ConsumerConfig consumerConfig() {
    ConsumerConfig consumerConfig = new ConsumerConfig();
    // Disable Sentinel on the service-consumer side
    consumerConfig.setFilter("-sentinel.dubbo.consumer.filter");
    return consumerConfig;
}

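2. Consumer

The consumer side has two important Filters.

DubboAppContextFilter extracts the application parameter (the application name) from the Dubbo URL and stores it in RpcContext as an attachment under the key dubboApplication.

Attachments are passed to the Provider as part of the call.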

@Activate(group = CONSUMER)
public class DubboAppContextFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String application = invoker.getUrl().getParameter(CommonConstants.APPLICATION_KEY);
        if (application != null) {
            RpcContext.getContext().setAttachment(DubboUtils.SENTINEL_DUBBO_APPLICATION_KEY, application);
        }
        return invoker.invoke(invocation);
    }
}

SentinelDubboConsumerFilter is responsible for invoking the Sentinel core API.

@Activate(group = CONSUMER)
public class SentinelDubboConsumerFilter extends BaseSentinelDubboFilter {

    public SentinelDubboConsumerFilter() {
        RecordLog.info("Sentinel Apache Dubbo consumer filter initialized");
    }

    @Override
    String getMethodName(Invoker invoker, Invocation invocation, String prefix) {
        return DubboUtils.getMethodResourceName(invoker, invocation, prefix);
    }

    @Override
    String getInterfaceName(Invoker invoker, String prefix) {
        return DubboUtils.getInterfaceName(invoker, prefix);
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        InvokeMode invokeMode = RpcUtils.getInvokeMode(invoker.getUrl(), invocation);
        if (InvokeMode.SYNC == invokeMode) {
            return syncInvoke(invoker, invocation);
        } else {
            return asyncInvoke(invoker, invocation);
        }
    }
}

syncInvoke handles synchronous calls.

// SentinelDubboConsumerFilter.java
private Result syncInvoke(Invoker<?> invoker, Invocation invocation) {
    Entry interfaceEntry = null;
    Entry methodEntry = null;
    // Prefix, empty by default
    String prefix = DubboAdapterGlobalConfig.getDubboConsumerResNamePrefixKey();
    // Interface resource name
    String interfaceResourceName = getInterfaceName(invoker, prefix);
    // Method resource name
    String methodResourceName = getMethodName(invoker, invocation, prefix);
    try {
        // Interface
        interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT);
        // Interface method
        methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT,
            invocation.getArguments());
        // Downstream logic (the remote call)
        Result result = invoker.invoke(invocation);
        // Record the exception, for degrade rules
        if (result.hasException()) {
            Tracer.traceEntry(result.getException(), interfaceEntry);
            Tracer.traceEntry(result.getException(), methodEntry);
        }
        return result;
    } catch (BlockException e) {
        // Handle the BlockException with the DubboFallback implementation from the global config
        return DubboAdapterGlobalConfig.getConsumerFallback().handle(invoker, invocation, e);
    } catch (RpcException e) {
        // Record the RPC exception, for degrade rules
        Tracer.traceEntry(e, interfaceEntry);
        Tracer.traceEntry(e, methodEntry);
        throw e;
    } finally {
        // Exit
        if (methodEntry != null) {
            methodEntry.exit(1, invocation.getArguments());
        }
        if (interfaceEntry != null) {
            interfaceEntry.exit();
        }
    }
}

asyncInvoke handles asynchronous calls. It resembles the synchronous path but uses AsyncEntry.

// SentinelDubboConsumerFilter.java
private Result asyncInvoke(Invoker<?> invoker, Invocation invocation) {
    LinkedList<EntryHolder> queue = new LinkedList<>();
    String prefix = DubboAdapterGlobalConfig.getDubboConsumerResNamePrefixKey();
    String interfaceResourceName = getInterfaceName(invoker, prefix);
    String methodResourceName = getMethodName(invoker, invocation, prefix);
    try {
        queue.push(new EntryHolder(
            SphU.asyncEntry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.OUT), null));
        queue.push(new EntryHolder(
            SphU.asyncEntry(methodResourceName, ResourceTypeConstants.COMMON_RPC,
                EntryType.OUT, 1, invocation.getArguments()), invocation.getArguments()));
        Result result = invoker.invoke(invocation);
        result.whenCompleteWithContext((r, throwable) -> {
            // Handle non-Block exceptions
            Throwable error = throwable;
            if (error == null) {
                error = Optional.ofNullable(r).map(Result::getException).orElse(null);
            }
            while (!queue.isEmpty()) {
                EntryHolder holder = queue.pop();
                // Record the exception
                Tracer.traceEntry(error, holder.entry);
                // Exit
                exitEntry(holder);
            }
        });
        return result;
    } catch (BlockException e) {
        while (!queue.isEmpty()) {
            exitEntry(queue.pop());
        }
        return DubboAdapterGlobalConfig.getConsumerFallback().handle(invoker, invocation, e);
    }
}
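
Both code paths release the method entry before the interface entry, that is, in the reverse of the order in which they were entered. In asyncInvoke this falls out of using LinkedList.push and pop, which treat the list as a stack. A tiny sketch with strings in place of entries (EntryStackSketch is a name invented here):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class EntryStackSketch {

    // push() adds at the head and pop() removes from the head,
    // so the most recently entered resource is released first.
    static List<String> enterAndExit() {
        LinkedList<String> queue = new LinkedList<>();
        List<String> exitOrder = new ArrayList<>();
        queue.push("interface"); // entered first
        queue.push("method");    // entered second
        while (!queue.isEmpty()) {
            exitOrder.add(queue.pop());
        }
        return exitOrder;
    }

    public static void main(String[] args) {
        System.out.println(enterAndExit()); // [method, interface]
    }
}
```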

In the Dubbo adapter, a custom DubboFallback can be registered to handle BlockException. (Note that non-BlockExceptions never reach the fallback.)

@FunctionalInterface
public interface DubboFallback {
    Result handle(Invoker<?> invoker, Invocation invocation, BlockException ex);
}

// demo
public static void registryCustomFallbackForCustomException() {
    DubboAdapterGlobalConfig.setConsumerFallback(
            (invoker, invocation, ex) -> AsyncRpcResult.newDefaultAsyncResult(new RuntimeException("fallback"), invocation));
}

By default the BlockException is converted to a RuntimeException and returned.

public class DefaultDubboFallback implements DubboFallback {

    @Override
    public Result handle(Invoker<?> invoker, Invocation invocation, BlockException ex) {
        return AsyncRpcResult.newDefaultAsyncResult(ex.toRuntimeException(), invocation);
    }
}

3. Provider

SentinelDubboProviderFilter is responsible for invoking the Sentinel core API.

@Activate(group = PROVIDER)
public class SentinelDubboProviderFilter extends BaseSentinelDubboFilter {

    public SentinelDubboProviderFilter() {
        RecordLog.info("Sentinel Apache Dubbo provider filter initialized");
    }

    @Override
    String getMethodName(Invoker invoker, Invocation invocation, String prefix) {
        return DubboUtils.getMethodResourceName(invoker, invocation, prefix);
    }

    @Override
    String getInterfaceName(Invoker invoker, String prefix) {
        return DubboUtils.getInterfaceName(invoker, prefix);
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Resolve the origin application
        String origin = DubboAdapterGlobalConfig.getOriginParser().parse(invoker, invocation);
        if (null == origin) {
            origin = "";
        }
        Entry interfaceEntry = null;
        Entry methodEntry = null;
        String prefix = DubboAdapterGlobalConfig.getDubboProviderResNamePrefixKey();
        // Resolve the resource names
        String interfaceResourceName = getInterfaceName(invoker, prefix);
        String methodResourceName = getMethodName(invoker, invocation, prefix);
        try {
            // Enter the context named after the method resource
            ContextUtil.enter(methodResourceName, origin);
            // entry
            interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN);
            methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN,
                invocation.getArguments());
            // Business logic
            Result result = invoker.invoke(invocation);
            // Record the exception
            if (result.hasException()) {
                Tracer.traceEntry(result.getException(), interfaceEntry);
                Tracer.traceEntry(result.getException(), methodEntry);
            }
            return result;
        } catch (BlockException e) {
            // fallback
            return DubboAdapterGlobalConfig.getProviderFallback().handle(invoker, invocation, e);
        } catch (RpcException e) {
            // Record the RPC exception
            Tracer.traceEntry(e, interfaceEntry);
            Tracer.traceEntry(e, methodEntry);
            throw e;
        } finally {
            // Exit
            if (methodEntry != null) {
                methodEntry.exit(1, invocation.getArguments());
            }
            if (interfaceEntry != null) {
                interfaceEntry.exit();
            }
            ContextUtil.exit();
        }
    }

}

By default the Provider reads the origin application from the attachment whose key is dubboApplication.

public class DefaultDubboOriginParser implements DubboOriginParser {

    @Override
    public String parse(Invoker<?> invoker, Invocation invocation) {
        return DubboUtils.getApplication(invocation, "");
    }

}
// DubboUtils.java
public static String getApplication(Invocation invocation, String defaultValue) {
  if (invocation == null || invocation.getAttachments() == null) {
    throw new IllegalArgumentException("Bad invocation instance");
  }
  return invocation.getAttachment(SENTINEL_DUBBO_APPLICATION_KEY, defaultValue);
}

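IV. SpringCloud

The spring-cloud-starter-alibaba-sentinel module provides Sentinel's support for SpringCloud. This section is based on version 2.2.5.RELEASE.

  1. Sentinel support for RestTemplate
  2. Sentinel support for Feign
  3. SpringMVC auto-configuration of the Sentinel interceptor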

1. RestTemplate support

When a RestTemplate is registered through Java Config, adding the SentinelRestTemplate annotation to the bean method is enough to hook in Sentinel.

@Configuration
public static class TestConfig1 {
   @Bean
   @SentinelRestTemplate(fallback = "fbk")
   RestTemplate restTemplate() {
      return new RestTemplate();
   }
}

This works because the SentinelAutoConfiguration auto-configuration class registers a SentinelBeanPostProcessor.

// SentinelAutoConfiguration.java
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
@ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true",
                       matchIfMissing = true)
public SentinelBeanPostProcessor sentinelBeanPostProcessor(
  ApplicationContext applicationContext) {
  return new SentinelBeanPostProcessor(applicationContext);
}

After bean initialization, SentinelBeanPostProcessor adds a SentinelProtectInterceptor to the RestTemplate.

// SentinelBeanPostProcessor.java
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
      throws BeansException {
      // ...
      RestTemplate restTemplate = (RestTemplate) bean;
      String interceptorBeanName = interceptorBeanNamePrefix + "@"
            + bean.toString();
      registerBean(interceptorBeanName, sentinelRestTemplate, (RestTemplate) bean);
      SentinelProtectInterceptor sentinelProtectInterceptor = applicationContext
            .getBean(interceptorBeanName, SentinelProtectInterceptor.class);
      restTemplate.getInterceptors().add(0, sentinelProtectInterceptor);
}
// Register the SentinelProtectInterceptor in the Spring container
private void registerBean(String interceptorBeanName,
      SentinelRestTemplate sentinelRestTemplate, RestTemplate restTemplate) {
   DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext
         .getAutowireCapableBeanFactory();
   BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder
         .genericBeanDefinition(SentinelProtectInterceptor.class);
   beanDefinitionBuilder.addConstructorArgValue(sentinelRestTemplate);
   beanDefinitionBuilder.addConstructorArgValue(restTemplate);
   BeanDefinition interceptorBeanDefinition = beanDefinitionBuilder
         .getRawBeanDefinition();
   beanFactory.registerBeanDefinition(interceptorBeanName,
         interceptorBeanDefinition);
}

SentinelProtectInterceptor's intercept method intercepts every request and invokes the Sentinel API.

public class SentinelProtectInterceptor implements ClientHttpRequestInterceptor {

   private final SentinelRestTemplate sentinelRestTemplate;

   private final RestTemplate restTemplate;

   public SentinelProtectInterceptor(SentinelRestTemplate sentinelRestTemplate,
         RestTemplate restTemplate) {
      this.sentinelRestTemplate = sentinelRestTemplate;
      this.restTemplate = restTemplate;
   }

   @Override
   public ClientHttpResponse intercept(HttpRequest request, byte[] body,
         ClientHttpRequestExecution execution) throws IOException {
      URI uri = request.getURI();
      // host resource
      String hostResource = request.getMethod().toString() + ":" + uri.getScheme()
            + "://" + uri.getHost()
            + (uri.getPort() == -1 ? "" : ":" + uri.getPort());
      // host + path resource
      String hostWithPathResource = hostResource + uri.getPath();
      boolean entryWithPath = true;
      if (hostResource.equals(hostWithPathResource)) {
         entryWithPath = false;
      }
      // URL cleaning
      Method urlCleanerMethod = BlockClassRegistry.lookupUrlCleaner(
            sentinelRestTemplate.urlCleanerClass(),
            sentinelRestTemplate.urlCleaner());
      if (urlCleanerMethod != null) {
         hostWithPathResource = (String) methodInvoke(urlCleanerMethod,
               hostWithPathResource);
      }

      Entry hostEntry = null;
      Entry hostWithPathEntry = null;
      ClientHttpResponse response = null;
      try {
         // entry
         hostEntry = SphU.entry(hostResource, EntryType.OUT);
         if (entryWithPath) {
            hostWithPathEntry = SphU.entry(hostWithPathResource, EntryType.OUT);
         }
         // Business call
         response = execution.execute(request, body);
         // Record the error
         if (this.restTemplate.getErrorHandler().hasError(response)) {
            Tracer.trace(new IllegalStateException("RestTemplate ErrorHandler has error"));
         }
      }
      catch (Throwable e) {
         // Record non-BlockExceptions
         if (!BlockException.isBlockException(e)) {
            Tracer.trace(e);
         }
         else {
            // Handle the BlockException
            return handleBlockException(request, body, execution, (BlockException) e);
         }
      }
      finally {
         // Exit
         if (hostWithPathEntry != null) {
            hostWithPathEntry.exit();
         }
         if (hostEntry != null) {
            hostEntry.exit();
         }
      }
      return response;
   }
}

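RestTemplate support covers two kinds of resource:

  1. host: method:scheme://host:port, e.g. GET:http://127.0.0.1:8080, for flow control at host granularity;
  2. host+path: method:scheme://host:port/path, e.g. GET:http://127.0.0.1:8080/hello, for flow control on a specific host and path.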
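
The two names can be reproduced with java.net.URI alone. The sketch below copies the string concatenation from intercept into two helper methods; the class and method names are invented for this example, and the HTTP method is passed as a plain String rather than Spring's HttpMethod.

```java
import java.net.URI;

final class RestResourceNameSketch {

    // "<METHOD>:<scheme>://<host>[:<port>]"; the port is omitted when the URI has none.
    static String hostResource(String method, URI uri) {
        return method + ":" + uri.getScheme() + "://" + uri.getHost()
                + (uri.getPort() == -1 ? "" : ":" + uri.getPort());
    }

    // The host resource followed by the path.
    static String hostWithPathResource(String method, URI uri) {
        return hostResource(method, uri) + uri.getPath();
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://127.0.0.1:8080/hello");
        System.out.println(hostResource("GET", uri));         // GET:http://127.0.0.1:8080
        System.out.println(hostWithPathResource("GET", uri)); // GET:http://127.0.0.1:8080/hello
    }
}
```

When the URI has an empty path the two names are identical, which is the case the interceptor detects with entryWithPath = false so that the same resource is not entered twice.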

For a non-BlockException, SentinelProtectInterceptor merely records the exception (Tracer.trace).

For a BlockException: a DegradeException goes to the fallback method, and any other BlockException goes to the blockHandler method. If no handler method is found, a SentinelClientHttpResponse is returned (status=200, body=RestTemplate request block by sentinel).

// SentinelProtectInterceptor.java
private ClientHttpResponse handleBlockException(HttpRequest request, byte[] body,
      ClientHttpRequestExecution execution, BlockException ex) {
   Object[] args = new Object[] { request, body, execution, ex };
   // handle degrade
   if (isDegradeFailure(ex)) {
      Method fallbackMethod = extractFallbackMethod(sentinelRestTemplate.fallback(),
            sentinelRestTemplate.fallbackClass());
      if (fallbackMethod != null) {
         return (ClientHttpResponse) methodInvoke(fallbackMethod, args);
      }
      else {
         return new SentinelClientHttpResponse();
      }
   }
   // handle flow
   Method blockHandler = extractBlockHandlerMethod(
         sentinelRestTemplate.blockHandler(),
         sentinelRestTemplate.blockHandlerClass());
   if (blockHandler != null) {
      return (ClientHttpResponse) methodInvoke(blockHandler, args);
   }
   else {
      return new SentinelClientHttpResponse();
   }
}
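One consequence of this dispatch is easy to miss: the two handlers are not substitutes for each other. The sketch below restates the branching in plain Java so that it can be run in isolation. The exception classes are hypothetical stand-ins declared inside the example (not Sentinel's own types), and handlers are modeled as functions returning the response body.

```java
import java.util.function.Function;

final class BlockDispatchSketch {
    // Hypothetical stand-ins for Sentinel's exception hierarchy
    static class BlockException extends Exception {}
    static class DegradeException extends BlockException {}
    static class FlowException extends BlockException {}

    static final String DEFAULT_BODY = "RestTemplate request block by sentinel";

    // fallback and blockHandler are null when the annotation does not configure them
    static String handle(BlockException ex,
                         Function<BlockException, String> fallback,
                         Function<BlockException, String> blockHandler) {
        // Degrade failures consult only fallback; everything else only blockHandler
        Function<BlockException, String> chosen =
                (ex instanceof DegradeException) ? fallback : blockHandler;
        return chosen != null ? chosen.apply(ex) : DEFAULT_BODY;
    }

    public static void main(String[] args) {
        Function<BlockException, String> fallback = ex -> "degraded";
        // Only a fallback is configured, so a flow-control block gets the default body
        System.out.println(handle(new DegradeException(), fallback, null));
        System.out.println(handle(new FlowException(), fallback, null));
    }
}
```

Under these assumptions, configuring only fallback leaves flow-control blocks with the default response, so both handlers are needed for full coverage.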

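2. Feign support

Adding spring-cloud-starter-alibaba-sentinel loads the auto-configuration SentinelFeignAutoConfiguration, which registers a prototype-scoped Feign.Builder in the parent Spring container. Each FeignContext child container then creates an instance of this prototype and uses it to build the FeignClient proxy object.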

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ SphU.class, Feign.class })
public class SentinelFeignAutoConfiguration {

   @Bean
   @Scope("prototype")
   @ConditionalOnMissingBean
   @ConditionalOnProperty(name = "feign.sentinel.enabled")
   public Feign.Builder feignSentinelBuilder() {
      return SentinelFeign.builder();
   }

}

SentinelFeign.builder is registered in the same way as HystrixFeign.builder:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({HystrixCommand.class, HystrixFeign.class})
protected static class HystrixFeignConfiguration {
    protected HystrixFeignConfiguration() {
    }
    @Bean
    @Scope("prototype")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(
        name = {"feign.hystrix.enabled"}
    )
    public Builder feignHystrixBuilder() {
        return HystrixFeign.builder();
    }
}

The build method of SentinelFeign.Builder installs its own Contract wrapper and InvocationHandlerFactory.

// SentinelFeign.java
public static final class Builder extends Feign.Builder
      implements ApplicationContextAware {

   private Contract contract = new Contract.Default();

   @Override
   public Feign build() {
      super.invocationHandlerFactory(new InvocationHandlerFactory() {
         @Override
         public InvocationHandler create(Target target,
               Map<Method, MethodHandler> dispatch) {
             // ...
            return new SentinelInvocationHandler(target, dispatch);
         }
      });

      super.contract(new SentinelContractHolder(contract));
      return super.build();
   }

   // ...
}

Hystrix takes the same approach.

// HystrixFeign.java
@Override
public Feign build() {
  return build(null);
}

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
  super.invocationHandlerFactory(new InvocationHandlerFactory() {
    @Override
    public InvocationHandler create(Target target,
                                    Map<Method, MethodHandler> dispatch) {
      return new HystrixInvocationHandler(target, dispatch, setterFactory,
          nullableFallbackFactory);
    }
  });
  super.contract(new HystrixDelegatingContract(contract));
  return super.build();
}

SentinelInvocationHandler carries out the proxy logic.

// SentinelInvocationHandler.java
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {
   // Handling of equals, hashCode and toString omitted...
   Object result;
   MethodHandler methodHandler = this.dispatch.get(method);
   if (target instanceof Target.HardCodedTarget) {
      Target.HardCodedTarget hardCodedTarget = (Target.HardCodedTarget) target;
      // Look up the method metadata recorded by the Contract
      MethodMetadata methodMetadata = SentinelContractHolder.METADATA_MAP
            .get(hardCodedTarget.type().getName()
                  + Feign.configKey(hardCodedTarget.type(), method));
      if (methodMetadata == null) {
         result = methodHandler.invoke(args);
      }
      else {
         // Resource name = HTTP method + scheme + name or url attribute of @FeignClient + request path
         String resourceName = methodMetadata.template().method().toUpperCase()
               + ":" + hardCodedTarget.url() + methodMetadata.template().path();
         Entry entry = null;
         try {
            // Enter the context
            ContextUtil.enter(resourceName);
            // entry
            entry = SphU.entry(resourceName, EntryType.OUT, 1, args);
            // Run the business call
            result = methodHandler.invoke(args);
         }
         catch (Throwable ex) {
            // Record non-BlockException errors
            if (!BlockException.isBlockException(ex)) {
               Tracer.trace(ex);
            }
            // Run the fallback whether or not this is a BlockException
            if (fallbackFactory != null) {
               try {
                  Object fallbackResult = fallbackMethodMap.get(method)
                        .invoke(fallbackFactory.create(ex), args);
                  return fallbackResult;
               }
               catch (IllegalAccessException e) {
                  throw new AssertionError(e);
               }
               catch (InvocationTargetException e) {
                  throw new AssertionError(e.getCause());
               }
            }
            else {
               throw ex;
            }
         }
         finally {
            // Exit
            if (entry != null) {
               entry.exit(1, args);
            }
            ContextUtil.exit();
         }
      }
   }
   else {
      result = methodHandler.invoke(args);
   }

   return result;
}
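The fallback branch of this handler can be reproduced with a JDK dynamic proxy and no Feign or Sentinel dependency. The following is a minimal sketch under stated assumptions: HelloClient is a hypothetical interface standing in for a @FeignClient interface, and the fallback factory is modeled as a Function from the cause to a fallback instance rather than Feign's FallbackFactory.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Function;

final class FallbackProxySketch {
    // Hypothetical client interface, standing in for a @FeignClient interface
    interface HelloClient {
        String hello(String msg);
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> type, T delegate, Function<Throwable, T> fallbackFactory) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                if (fallbackFactory == null) {
                    throw cause; // no fallback configured: rethrow the original error
                }
                // Any failure, blocked or not, is routed to the fallback instance
                return method.invoke(fallbackFactory.apply(cause), args);
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        HelloClient failing = msg -> { throw new IllegalStateException("provider down"); };
        HelloClient guarded = wrap(HelloClient.class, failing,
                cause -> msg -> "fallback for " + msg + " (" + cause.getMessage() + ")");
        System.out.println(guarded.hello("sentinel"));
    }
}
```

Because the factory receives the cause, a fallback implementation can tell a rejected call from a failed one by inspecting the exception type, even though both arrive through the same path.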

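A few points to note here:

  1. Resource name: HTTP method + hardCodedTarget.url + request path, where hardCodedTarget.url is the scheme plus the name or url attribute of @FeignClient. For example, the resource name of the method below is GET:http://sentinel-feign-provider-example/hello/{msg};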

    @FeignClient(name = "sentinel-feign-provider-example", fallbackFactory = ClientFallbackFactory.class)
    public interface Client {
       // GET:http://sentinel-feign-provider-example/hello/{msg}
       @GetMapping("/hello/{msg}")
       String hello(@PathVariable("msg") String msg);
    }
    
  2. Context: the resource name is used as the context name;

  3. When the fallback runs: every exception, including BlockException, goes into the defined fallback method.
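The first point can be checked with a small string-building sketch. The helper below is hypothetical (FeignResourceNames is not a Sentinel class) and simply concatenates the three parts described above.

```java
import java.util.Locale;

final class FeignResourceNames {
    // httpMethod: e.g. "get"; targetUrl: scheme plus the @FeignClient name or url;
    // templatePath: the path template as declared on the method, placeholders included
    static String resourceName(String httpMethod, String targetUrl, String templatePath) {
        return httpMethod.toUpperCase(Locale.ROOT) + ":" + targetUrl + templatePath;
    }

    public static void main(String[] args) {
        System.out.println(
                resourceName("get", "http://sentinel-feign-provider-example", "/hello/{msg}"));
    }
}
```

Since the path is the template and not the expanded URL, calls such as /hello/a and /hello/b share one resource, and rules configured for it apply to all of them.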

3. SpringMVC auto-configuration

SentinelWebAutoConfiguration auto-configures the SentinelWebInterceptor interceptor; it does not configure the global SentinelWebTotalInterceptor.

// ...
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
   // ...

   @Override
   public void addInterceptors(InterceptorRegistry registry) {
      if (!sentinelWebInterceptorOptional.isPresent()) {
         return;
      }
      SentinelProperties.Filter filterConfig = properties.getFilter();
      registry.addInterceptor(sentinelWebInterceptorOptional.get())
            .order(filterConfig.getOrder())
            .addPathPatterns(filterConfig.getUrlPatterns());
   }

   @Bean
   @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
         matchIfMissing = true)
   public SentinelWebInterceptor sentinelWebInterceptor(
         SentinelWebMvcConfig sentinelWebMvcConfig) {
      return new SentinelWebInterceptor(sentinelWebMvcConfig);
   }

   @Bean
   @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
   public SentinelWebMvcConfig sentinelWebMvcConfig() {
      // ...
   }

}

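Summary

Sentinel adapts to different frameworks mostly by hooking into each framework's Filter-like extension point and running the Sentinel core API before and after the method call.

For each framework, there are just a few points to pay attention to:

  1. When does the interception happen?
  2. What becomes a resource, and what is the resource name?
  3. How are exceptions handled?
  4. What are the origin and the context?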
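The shape shared by all of these adapters (check before the call, record errors, always release afterwards) can be sketched without Sentinel on the classpath. Everything below is a hypothetical stand-in: Guard plays the role of the rule check with a simple in-flight limit, and BlockedException stands in for BlockException (unchecked here only to keep the sketch short).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class AdapterShapeSketch {
    // Hypothetical stand-in for BlockException
    static final class BlockedException extends RuntimeException {
        BlockedException(String resource) { super("blocked: " + resource); }
    }

    // Hypothetical stand-in for the rule check: at most `limit` calls in flight
    static final class Guard {
        private final int limit;
        private final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger tracedErrors = new AtomicInteger();

        Guard(int limit) { this.limit = limit; }

        void enter(String resource) {
            if (inFlight.incrementAndGet() > limit) {
                inFlight.decrementAndGet();
                throw new BlockedException(resource);
            }
        }

        void trace(Throwable t) { tracedErrors.incrementAndGet(); }

        void exit() { inFlight.decrementAndGet(); }
    }

    static <T> T call(Guard guard, String resource, Supplier<T> business) {
        guard.enter(resource);          // before the call: may reject
        try {
            return business.get();      // the framework's real invocation
        } catch (RuntimeException e) {
            guard.trace(e);             // business errors are recorded, then rethrown
            throw e;
        } finally {
            guard.exit();               // after the call: always release
        }
    }
}
```

The four questions above map onto this sketch directly: where call is hooked in, what string is passed as resource, what happens in the catch branch, and which extra identifiers (origin, context) are supplied on entry.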

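1. AspectJ

1-1. Intercepts methods annotated with SentinelResource;

1-2. Each intercepted method becomes a resource. The resource name is taken from the value attribute of the SentinelResource annotation first; if the user has not set it, the current method signature is used as the resource name;

1-3. For a BlockException, the blockHandler is looked up; if no blockHandler is found, the fallback is used as a last resort. For a non-BlockException, the fallback is looked up; if no fallback is found, the original exception is thrown;

1-4. Setting the origin and the context is not supported.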
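The handler selection in 1-3 reduces to a small decision table. The sketch below is a hypothetical restatement (AspectHandlingSketch and Outcome are names invented for this example) and deliberately leaves out the exceptionsToTrace and exceptionsToIgnore filtering.

```java
final class AspectHandlingSketch {
    enum Outcome { BLOCK_HANDLER, FALLBACK, RETHROW }

    static Outcome decide(boolean isBlockException, boolean hasBlockHandler, boolean hasFallback) {
        if (isBlockException && hasBlockHandler) {
            return Outcome.BLOCK_HANDLER;   // preferred handler for blocked calls
        }
        if (hasFallback) {
            return Outcome.FALLBACK;        // business errors, or blocked calls without a blockHandler
        }
        return Outcome.RETHROW;             // nothing configured: the original exception propagates
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, true));
        System.out.println(decide(false, true, false));
    }
}
```

Note the asymmetry: a fallback can cover a missing blockHandler, but a blockHandler never handles a business exception.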

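2. SpringMVC

2-1. Using SpringMVC's HandlerInterceptor extension point, SentinelWebInterceptor intercepts all Controller methods under the configured paths and runs before and after the Controller method (during DispatcherServlet#doService);

2-2. The intercepted Controller method becomes a resource. The resource name is the RequestMapping path; setting HttpMethodSpecify=true prepends the HTTP method to the path, e.g. GET:/hello;

2-3. SentinelWebInterceptor's preHandle may throw a BlockException, so a BlockExceptionHandler has to be configured to handle it. The official default is DefaultBlockExceptionHandler, which returns HTTP status 429 with the response body Blocked by Sentinel (flow limiting). There is no fallback method for exceptions raised during business execution, because users can rely on SpringMVC's own global exception handling;

2-4. By default, all SpringMVC requests share the same context, sentinel_spring_web_context; implementing the RequestOriginParser interface lets you customize how the request origin is obtained.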

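3. Dubbo

3-1. Interception point: Dubbo's Filter. The consumer is intercepted before it calls the provider's interface, and the provider is intercepted after it receives the request and before it runs the business logic;

3-2. Dubbo service interfaces and methods (on both the consumer and the provider side) automatically become Sentinel resources. The interface resource name is the fully qualified interface name, e.g. com.alibaba.csp.sentinel.demo.apache.dubbo.FooService; the method resource name is the fully qualified interface name:method signature, e.g. com.alibaba.csp.sentinel.demo.apache.dubbo.FooService:sayHello(java.lang.String);

3-3. For a BlockException, a DubboFallback implementation handles it; the default DefaultDubboFallback converts the BlockException into a RuntimeException and passes it outward. Non-BlockException errors are not handled;

3-4. On the consumer side, since this is outbound traffic, there is no context and no origin. On the provider side, the context is the method resource name, and the origin is the application name that the consumer passes in through an attachment.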
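The two name formats in 3-2 can be derived from an interface and a method signature as follows. This is a minimal sketch with hypothetical helper names (DubboResourceNames is not the adapter's own utility class); it uses JDK interfaces as sample input so that it runs without Dubbo.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class DubboResourceNames {
    // Interface-level resource: the fully qualified interface name
    static String interfaceResource(Class<?> iface) {
        return iface.getName();
    }

    // Method-level resource: interfaceName:methodName(paramType1,paramType2,...)
    static String methodResource(Class<?> iface, String methodName, Class<?>... paramTypes) {
        String params = Arrays.stream(paramTypes)
                .map(Class::getName)
                .collect(Collectors.joining(","));
        return iface.getName() + ":" + methodName + "(" + params + ")";
    }

    public static void main(String[] args) {
        System.out.println(interfaceResource(Appendable.class));
        System.out.println(methodResource(Appendable.class, "append", CharSequence.class));
    }
}
```

Including the parameter types keeps overloaded methods apart, so each overload can carry its own rules.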

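4. RestTemplate

4-1. Interception point: before the HTTP request, through the SentinelProtectInterceptor interceptor;

4-2. Resource names: HttpMethod+host and HttpMethod+host+path, e.g. GET:http://127.0.0.1:8080 and GET:http://127.0.0.1:8080/hello;

4-3. BlockException handling: a DegradeException goes to the fallback method, and any other BlockException goes to the blockHandler method; if no handler method is found, a SentinelClientHttpResponse is returned (status=200, body=RestTemplate request block by sentinel). Non-BlockException errors are only recorded (Tracer.trace), not handled;

4-4. Since this is outbound traffic, there is no context and no origin.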

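5. Feign

5-1. Interception point: before the actual Feign method call, inside the SentinelInvocationHandler proxy;

5-2. Resource name: HTTP method + scheme + name or url attribute of the FeignClient annotation + request path, e.g. GET:http://sentinel-feign-provider-example/hello/{msg};

5-3. For every exception, including BlockException, the fallback is run if one exists; otherwise the exception is thrown as-is;

5-4. Although this is outbound traffic, the resource name is still used as the context name, but there is no origin.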