likes
comments
collection
share

SpringCloudGateway2.0.X源码解读

作者站长头像
站长
· 阅读数 24

SCG2.0.x

模块

作为第一个生产可用的版本,相较于1.0.x内容也丰富了许多。

SpringCloudGateway2.0.X源码解读

  1. spring-cloud-gateway-core

核心实现模块

  1. spring-cloud-gateway-dependencies

公共依赖模块

  1. spring-cloud-gateway-mvc

webmvc的实现

  1. spring-cloud-gateway-webflux

webflux的实现

  1. spring-cloud-gateway-sample

官方demo

  1. spring-cloud-starter-gateway

启动start

路由

可以说整个springcloudgateway都围绕着路由展开,那么什么是路由呢?路由就是告诉你当请求到来时,网关应该将这一个请求往哪一个后端进行转发。

来看下SCG是如何定义一个路由的:

@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
  //@formatter:off
  // String uri = "http://httpbin.org:80";
  // String uri = "http://localhost:9080";
  return builder.routes()
      .route(r -> r.host("**.abc.org").and().path("/anything/png")
        .filters(f ->
            f.prefixPath("/httpbin")
                .addResponseHeader("X-TestHeader", "foobar"))
        .uri(uri)
      )
      .route(r -> r.order(-1)
        .host("**.throttle.org").and().path("/get")
        .filters(f -> f.prefixPath("/httpbin")
                .filter(new ThrottleGatewayFilter()
                .setCapacity(1)
                .setRefillTokens(1)
                .setRefillPeriod(10)
                .setRefillUnit(TimeUnit.SECONDS)))
        .uri(uri)
      )
      .build();
}

第一个路由首先制定了host必须满足后缀为.abc.org,并且path为/anything/png,然后会为添加前缀/httpbin,并添加一个key为X-TestHeader,值为foobar的相应头

第二个路由,路由优先级为-1,需要满足host后缀为throttle.org,path为/get,并且会添加前缀/httpbin。

两个uri则为这一条路由定义了需要被转发到的后端。

WebFlux

SCG2.0开始支持了WebFlux来替代WebMVC,webflux是一个流式编程框架支持背压,特别适合网关这种高并发场景,相较于mvc的线程请求1对1模型,webflux采用了多路复用的机制,能够更高效地利用cpu,充分利用io等待的空闲时间。

SpringCloudGateway2.0.X源码解读

目录结构与WebMvc模块保持一致,原理都是通过继承HandlerMethodArgumentResolver,来劫取请求。但是这里的ProxyExchange发生了改变,返回值都变成了Mono对象

SpringCloudGateway2.0.X源码解读

Webflux模块解析

保持着好奇心,我们带着疑问来看看SCG是如何为我们转发请求的?在转发请求时又是如何去匹配规则?

一般情况下SCG都会采用Webflux,故本文所有内容后续所有内容都是构建在该模式下。

  1. ProxyExchangeArgumentResolver

首先看一下org.springframework.cloud.gateway.webflux.config.ProxyExchangeArgumentResolver这个类,该类继承了HandlerMethodArgumentResolver,主要的作用是解析http请求参数,将其包装成一个ProxyExchange,核心代码如下

public class ProxyExchangeArgumentResolver implements HandlerMethodArgumentResolver {
    ProxyExchange<?> proxy = new ProxyExchange<>(rest, exchange, bindingContext,
                                                 type(parameter));
    proxy.headers(headers);
    if (sensitive != null) {
        proxy.sensitive(sensitive.toArray(new String[0]));
    }
    return Mono.just(proxy);
}

在包装ProxyExchange的同时注入了四个参数

  1. WebClient rest

构造http请求的client

  1. ServerWebExchange exchange

交换器,负责保存请求、响应等信息。

  1. BindingContext bindingContext

上下文绑定器,绑定data和model

  1. Type type(parameter)

body中参数的类型

  1. ProxyExchange

再来看一下org.springframework.cloud.gateway.webflux.ProxyExchange,该类代理了原始发送到网关的请求, 其核心方法为

