Dubbo源码(二)服务暴露与引用
前言
本文基于Dubbo2.7.6版本,分析最传统的rpc服务暴露和服务引用的流程。
Dubbo2.7.x提出了很多新概念新特性,比如应用粒度服务发现、元数据中心、服务自省等等。
在本章会忽略这些部分,因为默认情况下2.7.x还是采用rpc服务粒度注册和发现,后续章节再来看2.7的新特性。
在总结部分整理了服务暴露和服务引用的整体流程。
服务暴露
这里我们以最传统简单的api方式,暴露一个RPC服务DemoService。
public static void main(String[] args) throws Exception {
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// rpc服务定义
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
// 应用配置
service.setApplication(new ApplicationConfig("heihei-app"));
// 注册中心配置
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
// rpc协议配置
ProtocolConfig protocolConfig = new ProtocolConfig();
protocolConfig.setName("dubbo");
protocolConfig.setPort(20881);
service.setProtocol(protocolConfig);
// 暴露rpc服务
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
核心api是ServiceConfig#export暴露一个RPC服务,大体上分为三步:
1)环境初始化
2)ServiceConfig二次填充和校验
3)doExport真正暴露rpc服务
public synchronized void export() {
// 是否允许暴露 serviceConfig.export or providerConfig.export
if (!shouldExport()) {
return;
}
// 【step1】2.7.5新单例api 初始化Environment全局配置
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// 【step2】serviceConfig二次填充并校验
checkAndUpdateSubConfigs();
// RPC服务元数据信息 暂时忽略
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
if (shouldDelay()) {
// 特性:延迟暴露,忽略
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 【step3】暴露【核心】
doExport();
}
// 2.7.5 发布ServiceConfigExportedEvent事件
exported();
}
DubboBootstrap初始化
DubboBootstrap是2.7.5新增的api,是个单例对象,可以统一管理整个dubbo的配置和启动,和Netty的ServerBootstrap差不多都是一个意思。
利用DubboBootstrap可以支持应用粒度注册发现、元数据中心等新特性,这个我们后续再看。
private static void startWithBootstrap(boolean useApplicationRegistry) {
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
ServiceConfig<GreetingServiceImpl> service2 = new ServiceConfig<>();
service2.setInterface(GreetingService.class);
service2.setRef(new GreetingServiceImpl());
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
ApplicationConfig applicationConfig = new ApplicationConfig("haha-app");
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
bootstrap.application(applicationConfig)
.registry(registryConfig)
.service(service)
.service(service2)
.start()
.await();
}
而对于早期的ServiceConfig#export这类api做了兼容,所以ServiceConfig#export会主动调用一次DubboBootstrap初始化。
ConfigManager和Environment
DubboBootstrap单例对象在构造阶段会做三个事情:
1)创建ConfigManager
2)创建Environment
3)注册ShutdownHook,用于在进程结束前执行一些销毁动作(比如RPC服务注销等等)
这里主要要介绍一下ConfigManager和Environment。
ConfigManager和Environment都是SPI接口FrameworkExt的实现类,都是框架级别的单例对象,不支持扩展,统一通过ApplicationModel暴露给外部使用。
FrameworkExt只是一个标记接口,父接口Lifecycle定义了一些生命周期方法。
@SPI
public interface FrameworkExt extends Lifecycle {
}
public interface Lifecycle {
void initialize() throws IllegalStateException;
void start() throws IllegalStateException;
void destroy() throws IllegalStateException;
}
Environment可以理解为一堆kv配置,包括文件系统的dubbo.properties、环境变量、外部配置中心配置等等。
ConfigManager统一管理AbstractConfig。
后面会看到Environment其实不会暴露给业务使用,最终都是注入AbstractConfig供业务使用。
比如ServiceConfig、ApplicationConfig、ProtocolConfig等等,都属于AbstractConfig子类。
比如ServiceConfig#setApplication。
如果顶层应用配置为空,不仅将application放入ServiceConfig,还会加入顶层配置。
这些api都废弃了,更合理的使用方式肯定是用DubboBootstrap#application。
这里提出来主要是要知道,通过ConfigManager可以拿到全局顶层配置。
initialize
DubboBoostrap的初始化方法,主要是new一些单例对象,如果有走配置中心就从配置中心拉配置到Environment。
具体逻辑不展开分析,我们只需要知道,Environment中的kv配置都有了,包括dubbo.properties、配置中心配置、环境变量等等。
二次填充和校验
ServiceConfig#checkAndUpdateSubConfigs:
在上面Environment准备好之后,可以对ServiceConfig做二次填充,最后做一些校验。
为什么我叫它二次填充,是因为在用户硬编码之后,一些属性会被覆盖或者赋予默认值,认为是一种二次加工。
这里重点分析一下ServiceConfig二次填充,笔者将其分为两类:
1)复合属性注入:如RegsitryConfig注入,当ServiceConfig一些复合配置为空,从上级获取注入。
2)简单属性注入:如ServiceConfig.delay延迟暴露,从Environment中获取kv配置,通过反射注入。
第一类是ServiceConfig#completeCompoundConfigs。
这里举个例子说明这个方法在做啥,具体源码不贴了。
比如注册中心配置,可以针对rpc服务ServiceConfig配置。
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// 应用配置
service.setApplication(new ApplicationConfig("heihei-app"));
// 注册中心配置
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
也可以走全局ApplicationConfig的注册中心配置。
此时会取Application级别的Registry作为当前RPC服务的注册地址。
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// 应用配置
ApplicationConfig applicationConfig = new ApplicationConfig("heihei-app");
applicationConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.setApplication(applicationConfig);
// 注册中心配置
// service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
对于第二类,包含:
1)checkProtocol:获取dubbo.protocol(s)注入ProtocolConfig,再将ProtocolConfig注入ServiceConfig
2)checkRegistry:获取dubbo.registry(s)注入RegistryConfig,再将RegistryConfig注入ServiceConfig
3)refresh:获取dubbo.service.全类名.属性名注入ServiceConfig
dubbo.protocol.name=dubbo
dubbo.protocol.port=20882
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.service.org.apache.dubbo.demo.DemoService.delay=3000
无论是ProtocolConfig还是RegistryConfig还是ServiceConfig,顶层都是AbstractConfig。
AbstractConfig#refresh是实现普通属性注入的核心方法。
逻辑也比较简单,就是根据当前AbstractConfig前缀,获取kv配置,封装为一个组合配置CompositeConfiguration,循环找setter方法通过反射注入。
Environment#getPrefixedConfiguration构造前缀组合配置,比如ProtocolConfig的前缀是dubbo.protocol。
配置优先级由高到低:
1)java环境变量如-D参数
2)系统环境变量
3)配置中心针对应用级别配置
4)配置中心针对全局配置
5)AbstractConfig.metadata
6)文件系统dubbo.properties和OrderedPropertiesProviderSPI注入的properties
暴露
ServiceConfig#doExport-> doExportUrls:
1)注册RPC服务模型和RPC服务提供者模型到ServiceRepository内存中
ServiceRepository在内存中存储了当前应用所有rpc服务相关信息。
消费和提供rpc服务key由interface接口+group分组+version版本唯一确定。
比如:
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setGroup("a");
service.setVersion("1.0");
2)根据RegistryConfig构建注册中心url,重点在于url的registry参数,表明了注册中心类型
如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=28847®istry=zookeeper×tamp=1678443137183
3)循环协议暴露
ServiceConfig#doExportUrlsFor1Protocol:
1)构造发布到注册中心的url
2)exportLocal:暴露injvm协议,默认情况下都会暴露,这部分忽略,和34两步极为相似
3)PROXY_FACTORY.getInvoker:利用ProxyFactory的Adaptive实现,创建Invoker代理对象
4)PROTOCOL.export:利用Protocol的Adaptive实现,暴露rpc服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 【step1】 将各种配置,注入这个map,用map作为parameter,构造url
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
// 元数据相关忽略
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
for (MethodConfig method : getMethods()) {
// map填充rpc服务方法信息
}
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
// 暴露到注册中心的url【关注】
URL url = new URL(name, host, port,
getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 扩展点 ConfiguratorFactory 对url做特殊处理
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// ...
}
// ...
// 【step2】暴露injvm协议
exportLocal(url);
for (URL registryURL : registryURLs) {
if ("injvm".equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// registryURL=注册中心的url,其中export参数对应发布到注册中心的rpc协议url
URL registryUrlAndExportUrl = registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString());
// 【step3】创建代理对象,负责当前RPC服务的方法调用
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryUrlAndExportUrl);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 【step4】暴露
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// 2.7.5 元数据服务 发布服务定义
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
metadataService.publishServiceDefinition(url);
this.urls.add(url);
}
Invoker代理
ProxyFactory主要负责创建代理对象,这里先看getInvoker。
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({"proxy"})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({"proxy"})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
ProxyFactory#getInvoker入参有三个:
1)proxy:proxy命名不太符合实际语义,应该改为target更合适,因为传入的实际上是rpc服务实现类实例,如DemoService的实现DemoServiceImpl;
2)type:rpc服务接口,如DemoService;
3)url:注册中心url,且export参数为发布到注册中心的url,如
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=3337®istry=zookeeper×tamp=1678540659987
&
export=dubbo://127.0.0.1:20881/org.apache.dubbo.demo.DemoService?
anyhost=true&application=heihei-app&bind.ip=127.0.0.1&bind.port=20881&
deprecated=false&dubbo=2.0.2&
dynamic=true&generic=false&
interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&
pid=3337&release=&scope=remote&side=provider×tamp=1678540660025
ProxyFactory#getInvoker走的是自适应实现,由于url中没有指定proxy,所以默认走javassist实现。
javassist底层还是利用target目标class拼java代码,动态生成一个Wrapper类,封装一个AbstractProxyInvoker。
因为javassist不好理解,我们看JdkProxyFactory的实现。
JdkProxyFactory#getInvoker将target(DemoServiceImpl)封装到AbstractProxyInvoker中,invoke方法就能用传入的方法和参数直接调用到目标类上。
很明显,这个Invoker就是用于暴露给外部调用的,外部只要传入反射需要用到的必要参数,比如方法名、参数列表,就能调用到目标类的目标方法。
JdkProxyFactory#getInvoker仅仅是封装了一个Invoker,invoker运行时用到了java反射技术。
javassist实现性能更好,因为Wrapper#invokeMethod直接调用目标方法,不走反射。
javassist拼的代码大致如下:
此外,JdkProxyFactory类名比较容易误解,应该理解为用jdk的api实现的代理工厂,并不是使用了jdk动态代理的工厂。
虽然JdkProxyFactory#getProxy方法用了jdk动态代理实现(服务消费方用,这里先不深入)。
protocol#export(Invoker)
上面创建完invoker代理之后,走Protocol的自适应扩展点实现。
第一章提到过AdaptiveClassCodeGenerator会生成自适应扩展类,如果参数列表没有url,会从参数列表里找一个参数有getUrl方法,用他作为获取url途径。
这里Invoker就有getUrl方法,对应url就是构造Invoker时使用的。
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=3337®istry=zookeeper×tamp=1678540659987
&
export=dubbo://127.0.0.1:20881/org.apache.dubbo.demo.DemoService?
anyhost=true&application=heihei-app&bind.ip=127.0.0.1&bind.port=20881&
deprecated=false&dubbo=2.0.2&
dynamic=true&generic=false&
interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&
pid=3337&release=&scope=remote&side=provider×tamp=1678540660025
第一章说到获取扩展名,能通过url中的params参数来获取。
特殊的,对于protocol参数支持调用url#getProtocol方法获取扩展名,也就是说这里会先走RegistryProtocol实现。
比较晦涩的一点是,在进入RegistryProtocol之前会走两个Wrapper扩展点。
ProtocolListenerWrapper,忽略,主要是利用InvokerListener SPI实现一些事件监听。
ProtocolFilterWrapper,做了个特殊处理,暴露阶段会经过两次这个export方法(提示:Wrapper对象是两个不同的对象,并不是内部包装的protocol会变,见SPI实现),第一次包装的是RegistryProtocol。
RegistryProtocol#export共分为两步:1)暴露rpc服务;2)向注册中心注册。
暴露rpc服务
暴露rpc服务的入口,在RegsitryProtocol#doLocalExport。
RegistryProtocol.protocol是通过setter注入的Adaptive实现。
这次,会再次进入ProtocolFilterWrapper的export方法,只不过url变成了rpc协议,比如dubbo://。
所以会走ProtocolFilterWrapper的buildInvokerChain。
这个方法会将特别常用的扩展点Filter,适配为Invoker,形成一个Invoker链。
Invoker被Filter包装后,export最终调用到rpc协议的实现类,比如DubboProtocol#export。
DubboProtocol#export创建DubboExporter,包裹Invoker,缓存在内部的exporterMap中。
exporterMap的key={分组}/{rpc服务}:{版本号}:{端口}。
对于每个address(host+port),创建一个单例的ProtocolServer。
通讯层的默认实现就是netty4.NettyTransporter,netty的相关配置项都来源于url,不再深入。
注册rpc服务
RegistryProtocol#register注册rpc服务入口。
RegistryFactory是通过setter注入的自适应扩展实现,取url的协议对应实现。
目前所有RegistryFactory都继承自AbstractRegistryFactory
AbstractRegistryFactory#getRegistry:抽象类做缓存工作,子类实现createRegistry创建Registry实例。
基本上所有注册中心都是利用三方jar实现,当protocol=zookeeper时ZookeeperRegistry实现如下。
因为zk的客户端有很多,通过切换ZookeeperTransporter(自适应扩展)可以接入不同的zk客户端。
目前只有apache curator实现,一般注册中心客户端中都涵盖心跳逻辑,这里也不例外,不深入分析。
从RegistryFactory获取到Registry后,调用Registry#register注册。
基本所有Registry都继承自FailbackRegistry,FailbackRegistry实现register方法,实现重试。
doRegister方法一般也是调用三方api,实现服务注册逻辑。
比如zk注册中心是创建"/dubbo/rpc服务全类名/providers/rpc协议url如dubbo"这个znode,并且是个临时节点。
服务引用
下面用最简单的api方式,创建一个服务消费方。
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
DemoService service = reference.get(); // 核心
String message = service.sayHello("dubbo");
System.out.println(message);
核心方法是通过ReferenceConfig#get方法得到rpc服务的代理。
本质上是ReferenceConfig在首次调用get方法时会创建一个rpc服务代理并缓存。
ReferenceConfig#init方法初始化创建rpc服务代理,分为四步:
1)DubboBootstrap#init:环境初始化,和服务提供方一样,主要是初始化Environment得到kv配置
2)ReferenceConfig二次填充和校验
3)拼装map,为后续组装URL做铺垫
4)createProxy:利用组装好的map创建rpc服务代理
public synchronized void init() {
if (initialized) {
return;
}
// 【step1】初始化 主要是Environment
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// 【step2】ReferenceConfig二次填充校验
checkAndUpdateSubConfigs();
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
// 【step3】将各种配置注入map,为后续创建URL做铺垫
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
ReferenceConfigBase.appendRuntimeParameters(map);
// ...
// 【step4】创建rpc服务代理
ref = createProxy(map);
// ...
initialized = true;
}
前三步和服务提供方非常类似,不再赘述,重点看createProxy方法。
ReferenceConfig#createProxy:
1)执行自适应Protocol的refer方法,传入url协议为registry,走RegistryProtocol,返回一个Invoker;
2)执行自适应ProxyFactory的getProxy方法,传入Invoker,返回一个rpc服务代理对象;
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
// 【特性:本地调用】直接走injvm,忽略
} else {
urls.clear();
if (url != null && url.length() > 0) {
// 【特性:直连提供者】用户指定url,不走注册中心,忽略
} else {
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
// 根据ReferenceConfig配置,构造注册中心url,即registry://...
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
for (URL u : us) {
// 在registry://后面拼refer=map
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
if (urls.size() == 1) {
// 【重点】自适应Protocol执行refer方法
// url = registry://...
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 【特性:多注册中心】忽略
}
// 【关注】自适应ProxyFactory将invoker转换为rpc服务代理对象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
在ProtocolFilterWrapper中,和provider一样,会两次进入refer方法,第一次由于协议是registry,直接透传到RegistryProtocol。
RegistryProtocol#doRefer:
1)构造RegistryDirectory,执行RegistryDirectory#subscribe方法订阅rpc服务;
2)执行Cluster自适应扩展点的join方法,将RegistryDirectory转换为一个Invoker;
Directory/Invoker/Cluster
先来介绍一下Directory和Invoker和Cluster的关系。
Directory持有多个Invoker,运行时根据Invocation可以获取到其中部分Invoker。
Directory持有的Invoker,实际上会是对应rpc协议Protocol通过refer方法返回的,比如DubboProtocol。
这里的Invoker和provider侧的不同,就是负责底层网络通讯
为什么对于一个rpc服务会有多个呢?因为每个rpc服务都可能有多个provider实例。
Protocol#refer:可以猜测,这里url就是注册中心上一个rpc服务不同address的provider的url。
虽然Directory有多个Invoker,但是对于客户端来说,最终只能暴露一个rpc服务代理。
所以需要有一层Cluster来屏蔽底层多个Invoker的事实。
粗略认为,Cluster负责负载均衡,Directory是内存注册表,Invoker是netty连接。
订阅(Directory)
RegistryDirectory#subscribe:订阅rpc服务,传入url是consumer://协议。
这里直接看ZookeeperRegistry#doSubscribe实现。
这里根据入参url=consumer://xxxx,构造出监听zk的三个path,即/dubbo/{rpc服务}/{providers,configurators,routers}。
循环三个path注册children监听,并且将第一次获取的数据,通过ZookeeperRegistry#notify通知RegistryDirectory。
我们只关注/dubbo/{rpc服务}/providers下的数据变更,其他两种先不管。
rpc提供者变更(Directory)
RegistryDirectory#notify接收到provider变更通知,刷新内部存储的invokers。
RegistryDirectory#refreshInvoker:转换provider的url集合为Invoker,刷新Directory中缓存的invokers。
private volatile Map<String, Invoker<T>> urlInvokerMap;
private volatile List<Invoker<T>> invokers;
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// case1 没有rpc服务提供者,invokerUrls只有一个empty://协议的url,更新invokers为空
this.forbidden = true;
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers();
} else {
// case2 rpc服务提供者存在,根据provider的url,刷新invokers
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
// ...
// 这里将provider的url转换为新Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
List<Invoker<T>> newInvokers =
Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// 新invokers存储到RegistryDirectory
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
// 旧invokers销毁
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
RegistryDirectory#toInvokers:
1)mergeUrl:对服务提供方的url和服务消费方的url进行一个合并
2)protocol#refer:自适应Protocol根据url创建Invoker
mergeUrl内部比较复杂,举个例子是什么意思。
比如服务提供方暴露rpc服务,指定connections=5,对于dubbo协议代表provider和consumer之间建立5个长连接,从而注册到zk上的url就包含了connections=5。
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// rpc服务定义
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setConnections(5);
}
服务消费方引用rpc服务,指定connections=2,最终会以consumer方指定的connections为准,建立2个长连接。
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setConnections(2);
rpc服务Invoker(Invoker)
Directory调用自适应Protocol创建Invoker,传入url是dubbo协议。
DubboProtocol继承自AbstractProtocol,AbstractProtocol#refer将所有Invoker实现都包了一层AsyncToSyncInvoker,将异步调用转换为同步。
DubboProtocol#protocolBindingRefer:根据url创建DubboInvoker,对于消费方来说这是最底层的Invoker了。
getClients根据url的connections配置,创建Netty客户端,默认情况下每个address(ip+port)只有一个长连接。
ProtocolFilterWrapper包装扩展点,在DubboProtocol#refer返回DubboInvoker之后,和provider侧一样包了一层Filter(只不过激活扩展点分组为consumer),返回给Directory。
至此Directory订阅流程结束,接下来进入Cluster#join阶段。
Join(Cluster)
回到RegistryProtocol#doRefer,此时Invoker已经注入到了Directory。
Cluster#join将Directory转换为Invoker。
Cluster走自适应实现,默认是FailoverCluster。
几乎所有Cluster都继承AbstractCluster。
AbstractCluster在子类doJoin返回Invoker后,包了一层ClusterInterceptor扩展点,和Filter的做法类似。
FailoverCluster直接new了一个FailoverClusterInvoker返回。
其实在Cluster扩展点之外,还包了一层MockClusterWrapper。
MockClusterWrapper会把上述ClusterInterceptor包裹的FailoverClusterInvoker再包一层MockClusterInvoker,官方称为本地伪装特性。
rpc服务代理创建
最终再次回到ReferenceConfig#createProxy,此时Protocol层已经完全处理完毕,返回了一个Invoker。
为了给用户代码使用,最终用ProxyFactory将Invoker转换为rpc服务代理。
之前在provider侧已经看到过ProxyFactory的另一个方法,将rpc服务实现转换为Invoker;
而在consumer侧,是将Invoker转换为rpc服务代理。
还是老规矩,我们不看默认javassist字节码实现代理,直接看JdkProxyFactory实现来理解getProxy方法。
JdkProxyFactory就是利用了jdk动态代理,将invoker放入了InvocationHandler实现中。
所以运行时在消费端的入口方法,就在InvokerInvocationHandler中,后续再看。
此外,ProxyFactory也有包装扩展点。
StubProxyFactoryWrapper在底层ProxyFactory返回的代理对象上,可以允许包一层Stub,即官方称为本地存根的特性,这里不深入讨论。
总结
从整体上来说,无论是rpc服务提供方还是消费方,整体都遵从这个流程:
1)准备Environment:利用2.7.5BootstrapAPI,从各种地方搜集kv配置,比如dubbo.properties、配置中心、系统变量;
2)二次填充/校验Config:利用第一步得到的kv配置,注入ServiceConfig/ReferenceConfig,其中还涉及到一些配置优先级的问题;
3)执行业务方法:对于provider来说,暴露rpc服务;对于consumer来说,引用rpc服务代理。他们都依赖于注册中心;
从模型的角度来看:
provider是将rpc实现类层层包装为invoker嵌入dubbo内部,在运行时等待rpc请求到来,经过层层包装的invoker就能来到rpc实现类;
consumer是将注册中心中的providerUrl层层包装为invoker嵌入dubbo内部,通过javassist代理创建rpc代理服务,给到用户;
无论是provider还是consumer,可以预见到invoker会在运行阶段贯穿始终。
provider
rpc服务暴露流程:
1)ProxyFactory#getInvoker:创建Invoker代理,目标对象是rpc服务实现类;
2)Protocol#export:基于rpc协议暴露rpc服务,比如dubbo协议,底层默认开启NettyServer接收客户端请求;
3)Protocol#export:基于registry协议注册rpc服务到注册中心,比如zookeeper协议,创建临时znode:/dubbo/rpc服务全类名/providers/提供者url;
从模型的角度来说,provider将rpc服务实现包装为Invoker,缓存在一个内部map中:
1)ProxyFactory将rpc服务实现包装为Invoker代理对象,默认采用javassist动态生成字节码方式(JdkProxyFactory更好理解);
2)ProtocolFilterWrapper将上述Invoker通过Filter适配包装,Filter(Invoker代理(rpc服务实现));
3)DubboProtocol将上述Invoker通过DubboExporter包装,Exporter(Filter(Invoker代理(rpc服务实现))),并缓存到内部map,map的key={分组}/{rpc服务}:{版本号}:{端口};
至此在运行时通过Protocol缓存map的key能找到Exporter,最终能调用rpc服务实现。
consumer
rpc服务引用流程:
1)RegistryDirectory#subscribe:订阅rpc服务,取决于Registry实现,比如zk,监听/dubbo/rpc服务/providers父目录变更;
2)RegistryDirectory#notify:因为首次订阅得到providerUrls,这里同步收到通知,调用Protocol#refer创建invokers,注入RegistryDirectory内部map;
3)Protocol#refer:传入url协议为rpc协议,如dubbo,创建DubboInvoker,同一address(ip+port)创建一个netty长连接(所有rpc服务共享)。DubboInvoker同样会被Filter包装,只是Filter的group是consumer;
4)Cluster#join:将RegistryDirectory转换为Invoker,默认是FailoverClusterInvoker。包装Cluster在FailoverClusterInvoker外部包了:ClusterInterceptor适配Invoker和MockClusterInvoker(本地伪装);
5)ProxyFactory#getProxy:将Cluster#join返回的Invoker转换为rpc服务代理对象,默认采用javassist动态生成字节码方式(JdkProxyFactory更好理解),StubProxyFactoryWrapper包装扩展支持将代理对象二次包装为Stub(本地存根);
6)将最终生成的代理对象,缓存到ReferenceConfig;
从模型角度来说,大致如下:
欢迎大家评论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎关注公众号【程序猿阿越】。
转载自:https://juejin.cn/post/7210945608877998135