likes
comments
collection
share

【Dubbo】服务暴露机制

作者站长头像
站长
· 阅读数 4

概述

【Dubbo】服务暴露机制

首先,看一下服务暴露的整体流程,如上图所示。Dubbo框架将服务暴露分为两大部分,第一步将持有的服务实例通过代理转换成Invoker,第二步会把Invoker通过具体的协议(比如Dubbo )转换成Exporter。接下来会具体分析一下每个步骤的流程以及源码。

源码解析

ServiceConfig类

ServiceBean类

在看ServiceConfig之前,先来看一下其子类ServiceBeanonApplicationEvent方法,该方法监听了Spring容器的上下文刷新事件,当收到该事件时会触发服务的暴露工作,具体代码如下所示:

@Override
//添加了上下文刷新监听,用于暴露服务使用。
public void onApplicationEvent(ContextRefreshedEvent event) {
    //是否延迟导出 && 是否已经导出 && 是不是已经取消导出
    if (isDelay() && !isExported() && !isUnexported()) {
        //省略日志输出代码
        export();
    }
}

至于ServiceBean的话,是通过配置或者注解解析出来的Bean,本文不做解析。

export方法

接下来看一下ServiceConfig#export方法,该方法主要对exportdelay配置进行检查,如果export=false就不做暴露,如果delay=true则进行延迟暴露。

public synchronized void export() {
    if (provider != null) {
        //获取export和delay配置。
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }
    //延迟暴露
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        //暴露服务
        doExport();
    }
}

接下来看一下ServiceConfig#doExport方法,该方法主要是进行配置检查,设置缺省值或者抛出异常等。配置检查部分本文不做解析,主要看一下该方法中调用的ServiceConfig#doExportUrls方法。

doExportUrls方法

ServiceConfig#doExportUrls方法会加载注册中心的URL,然后遍历协议列表,在每个协议下面都导出服务。代码如下所示:

//多协议多注册中心导出。
private void doExportUrls() {
    //加载注册中心URL
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        // 遍历 protocols,并在每个协议下导出服务
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

loadRegistries方法

loadRegistries方法主要包含以下逻辑:

  1. 检测是否存在注册中心配置类,不存在则抛出异常
  2. 遍历注册中心配置类,构建注册中心URL列表
  3. 判断是否加入到registryList

代码如下所示:

protected List<URL> loadRegistries(boolean provider) {
    //检查是否存在注册中心配置类,不存在则抛出异常。
    checkRegistry();
    List<URL> registryList = new ArrayList<URL>();
    if (registries != null && !registries.isEmpty()) {
        for (RegistryConfig config : registries) {
            String address = config.getAddress();
            //默认值
            if (address == null || address.length() == 0) {
                address = Constants.ANYHOST_VALUE;
            }
            //配置值
            String sysaddress = System.getProperty("dubbo.registry.address");
            if (sysaddress != null && sysaddress.length() > 0) {
                address = sysaddress;
            }
            if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
            		//将构建URL所需的参数加入到map里。
                Map<String, String> map = new HashMap<String, String>();
                appendParameters(map, application);
                appendParameters(map, config);
                map.put("path", RegistryService.class.getName());
                map.put("dubbo", Version.getProtocolVersion());
                map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                if (ConfigUtils.getPid() > 0) {
                    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                }
                if (!map.containsKey("protocol")) {
                    if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                        map.put("protocol", "remote");
                    } else {
                        map.put("protocol", "dubbo");
                    }
                }

                //解析得到URL列表,address可能包含多个注册中心ip
                List<URL> urls = UrlUtils.parseURLs(address, map);
                for (URL url : urls) {
                    url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                    //将URL协议头设置为registry
                    url = url.setProtocol(Constants.REGISTRY_PROTOCOL);

                    //(服务提供者 && 注册) || (非服务提供者 && 订阅)
                    if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                            || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}

doExportUrlsFor1Protocol方法

doExportUrlsFor1Protocol方法主要包含以下逻辑:

  1. 组装URL
  2. 导出服务

这里主要分析一下导出服务的代码,导出服务部分代码主要包含以下逻辑:

  1. 如果scope!=remote,则导出服务到本地
  2. 如果scope!=local,则导出服务到远程
    1. 遍历注册中心URL,依次导出
  3. 其实一般来说scope是为空的,上面两个都会执行

代码如下所示:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //省略无关代码
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // scope != remote,导出到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // scope != local,导出到远程
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            //省略日志输出代码
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));

                    //加载监视器链接,将监视器链接作为参数添加到 url 中
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    //省略日志输出代码

                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }

                    //生成invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    //DelegateProviderMetaDataInvoker用于持有Invoker和ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    //导出服务,将exporter加入到exporters
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
            // 不存在注册中心,仅导出服务
            else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}