public class ProxyExchange<T> {
    //....
	public Mono<ResponseEntity<T>> get() {
		RequestEntity<?> requestEntity = headers((BodyBuilder) RequestEntity.get(uri))
				.build();
		return exchange(requestEntity);
	}

    public Mono<ResponseEntity<T>> post() {
		RequestEntity<Object> requestEntity = headers(RequestEntity.post(uri))
				.body(body());
		return exchange(requestEntity);
	}

	public Mono<ResponseEntity<T>> delete() {
		RequestEntity<Void> requestEntity = headers(
				(BodyBuilder) RequestEntity.delete(uri)).build();
		return exchange(requestEntity);
	}

	public Mono<ResponseEntity<T>> put() {
		RequestEntity<Object> requestEntity = headers(RequestEntity.put(uri))
				.body(body());
		return exchange(requestEntity);
	}

    // ....
}

大致就是实现了http协议的各类请求,最终都调用到了exchange这个方法

private Mono<ResponseEntity<T>> exchange(RequestEntity<?> requestEntity) {
		Type type = this.responseType;
        // 通过webClient开始构建请求体
		RequestBodySpec builder = rest.method(requestEntity.getMethod())
				.uri(requestEntity.getUrl())
				.headers(headers -> addHeaders(headers, requestEntity.getHeaders()));
		Mono<ClientResponse> result;
        // 后面的代码都是根据请求内容不同,走不同的逻辑,但最终
		if (requestEntity.getBody() instanceof Publisher) {
			@SuppressWarnings("unchecked")
			Publisher<Object> publisher = (Publisher<Object>) requestEntity.getBody();
			result = builder.body(publisher, Object.class).exchange();
		}
	
        else if (requestEntity.getBody() != null) {
			result = builder.body(BodyInserters.fromObject(requestEntity.getBody()))
					.exchange();
		}
		
        else {
			if (hasBody) {
				result = builder.headers(
						headers -> addHeaders(headers, exchange.getRequest().getHeaders()))
						.body(exchange.getRequest().getBody(), DataBuffer.class)
						.exchange();
			}
			else {
				result = builder.headers(
						headers -> addHeaders(headers, exchange.getRequest().getHeaders()))
						.exchange();
			}
		}
		return result.flatMap(response -> response.toEntity(ParameterizedTypeReference.forType(type)));
	}

在构造请求体完成后,都会调用RequestBodySpec#exchange方法

@Override
public Mono<ClientResponse> exchange() {
    ClientRequest request = (this.inserter != null ?
            initRequestBuilder().body(this.inserter).build() :
            initRequestBuilder().build());
    return exchangeFunction.exchange(request).switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
}

可以看到这里的exchange方法,又再一次深入调用了ExchangeFunction#exchange方法,而这里的ExchangeFunction,是在webClientBuilder中创建的

final class DefaultWebClientBuilder implements WebClient.Builder {
	@Override
	public WebClient build() {
		ExchangeFunction exchange = initExchangeFunction();
		ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
				.reduce(ExchangeFilterFunction::andThen)
				.map(filter -> filter.apply(exchange))
				.orElse(exchange) : exchange);
		return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
				unmodifiableCopy(this.defaultHeaders), unmodifiableCopy(this.defaultCookies),
				new DefaultWebClientBuilder(this));
	}

    private ExchangeFunction initExchangeFunction() {
    if (this.exchangeFunction != null) {
        return this.exchangeFunction;
    }
    else if (this.connector != null) {
        return ExchangeFunctions.create(this.connector, this.exchangeStrategies);
    }
    else {
        return ExchangeFunctions.create(new ReactorClientHttpConnector(), this.exchangeStrategies);
    }
}

private static class DefaultExchangeFunction implements ExchangeFunction {

    private final ClientHttpConnector connector;

    private final ExchangeStrategies strategies;

    public DefaultExchangeFunction(ClientHttpConnector connector, ExchangeStrategies strategies) {
        this.connector = connector;
        this.strategies = strategies;
    }

