likes
comments
collection
share

Dubbo源码(二)服务暴露与引用

作者站长头像
站长
· 阅读数 38

前言

本文基于Dubbo2.7.6版本,分析最传统的rpc服务暴露和服务引用的流程。

Dubbo2.7.x提出了很多新概念新特性,比如应用粒度服务发现、元数据中心、服务自省等等。

在本章会忽略这些部分,因为默认情况下2.7.x还是采用rpc服务粒度注册和发现,后续章节再来看2.7的新特性。

在总结部分整理了服务暴露和服务引用的整体流程。

服务暴露

这里我们以最传统简单的api方式,暴露一个RPC服务DemoService。

public static void main(String[] args) throws Exception {
    ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
    // rpc服务定义
    service.setInterface(DemoService.class);
    service.setRef(new DemoServiceImpl());
    // 应用配置
    service.setApplication(new ApplicationConfig("heihei-app"));
    // 注册中心配置
    service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    // rpc协议配置
    ProtocolConfig protocolConfig = new ProtocolConfig();
    protocolConfig.setName("dubbo");
    protocolConfig.setPort(20881);
    service.setProtocol(protocolConfig);
    // 暴露rpc服务
    service.export();
    System.out.println("dubbo service started");
    new CountDownLatch(1).await();
}

核心api是ServiceConfig#export暴露一个RPC服务,大体上分为三步:

1)环境初始化

2)ServiceConfig二次填充和校验

3)doExport真正暴露rpc服务

public synchronized void export() {
    // 是否允许暴露 serviceConfig.export or providerConfig.export
    if (!shouldExport()) {
        return;
    }
    // 【step1】2.7.5新单例api 初始化Environment全局配置
    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }
    // 【step2】serviceConfig二次填充并校验
    checkAndUpdateSubConfigs();
    // RPC服务元数据信息 暂时忽略
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());
    if (shouldDelay()) {
        // 特性:延迟暴露,忽略
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // 【step3】暴露【核心】
        doExport();
    }
    // 2.7.5 发布ServiceConfigExportedEvent事件
    exported();
}

DubboBootstrap初始化

DubboBootstrap是2.7.5新增的api,是个单例对象,可以统一管理整个dubbo的配置和启动,和Netty的ServerBootstrap差不多都是一个意思。

利用DubboBootstrap可以支持应用粒度注册发现、元数据中心等新特性,这个我们后续再看。

private static void startWithBootstrap(boolean useApplicationRegistry) {
    ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
    service.setInterface(DemoService.class);
    service.setRef(new DemoServiceImpl());
    ServiceConfig<GreetingServiceImpl> service2 = new ServiceConfig<>();
    service2.setInterface(GreetingService.class);
    service2.setRef(new GreetingServiceImpl());
    DubboBootstrap bootstrap = DubboBootstrap.getInstance();
    ApplicationConfig applicationConfig = new ApplicationConfig("haha-app");
    RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
    bootstrap.application(applicationConfig)
            .registry(registryConfig)
            .service(service)
            .service(service2)
            .start()
            .await();
}

而对于早期的ServiceConfig#export这类api做了兼容,所以ServiceConfig#export会主动调用一次DubboBootstrap初始化。

ConfigManager和Environment

DubboBootstrap单例对象在构造阶段会做三个事情:

1)创建ConfigManager

2)创建Environment

3)注册ShutdownHook,用于在进程结束前执行一些销毁动作(比如RPC服务注销等等)

Dubbo源码(二)服务暴露与引用

这里主要要介绍一下ConfigManager和Environment。

ConfigManager和Environment都是SPI接口FrameworkExt的实现类,都是框架级别的单例对象,不支持扩展,统一通过ApplicationModel暴露给外部使用。

Dubbo源码(二)服务暴露与引用

FrameworkExt只是一个标记接口,父接口Lifecycle定义了一些生命周期方法。

@SPI
public interface FrameworkExt extends Lifecycle {

}
public interface Lifecycle {
    void initialize() throws IllegalStateException;
    void start() throws IllegalStateException;
    void destroy() throws IllegalStateException;
}

Environment可以理解为一堆kv配置,包括文件系统的dubbo.properties、环境变量、外部配置中心配置等等。

Dubbo源码(二)服务暴露与引用

ConfigManager统一管理AbstractConfig。