exportLocal方法

exportLocal方法用于导出本地服务,主要包含以下逻辑:

  1. 判断当前协议是否已经是injvm,如果是不作处理
  2. 设置URL的协议、hostport为本地
  3. 创建invoker,导出服务
private void exportLocal(URL url) {
    //判断当前协议是否为injvm,如果是就无需再次导出了。
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
                //将协议设置成injvm,host设置为本地,port=0
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST)
                .setPort(0);
        StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
        //创建invoker,导出服务
        //这里的protocol是一个自适应扩展类,会根据url的协议去调用对应扩展类的export方法
      	//此处调用的是InjvmProtocol#export方法
        Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

getInvoker方法

getInvokerProxyFactor接口提供的,默认的实现类是JavassistProxyFactoryJavassistProxyFactory#getInvoker方法主要包含以下逻辑:

  1. 创建包装类
  2. 创建invoker,对invoker的方法调用最终会调用到包装类的invokeMethod

代码如下所示:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    //创建包装类
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    //创建匿名invoker类,实现doInvoke方法,doInvoke方法是一个抽象法法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

getWrapper方法主要逻辑是对传入的类进行解析,生成invokeMethod代码以及一些其他代码,然后通过Javassist生成class对象并通过反射创建实例,具体代码不做解析。

Protocol

RegistryProtocol#export

RegistryProtocol#export方法主要包含以下逻辑:

  1. 委托具体协议(Dubbo)进行服务暴露,创建NettyServer监听端口和保存服务实例。
  2. 创建注册中心对象,与注册中心创建TCP连接。
  3. 注册服务元数据到注册中心。
  4. 订阅configurators节点,监听服务动态属性变更事件。
  5. 服务销毁收尾工作,比如关闭端口、反注册服务信息等

代码如下所示:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //本地暴露服务,这个方法会取出export参数的值作为url,并根据url的协议调用对应的protocol(比如dubbo)进行export
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    //获取注册中心URL,以zookeeper注册中心为例,得到的示例 URL 如下:
    //zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.18.39.113%3A20880%2Ftop.fuyuaaa.api.TestService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dtop.fuyuaaa.api.TestService%26bind.ip%3D172.18.39.113%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dtop.fuyuaaa.api.TestService%26methods%3Ddemo%26pid%3D72080%26qos.port%3D22222%26revision%3D1.0%26side%3Dprovider%26timestamp%3D1599729148679%26transporter%3Dnetty%26version%3D1.0&pid=72080&qos.port=22222&timestamp=1599729148633
    URL registryUrl = getRegistryUrl(originInvoker);

    //根据URL加载Registry实现类,比如 ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);

    //获取已注册的服务提供者URL,比如:
    //dubbo://172.18.39.113:20880/top.fuyuaaa.api.TestService?anyhost=true&application=demo-provider&bean.name=top.fuyuaaa.api.TestService&dubbo=2.0.2&generic=false&interface=top.fuyuaaa.api.TestService&methods=demo&pid=72080&revision=1.0&side=provider&timestamp=1599729148679&transporter=netty&version=1.0
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    //获取 register 参数
    boolean register = registeredProviderUrl.getParameter("register", true);

    //向服务提供者与消费者注册表中注册服务提供者
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    if (register) {
        //向注册中心注册服务
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    //获取订阅 URL,比如:
    //provider://172.18.39.113:20880/top.fuyuaaa.api.TestService?anyhost=true&application=demo-provider&bean.name=top.fuyuaaa.api.TestService&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=top.fuyuaaa.api.TestService&methods=demo&pid=72080&revision=1.0&side=provider&timestamp=1599729148679&transporter=netty&version=1.0
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    //创建监听器,监听服务接口下configurators节点,用于处理动态配置,比如dubbo-admin对集群的服务治理。
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    //向注册中心进行订阅override数据
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //创建并返回 DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

DubboProtocol#export

DubboProtocol#export方法主要包含以下逻辑:

  1. 缓存servicekeyexporter
  2. 创建服务器

代码如下所示:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    //缓存key和对应的export
    //🌰:top.fuyuaaa.api.TestService:1.0:20880
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //省略本地存根相关代码
    
    //创建服务器实例
    openServer(url);
    //优化序列化
    optimizeSerialization(url);
    return exporter;
}

private void openServer(URL url) {
    // find server.
    // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // 服务器已创建,则根据 url 中的配置重置服务器
            server.reset(url);
        }
    }
}

关于服务注册以及创建netty服务器具体流程不做解析。

流程图

【Dubbo】服务暴露机制

参考

Dubbo官网

《深入理解Apache Dubbo与实战》

转载自:https://juejin.cn/post/6870798222388920333
评论
请登录