    @Override
    public Mono<ClientResponse> exchange(ClientRequest request) {
        Assert.notNull(request, "ClientRequest must not be null");
        // 发起http请求调用
        // 这里的connector实际上是一个webfluxHttpClient的代理
        return this.connector
                .connect(request.method(), request.url(),
                        clientHttpRequest -> request.writeTo(clientHttpRequest, this.strategies))
                .doOnSubscribe(subscription -> logger.debug("Subscriber present"))
                .doOnRequest(n -> logger.debug("Demand signaled"))
                .doOnCancel(() -> logger.debug("Cancelling request"))
                .map(response -> {
                    if (logger.isDebugEnabled()) {
                        int status = response.getRawStatusCode();
                        HttpStatus resolvedStatus = HttpStatus.resolve(status);
                        logger.debug("Response received, status: " + status +
                                (resolvedStatus != null ? " " + resolvedStatus.getReasonPhrase() : ""));
                    }
                    return new DefaultClientResponse(response, this.strategies);
                });
    }
}

SCG原理解析

首先来看一下官方给出的原理图解,当client发送请求到达SCG时,GatewayHandlerMapping会使用对应GatewayWebHandler对请求进行处理,在处理的过程中包含断言执行,路由匹配,过滤器执行,最后再由SCG将请求转发到后端代理服务上;当请求返回时,逆序经过所有的filter将响应返回给client。

SpringCloudGateway2.0.X源码解读

首先来看请求在SCG中的入口的实现

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    @Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		// 不转发management端口的请求,management端口时间上就是actuator的端口
        if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getURI().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
                // 1. 为请求匹配路由
		return lookupRoute(exchange)
				// .log("route-predicate-handler-mapping", Level.FINER) //name this
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}

					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
					return Mono.just(webHandler);
				}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
					}
				})));
	}
}

这里的核心方法为lookupRoute(exchange) ,该方法为请求匹配路由

如果成功匹配到路由,最终会返回一个Mono对象,其内嵌的元素为FilteringWebHandler,该类的核心方法为

public class FilteringWebHandler implements WebHandler {
	@Override
	public Mono<Void> handle(ServerWebExchange exchange) {
        // 获取路由
		Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        // 获取路由级别的filter
		List<GatewayFilter> gatewayFilters = route.getFilters();
        // 获取全局filter
		List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
		combined.addAll(gatewayFilters);
		//TODO: needed or cached?
        // 根据order进行排序
        // 有一点细节,Ordered接口是否实现是可选的,如果实现Ordered则根据order进行排序
        // 否则会根据类名进行排序
		AnnotationAwareOrderComparator.sort(combined);

		if (logger.isDebugEnabled()) {
			logger.debug("Sorted gatewayFilterFactories: "+ combined);
		}
    	// 构建过滤器责任链,并执行
		return new DefaultGatewayFilterChain(combined).filter(exchange);
	}
}

Map FlatMap ConcatMap

blog.csdn.net/qq_35974930…

路由

lookupRoute

  1. 从RouteLocator中获取路由
  2. 使用路由断言进行匹配
  3. 校验路由是否合法,这里是个空方法留着拓展
	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator
				.getRoutes()
				//individually filter routes so that filterWhen error delaying is not a problem
				// 这里使用concatMap保证路由的消费是有序的
             	.concatMap(route -> Mono
						.just(route)
						.filterWhen(r -> {
							// add the current route we are testing
							exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
							return r.getPredicate().apply(exchange);
						})
						//instead of immediately stopping main flux due to error, log and swallow it
						.doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
						.onErrorResume(e -> Mono.empty())
				)
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				//TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});

		/* TODO: trace logging
			if (logger.isTraceEnabled()) {
				logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
			}*/
	}

Predicate

AsyncPredicate是断言的顶级接口,定义了断言的合并方式

public interface AsyncPredicate<T> extends Function<T, Publisher<Boolean>> {