后面会看到Environment其实不会暴露给业务使用,最终都是注入AbstractConfig供业务使用

Dubbo源码(二)服务暴露与引用

比如ServiceConfig、ApplicationConfig、ProtocolConfig等等,都属于AbstractConfig子类。

比如ServiceConfig#setApplication。

如果顶层应用配置为空,不仅将application放入ServiceConfig,还会加入顶层配置。

Dubbo源码(二)服务暴露与引用

这些api都废弃了,更合理的使用方式肯定是用DubboBootstrap#application。

Dubbo源码(二)服务暴露与引用

这里提出来主要是要知道,通过ConfigManager可以拿到全局顶层配置。

initialize

DubboBoostrap的初始化方法,主要是new一些单例对象,如果有走配置中心就从配置中心拉配置到Environment。

具体逻辑不展开分析,我们只需要知道,Environment中的kv配置都有了,包括dubbo.properties、配置中心配置、环境变量等等。

Dubbo源码(二)服务暴露与引用

二次填充和校验

ServiceConfig#checkAndUpdateSubConfigs:

在上面Environment准备好之后,可以对ServiceConfig做二次填充,最后做一些校验。

为什么我叫它二次填充,是因为在用户硬编码之后,一些属性会被覆盖或者赋予默认值,认为是一种二次加工。

Dubbo源码(二)服务暴露与引用

这里重点分析一下ServiceConfig二次填充,笔者将其分为两类:

1)复合属性注入:如RegsitryConfig注入,当ServiceConfig一些复合配置为空,从上级获取注入。

2)简单属性注入:如ServiceConfig.delay延迟暴露,从Environment中获取kv配置,通过反射注入。

第一类是ServiceConfig#completeCompoundConfigs。

这里举个例子说明这个方法在做啥,具体源码不贴了。

比如注册中心配置,可以针对rpc服务ServiceConfig配置。

ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// 应用配置
service.setApplication(new ApplicationConfig("heihei-app"));
// 注册中心配置
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));

也可以走全局ApplicationConfig的注册中心配置。

此时会取Application级别的Registry作为当前RPC服务的注册地址。

ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
// 应用配置
ApplicationConfig applicationConfig = new ApplicationConfig("heihei-app");
applicationConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.setApplication(applicationConfig);
// 注册中心配置
// service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));

对于第二类,包含:

1)checkProtocol:获取dubbo.protocol(s)注入ProtocolConfig,再将ProtocolConfig注入ServiceConfig

2)checkRegistry:获取dubbo.registry(s)注入RegistryConfig,再将RegistryConfig注入ServiceConfig

3)refresh:获取dubbo.service.全类名.属性名注入ServiceConfig

dubbo.protocol.name=dubbo
dubbo.protocol.port=20882
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.service.org.apache.dubbo.demo.DemoService.delay=3000

无论是ProtocolConfig还是RegistryConfig还是ServiceConfig,顶层都是AbstractConfig。

AbstractConfig#refresh是实现普通属性注入的核心方法。

逻辑也比较简单,就是根据当前AbstractConfig前缀,获取kv配置,封装为一个组合配置CompositeConfiguration,循环找setter方法通过反射注入。

Dubbo源码(二)服务暴露与引用

Environment#getPrefixedConfiguration构造前缀组合配置,比如ProtocolConfig的前缀是dubbo.protocol。

配置优先级由高到低:

1)java环境变量如-D参数

2)系统环境变量

3)配置中心针对应用级别配置

4)配置中心针对全局配置

5)AbstractConfig.metadata

6)文件系统dubbo.properties和OrderedPropertiesProviderSPI注入的properties

Dubbo源码(二)服务暴露与引用

暴露

ServiceConfig#doExport-> doExportUrls

Dubbo源码(二)服务暴露与引用

1)注册RPC服务模型和RPC服务提供者模型到ServiceRepository内存中

ServiceRepository在内存中存储了当前应用所有rpc服务相关信息。

Dubbo源码(二)服务暴露与引用

消费和提供rpc服务key由interface接口+group分组+version版本唯一确定。

Dubbo源码(二)服务暴露与引用

比如:

ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setGroup("a");
service.setVersion("1.0");

2)根据RegistryConfig构建注册中心url,重点在于url的registry参数,表明了注册中心类型

如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=28847&registry=zookeeper&timestamp=1678443137183

3)循环协议暴露

ServiceConfig#doExportUrlsFor1Protocol