    // 与逻辑
	default AsyncPredicate<T> and(AsyncPredicate<? super T> other) {
		Objects.requireNonNull(other, "other must not be null");
        // zip将两个断言的结果压缩成一个元祖tuple
        // 实际上等同于 apply(t1) && apply(t2)
		return t -> Flux.zip(apply(t), other.apply(t))
				.map(tuple -> tuple.getT1() && tuple.getT2());
	}

    // 取反
	default AsyncPredicate<T> negate() {
		return t -> Mono.from(apply(t)).map(b -> !b);
	}

    // 或逻辑
	default AsyncPredicate<T> or(AsyncPredicate<? super T> other) {
		Objects.requireNonNull(other, "other must not be null");
    	 // 实际上等同于 apply(t1) || apply(t2)
		return t -> Flux.zip(apply(t), other.apply(t))
				.map(tuple -> tuple.getT1() || tuple.getT2());
	}

}

断言工厂RoutePredicateFactory,保存着断言的元信息

Config 断言的配置

name 断言名称

public interface RoutePredicateFactory<C> extends ShortcutConfigurable, Configurable<C> {
	String PATTERN_KEY = "pattern";

	// useful for javadsl
	default Predicate<ServerWebExchange> apply(Consumer<C> consumer) {
		C config = newConfig();
		consumer.accept(config);
		beforeApply(config);
		return apply(config);
	}

	default AsyncPredicate<ServerWebExchange> applyAsync(Consumer<C> consumer) {
		C config = newConfig();
		consumer.accept(config);
		beforeApply(config);
		return applyAsync(config);
	}

	default Class<C> getConfigClass() {
		throw new UnsupportedOperationException("getConfigClass() not implemented");
	}

	@Override
	default C newConfig() {
		throw new UnsupportedOperationException("newConfig() not implemented");
	}

	default void beforeApply(C config) {}

    // 接受一个配置项作为入参,生产一个断言
	Predicate<ServerWebExchange> apply(C config);

	default AsyncPredicate<ServerWebExchange> applyAsync(C config) {
		return toAsyncPredicate(apply(config));
	}

	default String name() {
		return NameUtils.normalizeRoutePredicateName(getClass());
	}

}

以PathRoutePredicateFactory为例,来看一下断言工厂是如何工作的。PathRoutePredicateFactory该工厂生产一个匹配路由路径的断言。

public class PathRoutePredicateFactory extends AbstractRoutePredicateFactory<PathRoutePredicateFactory.Config> {
	// ....
	@Override
	public Predicate<ServerWebExchange> apply(Config config) {
		synchronized (this.pathPatternParser) {
			pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchOptionalTrailingSeparator());
			config.pathPattern = this.pathPatternParser.parse(config.pattern);
		}
		return exchange -> {
            // 获取请求路径,并使用配置中的路径模式进行匹配
			PathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());
        	
			boolean match = config.pathPattern.matches(path);
			traceMatch("Pattern", config.pathPattern.getPatternString(), path, match);
			if (match) {
				PathMatchInfo pathMatchInfo = config.pathPattern.matchAndExtract(path);
				putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
			}
			return match;
		};
	}


	@Validated
	public static class Config {
		private String pattern;
		private PathPattern pathPattern;
		private boolean matchOptionalTrailingSeparator = true;

		public String getPattern() {
			return pattern;
		}

		public Config setPattern(String pattern) {
			this.pattern = pattern;
			return this;
		}

		public boolean isMatchOptionalTrailingSeparator() {
			return matchOptionalTrailingSeparator;
		}

		public Config setMatchOptionalTrailingSeparator(boolean matchOptionalTrailingSeparator) {
			this.matchOptionalTrailingSeparator = matchOptionalTrailingSeparator;
			return this;
		}

		@Override
		public String toString() {
			return new ToStringCreator(this)
					.append("pattern", pattern)
					.append("matchOptionalTrailingSeparator", matchOptionalTrailingSeparator)
					.toString();
		}
	}


}

Filter

GatewayFilter是路由级别过滤器的接口,在编写如针对路由级别的流量治理、或者业务逻辑时都应该实现该接口

public interface GatewayFilter extends ShortcutConfigurable {

	String NAME_KEY = "name";
	String VALUE_KEY = "value";

	/**
	 * Process the Web request and (optionally) delegate to the next
	 * {@code WebFilter} through the given {@link GatewayFilterChain}.
	 * @param exchange the current server exchange
	 * @param chain provides a way to delegate to the next filter
	 * @return {@code Mono<Void>} to indicate when request processing is complete
	 */
	Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);

}

GlobalFilter是全局级别的过滤器的接口,在编写如请求转发,响应写回的过滤器时应实现该接口

public interface GlobalFilter {

	/**
	 * Process the Web request and (optionally) delegate to the next
	 * {@code WebFilter} through the given {@link GatewayFilterChain}.
	 * @param exchange the current server exchange
	 * @param chain provides a way to delegate to the next filter
	 * @return {@code Mono<Void>} to indicate when request processing is complete
	 */
	Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);

}

GatewayFilterChain为责任链的实现接口

public interface GatewayFilterChain {

	/**
	 * Delegate to the next {@code WebFilter} in the chain.
	 * @param exchange the current server exchange
	 * @return {@code Mono<Void>} to indicate when request handling is complete
	 */
	Mono<Void> filter(ServerWebExchange exchange);

}

路由的注入

yaml注入

首先看GatewayProperties这个类,写在yaml中的配置最终将被写入到GatewayProperties,路由的配置转化为List

public class GatewayProperties {
    	private List<RouteDefinition> routes = new ArrayList<>();

	/**
	 * List of filter definitions that are applied to every route.
	 */
	private List<FilterDefinition> defaultFilters = new ArrayList<>();
}

在application.yaml中,定义如下的路由即可被注入到改Properties中

Spring:
	cloud:
  	gateway:
			routes:
        # =====================================
        # to run server
        # $ wscat --listen 9000
        # to run client
        # $ wscat --connect ws://localhost:8080/echo
        - id: websocket_test
          uri: ws://localhost:9000
          order: 9000
          predicates:
          - Path=/echo
        # =====================================
        - id: default_path_to_httpbin
          uri: ${test.uri}
          order: 10000
          predicates:
          - Path=/**

代码注入

在代码层面我们可以通过如下方式注入路由,该方法返回一个ROuteLocator

@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
//@formatter:off
// String uri = "http://httpbin.org:80";
// String uri = "http://localhost:9080";
return builder.routes()
.route(r -> r.host("**.abc.org").and().path("/anything/png")
       .filters(f ->
                f.prefixPath("/httpbin")
                .addResponseHeader("X-TestHeader", "foobar"))
       .uri(uri)
      ).builder();
}

最终的builder返回一个匿名的RouteLocator实现

public RouteLocator build() {
    return () -> Flux.fromIterable(this.routes).map(routeBuilder -> routeBuilder.build());
}

这里routeBuilder.builder实际上是将我们定义的路由进行创建

public Route build() {
    Assert.notNull(this.id, "id can not be null");
    Assert.notNull(this.uri, "uri can not be null");
    AsyncPredicate<ServerWebExchange> predicate = getPredicate();
    Assert.notNull(predicate, "predicate can not be null");

    return new Route(this.id, this.uri, this.order, predicate, this.gatewayFilters);
}

endpoint注入

除了常规上两种方式以外,SCG还支持通过Endpoint对路由进行CRUD:

@RestControllerEndpoint(id = "gateway")
public class GatewayControllerEndpoint implements ApplicationEventPublisherAware {
    /*
http POST :8080/admin/gateway/routes/apiaddreqhead uri=http://httpbin.org:80 predicates:='["Host=**.apiaddrequestheader.org", "Path=/headers"]' filters:='["AddRequestHeader=X-Request-ApiFoo, ApiBar"]'
*/
    @PostMapping("/routes/{id}")
    @SuppressWarnings("unchecked")
    public Mono<ResponseEntity<Void>> save(@PathVariable String id, @RequestBody Mono<RouteDefinition> route);

    @DeleteMapping("/routes/{id}")
    public Mono<ResponseEntity<Object>> delete(@PathVariable String id);

    @GetMapping("/routes/{id}")
    public Mono<ResponseEntity<RouteDefinition>> route(@PathVariable String id);
}