1)构造发布到注册中心的url

2)exportLocal:暴露injvm协议,默认情况下都会暴露,这部分忽略,和34两步极为相似

3)PROXY_FACTORY.getInvoker:利用ProxyFactory的Adaptive实现,创建Invoker代理对象

4)PROTOCOL.export:利用Protocol的Adaptive实现,暴露rpc服务

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }
    // 【step1】 将各种配置,注入这个map,用map作为parameter,构造url
    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);
    ServiceConfig.appendRuntimeParameters(map);
    AbstractConfig.appendParameters(map, getMetrics());
    AbstractConfig.appendParameters(map, getApplication());
    AbstractConfig.appendParameters(map, getModule());
    AbstractConfig.appendParameters(map, provider);
    AbstractConfig.appendParameters(map, protocolConfig);
    AbstractConfig.appendParameters(map, this);
    // 元数据相关忽略
    MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
    if (metadataReportConfig != null && metadataReportConfig.isValid()) {
        map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
    }
    for (MethodConfig method : getMethods()) {
        // map填充rpc服务方法信息
    }
    String host = findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = findConfigedPorts(protocolConfig, name, map);
    // 暴露到注册中心的url【关注】
    URL url = new URL(name, host, port, 
                      getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    // 扩展点 ConfiguratorFactory 对url做特殊处理
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // ...
    }
    // ...
    // 【step2】暴露injvm协议
    exportLocal(url);
    for (URL registryURL : registryURLs) {
        if ("injvm".equalsIgnoreCase(url.getProtocol())) {
            continue;
        }
        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
        if (monitorUrl != null) {
            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
        }
        String proxy = url.getParameter(PROXY_KEY);
        if (StringUtils.isNotEmpty(proxy)) {
            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
        }
        // registryURL=注册中心的url,其中export参数对应发布到注册中心的rpc协议url
        URL registryUrlAndExportUrl = registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString());
        // 【step3】创建代理对象,负责当前RPC服务的方法调用
        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryUrlAndExportUrl);
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
        // 【step4】暴露
        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
        exporters.add(exporter);
    } 
    // 2.7.5 元数据服务 发布服务定义
    WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
    metadataService.publishServiceDefinition(url);
    this.urls.add(url);
}

Invoker代理

ProxyFactory主要负责创建代理对象,这里先看getInvoker。

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({"proxy"})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;
    @Adaptive({"proxy"})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

ProxyFactory#getInvoker入参有三个:

1)proxy:proxy命名不太符合实际语义,应该改为target更合适,因为传入的实际上是rpc服务实现类实例,如DemoService的实现DemoServiceImpl;

2)type:rpc服务接口,如DemoService;

3)url:注册中心url,且export参数为发布到注册中心的url,如

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=3337&registry=zookeeper&timestamp=1678540659987
&
export=dubbo://127.0.0.1:20881/org.apache.dubbo.demo.DemoService?
anyhost=true&application=heihei-app&bind.ip=127.0.0.1&bind.port=20881&
deprecated=false&dubbo=2.0.2&
dynamic=true&generic=false&
interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&
pid=3337&release=&scope=remote&side=provider&timestamp=1678540660025

ProxyFactory#getInvoker走的是自适应实现,由于url中没有指定proxy,所以默认走javassist实现。

javassist底层还是利用target目标class拼java代码,动态生成一个Wrapper类,封装一个AbstractProxyInvoker。

Dubbo源码(二)服务暴露与引用

因为javassist不好理解,我们看JdkProxyFactory的实现。

JdkProxyFactory#getInvoker将target(DemoServiceImpl)封装到AbstractProxyInvoker中,invoke方法就能用传入的方法和参数直接调用到目标类上。

很明显,这个Invoker就是用于暴露给外部调用的,外部只要传入反射需要用到的必要参数,比如方法名、参数列表,就能调用到目标类的目标方法。

Dubbo源码(二)服务暴露与引用

JdkProxyFactory#getInvoker仅仅是封装了一个Invoker,invoker运行时用到了java反射技术。

javassist实现性能更好,因为Wrapper#invokeMethod直接调用目标方法,不走反射。

javassist拼的代码大致如下:

Dubbo源码(二)服务暴露与引用

此外,JdkProxyFactory类名比较容易误解,应该理解为用jdk的api实现的代理工厂,并不是使用了jdk动态代理的工厂。