该功能依赖的类为

值得注意的是RouteDefinitionRepository接口继承了RouteDefinitionLocator,这一点在后面会用到

public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {

    private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap( r -> {
            routes.put(r.getId(), r);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            if (routes.containsKey(id)) {
                routes.remove(id);
                return Mono.empty();
            }
            return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
        });
    }

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return Flux.fromIterable(routes.values());
    }
}

路由创建全流程

从上面路由注入的实现中,我们可以发现一些端倪,即创建路由的流程中存在中间状态,即路由的创建可以是

路由配置注入 -> createRoute

也可以是

路由配置注入 -> createRouteDefinition -> createRoute

这里的create*并不是特指某个方法,而是抽象创建这个动作。

RouteDefinitionLocator

首先来看一下RouteDefinitionLocator这个接口

SpringCloudGateway2.0.X源码解读

该接口有四个实现

InMemoryRouteDefinitionRepository

故名思义基于内存实现的RouteDefinition数据库,支持RouteDefinition增删改查

public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {

	private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());

	@Override
	public Mono<Void> save(Mono<RouteDefinition> route) {
		return route.flatMap( r -> {
			routes.put(r.getId(), r);
			return Mono.empty();
		});
	}

	@Override
	public Mono<Void> delete(Mono<String> routeId) {
		return routeId.flatMap(id -> {
			if (routes.containsKey(id)) {
				routes.remove(id);
				return Mono.empty();
			}
			return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
		});
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return Flux.fromIterable(routes.values());
	}
}

PropertiesRouteDefinitionLocator

该实现将通过配置文件注入的routeDefinition flux化。

public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {

	private final GatewayProperties properties;

	public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
		this.properties = properties;
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return Flux.fromIterable(this.properties.getRoutes());
	}
}

CompositeRouteDefinitionLocator

组合模式,负责将其他的RouteDefinitionLocator进行组合,并将所有的RouteDefinition合并成一个Flux

public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {

	private final Flux<RouteDefinitionLocator> delegates;

	public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
		this.delegates = delegates;
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
	}
}

CachineRouteDefinitionLocator

负责缓存RouteDefinition,在SCG2.0.X中没有实现

RouteLocator

该接口的功能是获取包含所有Route的Flux

public interface RouteLocator {

	Flux<Route> getRoutes();
}

实现关系如下图所示

SpringCloudGateway2.0.X源码解读

RouteDefinitionRouteLocator

该实现的职责是将RouteDefinition转换为Route实体,核心方法如下

public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
	@Override
	public Flux<Route> getRoutes() {
		return this.routeDefinitionLocator.getRouteDefinitions()
				.map(this::convertToRoute)
				//TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("RouteDefinition matched: " + route.getId());
					}
					return route;
				});


		/* TODO: trace logging
			if (logger.isTraceEnabled()) {
				logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
			}*/
	}
}

CachedRouteLocator

顾名思义,负责将Route缓存进内存

public class CachingRouteLocator implements RouteLocator {

	private final RouteLocator delegate;
	private final Flux<Route> routes;
	private final Map<String, List> cache = new HashMap<>();

	public CachingRouteLocator(RouteLocator delegate) {
		this.delegate = delegate;
        // lookup 方法定义从哪里获取缓存
        // onCacheMissResume定义获取缓存的supplier
		routes = CacheFlux.lookup(cache, "routes", Route.class)
				.onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
	}

	@Override
	public Flux<Route> getRoutes() {
		return this.routes;
	}

	/**
	 * Clears the routes cache
	 * @return routes flux
	 */
	public Flux<Route> refresh() {
		this.cache.clear();
		return this.routes;
	}