虽然JdkProxyFactory#getProxy方法用了jdk动态代理实现(服务消费方用,这里先不深入)。

Dubbo源码(二)服务暴露与引用

protocol#export(Invoker)

上面创建完invoker代理之后,走Protocol的自适应扩展点实现。

Dubbo源码(二)服务暴露与引用

第一章提到过AdaptiveClassCodeGenerator会生成自适应扩展类,如果参数列表没有url,会从参数列表里找一个参数有getUrl方法,用他作为获取url途径。

这里Invoker就有getUrl方法,对应url就是构造Invoker时使用的。

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=heihei-app&dubbo=2.0.2&pid=3337&registry=zookeeper&timestamp=1678540659987
&
export=dubbo://127.0.0.1:20881/org.apache.dubbo.demo.DemoService?
anyhost=true&application=heihei-app&bind.ip=127.0.0.1&bind.port=20881&
deprecated=false&dubbo=2.0.2&
dynamic=true&generic=false&
interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&
pid=3337&release=&scope=remote&side=provider&timestamp=1678540660025

第一章说到获取扩展名,能通过url中的params参数来获取。

特殊的,对于protocol参数支持调用url#getProtocol方法获取扩展名,也就是说这里会先走RegistryProtocol实现。

Dubbo源码(二)服务暴露与引用

比较晦涩的一点是,在进入RegistryProtocol之前会走两个Wrapper扩展点。

ProtocolListenerWrapper,忽略,主要是利用InvokerListener SPI实现一些事件监听。

ProtocolFilterWrapper,做了个特殊处理,暴露阶段会经过两次这个export方法(提示:Wrapper对象是两个不同的对象,并不是内部包装的protocol会变,见SPI实现),第一次包装的是RegistryProtocol。

Dubbo源码(二)服务暴露与引用

RegistryProtocol#export共分为两步:1)暴露rpc服务;2)向注册中心注册。

Dubbo源码(二)服务暴露与引用

暴露rpc服务

暴露rpc服务的入口,在RegsitryProtocol#doLocalExport。

RegistryProtocol.protocol是通过setter注入的Adaptive实现。

Dubbo源码(二)服务暴露与引用

这次,会再次进入ProtocolFilterWrapper的export方法,只不过url变成了rpc协议,比如dubbo://。

Dubbo源码(二)服务暴露与引用

所以会走ProtocolFilterWrapper的buildInvokerChain。

这个方法会将特别常用的扩展点Filter,适配为Invoker,形成一个Invoker链。

Dubbo源码(二)服务暴露与引用

Invoker被Filter包装后,export最终调用到rpc协议的实现类,比如DubboProtocol#export。

DubboProtocol#export创建DubboExporter,包裹Invoker,缓存在内部的exporterMap中

exporterMap的key={分组}/{rpc服务}:{版本号}:{端口}。

Dubbo源码(二)服务暴露与引用

对于每个address(host+port),创建一个单例的ProtocolServer。

Dubbo源码(二)服务暴露与引用

通讯层的默认实现就是netty4.NettyTransporter,netty的相关配置项都来源于url,不再深入。

Dubbo源码(二)服务暴露与引用

注册rpc服务

RegistryProtocol#register注册rpc服务入口。

Dubbo源码(二)服务暴露与引用

RegistryFactory是通过setter注入的自适应扩展实现,取url的协议对应实现。

Dubbo源码(二)服务暴露与引用

目前所有RegistryFactory都继承自AbstractRegistryFactory

AbstractRegistryFactory#getRegistry:抽象类做缓存工作,子类实现createRegistry创建Registry实例。

Dubbo源码(二)服务暴露与引用

基本上所有注册中心都是利用三方jar实现,当protocol=zookeeper时ZookeeperRegistry实现如下。

因为zk的客户端有很多,通过切换ZookeeperTransporter(自适应扩展)可以接入不同的zk客户端。

目前只有apache curator实现,一般注册中心客户端中都涵盖心跳逻辑,这里也不例外,不深入分析。

Dubbo源码(二)服务暴露与引用

从RegistryFactory获取到Registry后,调用Registry#register注册。

基本所有Registry都继承自FailbackRegistry,FailbackRegistry实现register方法,实现重试。

Dubbo源码(二)服务暴露与引用

doRegister方法一般也是调用三方api,实现服务注册逻辑。

比如zk注册中心是创建"/dubbo/rpc服务全类名/providers/rpc协议url如dubbo"这个znode,并且是个临时节点。

Dubbo源码(二)服务暴露与引用

服务引用

下面用最简单的api方式,创建一个服务消费方。

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
DemoService service = reference.get(); // 核心
String message = service.sayHello("dubbo");
System.out.println(message);

核心方法是通过ReferenceConfig#get方法得到rpc服务的代理。

Dubbo源码(二)服务暴露与引用

本质上是ReferenceConfig在首次调用get方法时会创建一个rpc服务代理并缓存。

Dubbo源码(二)服务暴露与引用

ReferenceConfig#init方法初始化创建rpc服务代理,分为四步:

1)DubboBootstrap#init:环境初始化,和服务提供方一样,主要是初始化Environment得到kv配置

2)ReferenceConfig二次填充和校验

3)拼装map,为后续组装URL做铺垫

Dubbo源码(二)服务暴露与引用

4)createProxy:利用组装好的map创建rpc服务代理

public synchronized void init() {
    if (initialized) {
        return;
    }
    // 【step1】初始化 主要是Environment
    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }
    // 【step2】ReferenceConfig二次填充校验
    checkAndUpdateSubConfigs();
    checkStubAndLocal(interfaceClass);
    ConfigValidationUtils.checkMock(interfaceClass, this);

    // 【step3】将各种配置注入map,为后续创建URL做铺垫
    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, CONSUMER_SIDE);
    ReferenceConfigBase.appendRuntimeParameters(map);
    // ...
    // 【step4】创建rpc服务代理
    ref = createProxy(map);
    // ...
    initialized = true;
}

前三步和服务提供方非常类似,不再赘述,重点看createProxy方法。

ReferenceConfig#createProxy

1)执行自适应Protocol的refer方法,传入url协议为registry,走RegistryProtocol,返回一个Invoker;

2)执行自适应ProxyFactory的getProxy方法,传入Invoker,返回一个rpc服务代理对象;

private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
    // 【特性:本地调用】直接走injvm,忽略
} else {
    urls.clear();
    if (url != null && url.length() > 0) {
        // 【特性:直连提供者】用户指定url,不走注册中心,忽略
    } else {
        if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
            checkRegistry();
            // 根据ReferenceConfig配置,构造注册中心url,即registry://...
            List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
            for (URL u : us) {
                // 在registry://后面拼refer=map
                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
            }
        }
    }

    if (urls.size() == 1) {
        // 【重点】自适应Protocol执行refer方法
        // url = registry://...
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    } else {
        // 【特性:多注册中心】忽略
    }
    // 【关注】自适应ProxyFactory将invoker转换为rpc服务代理对象
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

在ProtocolFilterWrapper中,和provider一样,会两次进入refer方法,第一次由于协议是registry,直接透传到RegistryProtocol。

Dubbo源码(二)服务暴露与引用

RegistryProtocol#doRefer

1)构造RegistryDirectory,执行RegistryDirectory#subscribe方法订阅rpc服务;

2)执行Cluster自适应扩展点的join方法,将RegistryDirectory转换为一个Invoker;

Dubbo源码(二)服务暴露与引用

Directory/Invoker/Cluster

先来介绍一下Directory和Invoker和Cluster的关系。

Directory持有多个Invoker,运行时根据Invocation可以获取到其中部分Invoker。

Dubbo源码(二)服务暴露与引用

Directory持有的Invoker,实际上会是对应rpc协议Protocol通过refer方法返回的,比如DubboProtocol。

这里的Invoker和provider侧的不同,就是负责底层网络通讯

为什么对于一个rpc服务会有多个呢?因为每个rpc服务都可能有多个provider实例。

Protocol#refer:可以猜测,这里url就是注册中心上一个rpc服务不同address的provider的url。

Dubbo源码(二)服务暴露与引用

虽然Directory有多个Invoker,但是对于客户端来说,最终只能暴露一个rpc服务代理。

所以需要有一层Cluster来屏蔽底层多个Invoker的事实。

Dubbo源码(二)服务暴露与引用

粗略认为,Cluster负责负载均衡,Directory是内存注册表,Invoker是netty连接。

订阅(Directory)

RegistryDirectory#subscribe:订阅rpc服务,传入url是consumer://协议。