	@EventListener(RefreshRoutesEvent.class)
	/* for testing */ void handleRefresh() {
		refresh();
	}
}

CompositeRouteLocator

组合多个RouteLocator,提供统一的对外访问

服务发现

上文主要介绍了SCG路由的生命周期,其中可以通过yaml配置,代码注入,和通过endpoint进行route注入,虽然这三种方法已经提供了非常灵活的路由配置,但是随着微服务的增多,这种手动注入路由的方式未免显得低效,人为操作也使得系统略显脆弱。

为了解决这种现象SCG也提供了通过服务发现的方式注册路由

DiscoveryClientRouteDefinitionLocator是实现服务发现的关键,该类的职责就是通过discoveryClients获取服务,并将该服务注册成一个路由。

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {

        SpelExpressionParser parser = new SpelExpressionParser();
        Expression includeExpr = parser.parseExpression(properties.getIncludeExpression());
        Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

        Predicate<ServiceInstance> includePredicate;
        if (properties.getIncludeExpression() == null || "true".equalsIgnoreCase(properties.getIncludeExpression())) {
            includePredicate = instance -> true;
        } else {
            includePredicate = instance -> {
                Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
                if (include == null) {
                    return false;
                }
                return include;
            };
        }

        return Flux.fromIterable(discoveryClient.getServices())
        .map(discoveryClient::getInstances)
        .filter(instances -> !instances.isEmpty())
        .map(instances -> instances.get(0))
        .filter(includePredicate)
        .map(instance -> {
            String serviceId = instance.getServiceId();

            RouteDefinition routeDefinition = new RouteDefinition();
            routeDefinition.setId(this.routeIdPrefix + serviceId);
            String uri = urlExpr.getValue(evalCtxt, instance, String.class);
            routeDefinition.setUri(URI.create(uri));

            final ServiceInstance instanceForEval = new DelegatingServiceInstance(instance, properties);

            for (PredicateDefinition original : this.properties.getPredicates()) {
                PredicateDefinition predicate = new PredicateDefinition();
                predicate.setName(original.getName());
                for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                    String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                    predicate.addArg(entry.getKey(), value);
                }
                routeDefinition.getPredicates().add(predicate);
            }

            for (FilterDefinition original : this.properties.getFilters()) {
                FilterDefinition filter = new FilterDefinition();
                filter.setName(original.getName());
                for (Map.Entry<String, String> entry : original.getArgs().entrySet()) {
                    String value = getValueFromExpr(evalCtxt, parser, instanceForEval, entry);
                    filter.addArg(entry.getKey(), value);
                }
                routeDefinition.getFilters().add(filter);
            }

            return routeDefinition;
        });
    }

对于服务发现的路由会走LoadBalancerClientFilter的逻辑,该类会为命中的路由负载均衡到注册到注册中心的某一复合要求的后端服务上。

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
    	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
		if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
			return chain.filter(exchange);
		}
		//preserve the original url
		addOriginalRequestUrl(exchange, url);

		log.trace("LoadBalancerClientFilter url before: " + url);

		final ServiceInstance instance = choose(exchange);

		if (instance == null) {
			throw new NotFoundException("Unable to find instance for " + url.getHost());
		}

		URI uri = exchange.getRequest().getURI();

		// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
		// if the loadbalancer doesn't provide one.
		String overrideScheme = instance.isSecure() ? "https" : "http";
		if (schemePrefix != null) {
			overrideScheme = url.getScheme();
		}

		URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);

		log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		return chain.filter(exchange);
	}
}

优化点

  1. SCG的路由和服务的关系似乎有些模糊不清,一个服务即对应一条路由,但是在某些情况下我们希望服务和路由是解耦,松散的,即一个服务可以有多个路由,不同的路由又有不同的规则。
  2. 在生产模式下,通过yaml、代码配置的路由导致整个项目运维困难,需要提供一种动态路由配置方式。
  3. 在SCG2.0.x版本中,SCG也没有提供监控指标的透出。
转载自:https://juejin.cn/post/7392115478560669723
评论
请登录