Dubbo源码(二)服务暴露与引用

这里直接看ZookeeperRegistry#doSubscribe实现。

这里根据入参url=consumer://xxxx,构造出监听zk的三个path,即/dubbo/{rpc服务}/{providers,configurators,routers}。

循环三个path注册children监听,并且将第一次获取的数据,通过ZookeeperRegistry#notify通知RegistryDirectory。

Dubbo源码(二)服务暴露与引用

我们只关注/dubbo/{rpc服务}/providers下的数据变更,其他两种先不管。

rpc提供者变更(Directory)

RegistryDirectory#notify接收到provider变更通知,刷新内部存储的invokers。

Dubbo源码(二)服务暴露与引用

RegistryDirectory#refreshInvoker:转换provider的url集合为Invoker,刷新Directory中缓存的invokers。

private volatile Map<String, Invoker<T>> urlInvokerMap;
private volatile List<Invoker<T>> invokers;
private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // case1 没有rpc服务提供者,invokerUrls只有一个empty://协议的url,更新invokers为空
        this.forbidden = true;
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        destroyAllInvokers();
    } else {
        // case2 rpc服务提供者存在,根据provider的url,刷新invokers
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
        // ...
        // 这里将provider的url转换为新Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        List<Invoker<T>> newInvokers = 
            Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // 新invokers存储到RegistryDirectory
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
        // 旧invokers销毁
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

RegistryDirectory#toInvokers

1)mergeUrl:对服务提供方的url和服务消费方的url进行一个合并

2)protocol#refer:自适应Protocol根据url创建Invoker

Dubbo源码(二)服务暴露与引用

mergeUrl内部比较复杂,举个例子是什么意思。

比如服务提供方暴露rpc服务,指定connections=5,对于dubbo协议代表provider和consumer之间建立5个长连接,从而注册到zk上的url就包含了connections=5。

ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
    // rpc服务定义
    service.setInterface(DemoService.class);
    service.setRef(new DemoServiceImpl());
    service.setConnections(5);
}

服务消费方引用rpc服务,指定connections=2,最终会以consumer方指定的connections为准,建立2个长连接。

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setConnections(2);

rpc服务Invoker(Invoker)

Directory调用自适应Protocol创建Invoker,传入url是dubbo协议。

DubboProtocol继承自AbstractProtocol,AbstractProtocol#refer将所有Invoker实现都包了一层AsyncToSyncInvoker,将异步调用转换为同步。

Dubbo源码(二)服务暴露与引用

DubboProtocol#protocolBindingRefer:根据url创建DubboInvoker,对于消费方来说这是最底层的Invoker了。

getClients根据url的connections配置,创建Netty客户端,默认情况下每个address(ip+port)只有一个长连接。

Dubbo源码(二)服务暴露与引用

ProtocolFilterWrapper包装扩展点,在DubboProtocol#refer返回DubboInvoker之后,和provider侧一样包了一层Filter(只不过激活扩展点分组为consumer),返回给Directory。

Dubbo源码(二)服务暴露与引用

至此Directory订阅流程结束,接下来进入Cluster#join阶段。

Join(Cluster)

回到RegistryProtocol#doRefer,此时Invoker已经注入到了Directory。

Cluster#join将Directory转换为Invoker。

Dubbo源码(二)服务暴露与引用

Cluster走自适应实现,默认是FailoverCluster。

几乎所有Cluster都继承AbstractCluster。

AbstractCluster在子类doJoin返回Invoker后,包了一层ClusterInterceptor扩展点,和Filter的做法类似。

Dubbo源码(二)服务暴露与引用

FailoverCluster直接new了一个FailoverClusterInvoker返回。

Dubbo源码(二)服务暴露与引用

其实在Cluster扩展点之外,还包了一层MockClusterWrapper。

MockClusterWrapper会把上述ClusterInterceptor包裹的FailoverClusterInvoker再包一层MockClusterInvoker,官方称为本地伪装特性。

Dubbo源码(二)服务暴露与引用

rpc服务代理创建

最终再次回到ReferenceConfig#createProxy,此时Protocol层已经完全处理完毕,返回了一个Invoker。

为了给用户代码使用,最终用ProxyFactory将Invoker转换为rpc服务代理。

Dubbo源码(二)服务暴露与引用

之前在provider侧已经看到过ProxyFactory的另一个方法,将rpc服务实现转换为Invoker;

而在consumer侧,是将Invoker转换为rpc服务代理。

Dubbo源码(二)服务暴露与引用

还是老规矩,我们不看默认javassist字节码实现代理,直接看JdkProxyFactory实现来理解getProxy方法。

JdkProxyFactory就是利用了jdk动态代理,将invoker放入了InvocationHandler实现中。

所以运行时在消费端的入口方法,就在InvokerInvocationHandler中,后续再看。

Dubbo源码(二)服务暴露与引用

此外,ProxyFactory也有包装扩展点。

StubProxyFactoryWrapper在底层ProxyFactory返回的代理对象上,可以允许包一层Stub,即官方称为本地存根的特性,这里不深入讨论。

Dubbo源码(二)服务暴露与引用

总结

从整体上来说,无论是rpc服务提供方还是消费方,整体都遵从这个流程:

1)准备Environment:利用2.7.5BootstrapAPI,从各种地方搜集kv配置,比如dubbo.properties、配置中心、系统变量;

2)二次填充/校验Config:利用第一步得到的kv配置,注入ServiceConfig/ReferenceConfig,其中还涉及到一些配置优先级的问题;

3)执行业务方法:对于provider来说,暴露rpc服务;对于consumer来说,引用rpc服务代理。他们都依赖于注册中心;

Dubbo源码(二)服务暴露与引用

从模型的角度来看:

provider是将rpc实现类层层包装为invoker嵌入dubbo内部,在运行时等待rpc请求到来,经过层层包装的invoker就能来到rpc实现类;

consumer是将注册中心中的providerUrl层层包装为invoker嵌入dubbo内部,通过javassist代理创建rpc代理服务,给到用户;

无论是provider还是consumer,可以预见到invoker会在运行阶段贯穿始终。

provider

rpc服务暴露流程:

1)ProxyFactory#getInvoker:创建Invoker代理,目标对象是rpc服务实现类;

2)Protocol#export:基于rpc协议暴露rpc服务,比如dubbo协议,底层默认开启NettyServer接收客户端请求;

3)Protocol#export:基于registry协议注册rpc服务到注册中心,比如zookeeper协议,创建临时znode:/dubbo/rpc服务全类名/providers/提供者url;

从模型的角度来说,provider将rpc服务实现包装为Invoker,缓存在一个内部map中:

1)ProxyFactory将rpc服务实现包装为Invoker代理对象,默认采用javassist动态生成字节码方式(JdkProxyFactory更好理解);

2)ProtocolFilterWrapper将上述Invoker通过Filter适配包装,Filter(Invoker代理(rpc服务实现));

3)DubboProtocol将上述Invoker通过DubboExporter包装,Exporter(Filter(Invoker代理(rpc服务实现))),并缓存到内部map,map的key={分组}/{rpc服务}:{版本号}:{端口};

至此在运行时通过Protocol缓存map的key能找到Exporter,最终能调用rpc服务实现

Dubbo源码(二)服务暴露与引用

consumer

rpc服务引用流程:

1)RegistryDirectory#subscribe:订阅rpc服务,取决于Registry实现,比如zk,监听/dubbo/rpc服务/providers父目录变更;

2)RegistryDirectory#notify:因为首次订阅得到providerUrls,这里同步收到通知,调用Protocol#refer创建invokers,注入RegistryDirectory内部map;

3)Protocol#refer:传入url协议为rpc协议,如dubbo,创建DubboInvoker,同一address(ip+port)创建一个netty长连接(所有rpc服务共享)。DubboInvoker同样会被Filter包装,只是Filter的group是consumer;

4)Cluster#join:将RegistryDirectory转换为Invoker,默认是FailoverClusterInvoker。包装Cluster在FailoverClusterInvoker外部包了:ClusterInterceptor适配Invoker和MockClusterInvoker(本地伪装);

5)ProxyFactory#getProxy:将Cluster#join返回的Invoker转换为rpc服务代理对象,默认采用javassist动态生成字节码方式(JdkProxyFactory更好理解),StubProxyFactoryWrapper包装扩展支持将代理对象二次包装为Stub(本地存根);

6)将最终生成的代理对象,缓存到ReferenceConfig;

从模型角度来说,大致如下:

Dubbo源码(二)服务暴露与引用

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。

转载自:https://juejin.cn/post/7210945608877998135
评论
请登录