Dubbo3源码篇1-服务发现(本地和远程暴露)源码分析
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
1. Dubbo与Spring整合
简单看下
DubboBeanDefinitionParser.parse()
方法:
/**
* @param element 当前要解析的标签
* @param parserContext 解析上下文,其中包含了当前配置文件中所有其它标签的解析信息
* @param beanClass 当前标签解析出的内容要封装的类,其中就是存放的从标签中读出的属性。读到了什么就记录下什么
* @param registered 解析出的标签是否需要注解到注册中心
* @return 解析对象。将读取出的数据最终要构建出一个“逻辑”上的对象。例如,构建出一个“注册中心”对象、“协议”对象等
*/
@SuppressWarnings("unchecked")
private static RootBeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean registered) {
// ----------------------- 1 创建并初始化解析对象 ---------------
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
// 指定该bean不进行延迟初始化
beanDefinition.setLazyInit(false);
// ----------------------- 2 解决id问题: 为空与重复问题 ---------------
// config id
// 获取当前解析标签的id属性
String configId = resolveAttribute(element, "id", parserContext);
// 若id不空,则写入到beanDefinition
if (StringUtils.isNotEmpty(configId)) {
beanDefinition.getPropertyValues().addPropertyValue("id", configId);
}
// get id from name
// 若id属性为空,则获取name属性赋值给id
if (StringUtils.isEmpty(configId)) {
configId = resolveAttribute(element, "name", parserContext);
}
// 此时若id不空,则处理{}点位符
if (StringUtils.isNotEmpty(configId)) {
configId = resolvePlaceholders(configId, parserContext);
}
// bean名称取id属性值
String beanName = configId;
// 若此时beanName为空,即id属性仍为空
if (StringUtils.isEmpty(beanName)) {
// generate bean name
// 取beanClass的类名
String prefix = beanClass.getName();
int counter = 0;
// beanName为类名与数字的拼接
beanName = prefix + "#" + counter;
// 查看该beanName是否重复。若重复,则让数字增一
while (parserContext.getRegistry().containsBeanDefinition(beanName)) {
beanName = prefix + "#" + (counter++);
}
}
// 此时beanName一定不空,且不重复
beanDefinition.setAttribute(BEAN_NAME, beanName);
// ----------------------- 3 对特殊标签的处理 ---------------
// 对<dubbo:protocol/>标签的处理
if (ProtocolConfig.class.equals(beanClass)) {
// for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
// BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
// PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
// if (property != null) {
// Object value = property.getValue();
// if (value instanceof ProtocolConfig && beanName.equals(((ProtocolConfig) value).getName())) {
// definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(beanName));
// }
// }
// }
// 对<dubbo:service/>标签的处理
} else if (ServiceBean.class.equals(beanClass)) {
String className = resolveAttribute(element, "class", parserContext);
if (StringUtils.isNotEmpty(className)) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition, parserContext);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, beanName + "Impl"));
}
}
// ----------------------- 4 对普通标签的普适性处理 ---------------
Map<String, Class> beanPropTypeMap = beanPropsCache.get(beanClass.getName());
if (beanPropTypeMap == null) {
beanPropTypeMap = new HashMap<>();
beanPropsCache.put(beanClass.getName(), beanPropTypeMap);
if (ReferenceBean.class.equals(beanClass)) {
//extract bean props from ReferenceConfig
getPropertyMap(ReferenceConfig.class, beanPropTypeMap);
} else {
getPropertyMap(beanClass, beanPropTypeMap);
}
}
...
return beanDefinition;
}
也就是说 dubbo的配置文件每一个标签对应一个类,每个标签可以配置对应类的所有属性值:
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class));
...
}
2. 核心接口
2.1 Invocation
其封装了远程调用的具体信息。
2.2 Invoker
其是提供者 provider 的代理对象
,在代码中就代表提供者。特别是在消费者进行远程调
用时,其通过服务路由、负载均衡、集群容错等机制要查找的就是 Invoker。找到了其需要
的 Invoker 实例就可以进行远程调用了
2.3 Exporter
服务暴露对象。其包含一个很重要的方法 getInvoker(),用于获取当前服务暴露实例所 包含的远程调用实例 Invoker,即可以进行的远程调用。
而 unexport()方法会使服务不进行服务暴露。
2.4 Directory
Directory 中包含一个很重要的方法 list(),其返回结果为一个 List<Invoker>
。其实简单来
说,可以将 Directory 理解为一个动态的 Invoker 列表。
3. 启动入口DubboBootstrapApplicationListener
// 当Spring容器创建时会触发该方法的执行
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (isOriginalEventSource(event)) {
if (event instanceof DubboAnnotationInitedEvent) {
// This event will be notified at AbstractApplicationContext.registerListeners(),
// init dubbo config beans before spring singleton beans
applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
// All infrastructure config beans are loaded, initialize dubbo here
DubboBootstrap.getInstance().initialize();
} else if (event instanceof ApplicationContextEvent) {
this.onApplicationContextEvent((ApplicationContextEvent) event);
}
}
}
private void onApplicationContextEvent(ApplicationContextEvent event) {
if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) {
DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext();
}
// 容器刷新事件(容器创建、刷新会引发该事件)
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
// 容器关闭事件
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextClosedEvent(ContextClosedEvent event) {
if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
// will call dubboBootstrap.stop() through shutdown callback.
DubboShutdownHook.getDubboShutdownHook().run();
}
}
接下来执行DubboBootstrap.start()方法:
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
startup.set(false);
shutdown.set(false);
awaited.set(false);
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
// 服务暴露
exportServices();
// If register consumer instance or has exported services
if (isRegisterConsumerInstance() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
// 3. Register the local ServiceInstance if required
registerServiceInstance();
}
// 服务引用
referServices();
...
}
return this;
}
服务暴露和服务引用的入口都是在这里。
4. Dubbo服务提供方启动的流程图
如下:
4. ServiceConfig类
我们先来看看ServiceConfig类,ServiceConfig可以说是每暴露一个接口就会有一个ServiceConfig对象,比如说我现在有2个接口
然后有2个对应的实现类,那么我们在服务暴露的时候就会有2个ServiceConfig实例,其实我们看它的属性的时候也能猜到
我这里只是截了3个成员,第一个是接口名,第二个是接口的class对象,第三个就是接口的具体实现类。
接下来我们具体看看ServiceConfig的成员:
//记录随机端口的
private static final Map<String, Integer> RANDOM_PORT_MAP = new HashMap<String, Integer>();
// 延时暴露 executor
private static final ScheduledExecutorService DELAY_EXPORT_EXECUTOR = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
// 自适应Protocol ,这个就跟变色龙似的,能够根据具体的参数值变成不同的实现
private static final Protocol PROTOCOL = ExtensionLoader
.getExtensionLoader(Protocol.class) // 获取SPI接口Protocol的extensionLoader实例
.getAdaptiveExtension(); // 使用extensionLoader实例获取Protocol的自适应类实例
//代理工厂的自适应
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
// 是否已经暴露
private transient volatile boolean exported;
//是否需要暴露
private transient volatile boolean unexported;
private DubboBootstrap bootstrap;
private transient volatile AtomicBoolean initialized = new AtomicBoolean(false);
// exporters
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
//接口class
protected Class<?> interfaceClass;
// 具体实现类
protected T ref;
protected String path;
// 关于provider的配置
protected ProviderConfig provider;
protected String providerIds;
// 范化
protected volatile String generic;
继续 DubboBootstrap.exportService():
private void exportServices() {
// 遍历当前配置文件中的所有<dubbo:service/>标签
for (ServiceConfigBase sc : configManager.getServices()) {
// TODO, compatible with ServiceConfig.export()
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
serviceConfig.setBootstrap(this);
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
// 判断是否是异步暴露
if (sc.shouldExportAsync()) {
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error("export async catch error : " + t.getMessage(), t);
}
}, executor);
asyncExportingFutures.add(future);
} else { // 处理同步暴露情况
if (!sc.isExported()) { // 若尚未暴露
sc.export(); // 服务暴露
exportedServices.add(sc);
}
}
}
}
public Boolean shouldExportAsync() {
// 获取<dubbo:service/>中的export-async属性
Boolean shouldExportAsync = getExportAsync();
if (shouldExportAsync == null) {
// 获取<dubbo:provider/>标签中的export-async属性
shouldExportAsync = provider != null && provider.getExportAsync() != null && provider.getExportAsync();
}
return shouldExportAsync;
}
接下来ServiceConfig.export():
public synchronized void export() {
// 若<dubbo:service/>的export属性为true,且当前服务尚未暴露
if (this.shouldExport() && !this.exported) {
this.init();
// check bootstrap state
if (!bootstrap.isInitialized()) {
throw new IllegalStateException("DubboBootstrap is not initialized");
}
if (!this.isRefreshed()) {
this.refresh();
}
if (!shouldExport()) {
return;
}
// 判断是否延迟暴露
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 服务暴露
doExport();
}
if (this.bootstrap.getTakeoverMode() == BootstrapTakeoverMode.AUTO) {
this.bootstrap.start();
}
}
}
我们可以看到这个方法主要做的工作是服务暴露延迟的,如果delay不是null && delay>0 然后给ScheduledExecutorService然后 delay ms后再进行服务暴露,我们要想使用延迟暴露功能,
-
可以在@Service注解中添加delay 属性。
@Service(delay =1000 )
-
也可以在xml中添加
<dubbo:provider delay="100"/> <dubbo:service interface="xxx.xxx.xxx" delay="1000"></dubbo:service>
我们再接着往下看不延迟暴露走doExport()方法,
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
// 若<dubbo:servcie/>的path属性为空,则取interface属性值
// URL的格式 protocol://ip:port/path?a=b&b=c&...
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 假设有3个注册中心,2个服务暴露协议
// 为每个服务暴露协议在每个注册中心中进行暴露
doExportUrls();
exported();
}
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取所有注册中心的【标准化地址URL】与【兼容性地址URL】
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 遍历所有服务暴露协议(<dubbo:protocol/>)
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// 使用当前遍历的服务暴露协议与所有注册中心配对进行服务暴露
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
我们看一下loadRegistries()的代码:
public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
// 获取<dubbo:application/>标签
ApplicationConfig application = interfaceConfig.getApplication();
// 获取<dubbo:registry/>标签
List<RegistryConfig> registries = interfaceConfig.getRegistries();
if (CollectionUtils.isNotEmpty(registries)) {
// 遍历所有<dubbo:registry/>标签
for (RegistryConfig config : registries) {
// 获取<dubbo:registry/>标签的address属性
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
address = ANYHOST_VALUE;
}
// 只要address不是N/A,即不是不可用
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
// 创建并初始化一个map,这个map中的值为<dubbo:registry/>标签及相当标签中的属性值
Map<String, String> map = new HashMap<String, String>();
// 将<dubbo:application/>标签属性写入map
AbstractConfig.appendParameters(map, application);
// 将<dubbo:registry/>标签属性写入map
AbstractConfig.appendParameters(map, config);
map.put(PATH_KEY, RegistryService.class.getName());
AbstractInterfaceConfig.appendRuntimeParameters(map);
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 构建注册中心标准URL
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = URLBuilder.from(url)
// 向URL中添加registry属性,例如registry=zookeeper
.addParameter(REGISTRY_KEY, url.getProtocol())
// 将URL的协议修改为registry
.setProtocol(extractRegistryType(url))
.build();
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
// 为每个标准URL生成一个兼容URL
return genCompatibleRegistries(registryList, provider);
}
我们可以看到先走了loadRegistries(true); 这个方法,获取到了一个URL集合,其实这里就是获取注册中心列表,一个URL就是一个注册中心,会生成一个标准化的URL和兼容URL,registry开头的为标准URL:
接着就是循环暴露doExportUrlsFor1Protocol(protocolConfig, registryURLs);
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 构建服务暴露URL要使用的map
Map<String, String> map = buildAttributes(protocolConfig);
//init serviceMetadata attachments
// 元数据注册
serviceMetadata.getAttachments().putAll(map);
// 构建服务暴露URL
URL url = buildUrl(protocolConfig, registryURLs, map);
// 服务暴露
exportUrl(url, registryURLs);
}
- 构建服务暴露URL要使用的map:这个方法首先判断ProtocolConfig的协议,如果没有默认设置成dubbo,再往下就是设置参数,比如说side=provider,dubbo=2.2.0,timestamp,pid等等,然后把一些config中的配置塞到map中,接着就是遍历处理MethodConfig。再接着就是判断是不是范化调用,如果是就把范化的信息扔到map中,设置methods=*,如果不是范化调用,就找到你所有的method,然后将methods=你所有method名拼接起来。
- 注册元数据
- 构建服务暴露URL:
dubbo://192.168.124.7:20881/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&bind.ip=192.168.124.7&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&metadata-type=remote&methods=hello&pid=25778&release=&revision=1.0.0&side=provider×tamp=1656114630120&version=1.0.0
接下里,我们进入服务暴露的方法:
private void exportUrl(URL url, List<URL> registryURLs) {
// 获取<dubbo:service/>的scope属性
String scope = url.getParameter(SCOPE_KEY);
// 若scope的值不等于none,则进行暴露
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 若scope的值不等于remote,则进行本地暴露
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 本地暴露
exportLocal(url);
}
// 若scope的值不等于local,则进行远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 远程暴露
url = exportRemote(url, registryURLs);
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
在这个分为本地暴露和远程暴露,我们分别都看一下
5. 本地暴露
看一下exportLocal()方法:
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) // 将URL的protocol设置为injvm
.setHost(LOCALHOST_VALUE)
.setPort(0) // URL没有端口号
.build();
// 本地暴露
doExportUrl(local, false);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
首先把URL中的protocol变成injvm,host是127.0.0.1,port是0 。其实这里是新生成了一个URL,把之前URL里面的配置搬过来了。接下来:
private void doExportUrl(URL url, boolean withMetaData) {
// 构建出invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
// 远程暴露
Exporter<?> exporter = PROTOCOL.export(invoker);
exporters.add(exporter);
}
我们先来看下proxyFactory.getInvoker(ref, (Class) interfaceClass, local)这个方法。proxyFactory是我们一个类成员:
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
获取了一个自适应的扩展实现类。我们看下这个自适应是根据哪个key来找实现类的。
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
咱们用的getInvoker方法,然后会根据proxy这个属性去咱们的URL中找对应的值,我们现在没有刻意设置这个proxy属性的话,就会走默认,也就是@SPI(“javassist”)中的javassist实现类。这块知识数据dubbo spi里面的。 我们来看看javassist实现类,也就是JavassistProxyFactory这个类。
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// getProxy() 创建代理类proxy的class
// newInstance() 创建这个class的实例,即代理对象
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
//生成一个invoker的包装类
/**
* @param proxy 接口实现类
* @param type 接口类型class
* @param url URL
* @param <T> 接口类型
* @return Invoker
*/
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$' 不能解析类名中带$的
// 这里是如果,接口实现类中有$符号,就是用接口类型,没有$符号,就用实现类的类型
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
/**
* 进行调用
* @param proxy 实现类
* @param methodName 方法名
* @param parameterTypes 参数类型
* @param arguments 参数
* @return
* @throws Throwable
*/
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
首先第一行,Wrapper.getWrapper,这个会帮你生成一个Wrapper,其实这个Wrapper会根据你提供的这个类型生成一个获取你这个类中成员变量的方法,设置成员变量的方法,执行你这个类中方法的方法。 我们可以来看下生成的啥样子:
public class Wrapper$1 {
public static String[] pns;// 字段名
public static Map pts;//<字段名,字段类型>
public static String[] mns;//方法名
public static String[] dmns;//自己方法的名字
public static Class[] mts;//方法参数类型
public String[] getPropertyNames(){ return pns; }
public boolean hasProperty(String n){ return pts.containsKey(n); }
public Class getPropertyType(String n){ return (Class)pts.get(n); }
public String[] getMethodNames(){ return mns; }
public String[] getDeclaredMethodNames(){ return dmns; }
public void setPropertyValue(Object o, String n, Object v){
com.xuzhaocai.dubbo.provider.IHelloProviderService w;
try{
w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
}catch(Throwable e) {
throw new IllegalArgumentException(e);
}
if( $2.equals("字段名")){
w."字段名"= $3;
return ;
}
}
public Object getPropertyValue(Object o, String n){
com.xuzhaocai.dubbo.provider.IHelloProviderService w;
try{
w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
}catch(Throwable e){
throw new IllegalArgumentException(e);
}
if( $2.equals("字段名")){
return ($w) w."字段名";
}
return null;
}
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws InvocationTargetException{
com.xuzhaocai.dubbo.provider.IHelloProviderService w;
try{
w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
}catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if("方法名".equals($2) && 方法参数个数 == $3.length && $3[1].getName().equals("方法第几个参数的name")){
w.方法名(参数);
}
if("方法名".equals($2) && 方法参数个数 == $3.length && $3[1].getName().equals("方法第几个参数的name")){
w.方法名(参数);
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new NoSuchMethodException("Not found method "+$2+" in class 你传进来那个实现类");
}
}
就是针对你这个类生成了3个方法:
- setPropertyValue(Object o, String n, Object v) 往o中设置属性
- Object getPropertyValue(Object o, String n) 从o中取属性值
Object invokeMethod(Object o, String n, Class[] p, Object[] v)
执行o的某个方法。
我们接着往下看:
return new AbstractProxyInvoker<T>(proxy, type, url) {
/**
* 进行调用
* @param proxy 实现类
* @param methodName 方法名
* @param parameterTypes 参数类型们
* @param arguments 参数
* @return
* @throws Throwable
*/
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
我们来看下这个AbstractProxyInvoker这个抽象类。
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);
private final T proxy;
private final Class<T> type;
private final URL url;
...
/**
* 调用
* @param invocation 调用实体
* @return 结果实体
* @throws RpcException
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
...
//实际调用子类实现
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
@Override
public String toString() {
return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
}
}
可以看出AbstractProxyInvoker这个抽象类非常简单,构造中对T proxy 接口实现类,具体提供服务, Class type 接口类型, URL url ,这三个参数进行检验,然后存储。
实现接口Invoker的invoke(Invocation invocation)方法,实际上还是子类实现doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)
方法。
现在再看JavassistProxyFactory类就清楚了,最终走的是wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
上面这部分是proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
,接着我们看下
protocol.export():
Exporter<?> exporter = PROTOCOL.export(invoker);
这里这个protocol也是ServiceConfig的类成员,获取自适应实现类。我们看下Protocol接口:
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
因为我们URL这里protocol=injvm,所以回去找对应的实现,也就是InjvmProtocol这个类。
/**
* InjvmProtocol
*/
public class InjvmProtocol extends AbstractProtocol implements Protocol {
public static final String NAME = Constants.LOCAL_PROTOCOL;
public static final int DEFAULT_PORT = 0;
private static InjvmProtocol INSTANCE;
public InjvmProtocol() {
INSTANCE = this;
}
public static InjvmProtocol getInjvmProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(InjvmProtocol.NAME); // load
}
return INSTANCE;
}
static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
Exporter<?> result = null;
if (!key.getServiceKey().contains("*")) {
result = map.get(key.getServiceKey());
} else {
if (map != null && !map.isEmpty()) {
for (Exporter<?> exporter : map.values()) {
if (UrlUtils.isServiceKeyMatch(key, exporter.getInvoker().getUrl())) {
result = exporter;
break;
}
}
}
}
if (result == null) {
return null;
} else if (ProtocolUtils.isGeneric(
result.getInvoker().getUrl().getParameter(Constants.GENERIC_KEY))) {
return null;
} else {
return result;
}
}
@Override
public int getDefaultPort() {
return DEFAULT_PORT;
}
/**
* 服务暴露
* @param invoker Service invoker
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
...
我把这次无关的方法先去掉了,看下 export方法,然后new InjvmExporter类
/**
* InjvmExporter
*/
class InjvmExporter<T> extends AbstractExporter<T> {
//service key
private final String key;
private final Map<String, Exporter<?>> exporterMap;
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}
最终是将key=接口全类名,value=this(也就是InjvmExporter对象)put到exporterMap中了。
再回到ServiceConfig的exportLocal方法中。还有最后一句exporters.add(exporter);这里是将上面生成的InjvmExporter对象缓存了起来。
到这里我们这个服务本地暴露(injvm)就解析完成了。
6. 远程暴露
我们知道服务暴露分为本地(injvm)与远程(remote)两种方式,接下来本篇将解析dubbo的远程暴露。
// 远程暴露
url = exportRemote(url, registryURLs);
private URL exportRemote(URL url, List<URL> registryURLs) {
// 处理有注册中心的情况
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历所有注册中心URL
for (URL registryURL : registryURLs) {
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
}
}
// 远程暴露(仅跟踪registryURL地址为 registry://... 的地址)
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
}
} else { // 处理没有注册中心的情况
if (MetadataService.class.getName().equals(url.getServiceInterface())) {
MetadataUtils.saveMetadataURL(url);
}
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 远程暴露(与上面的暴露相比,仅没有向注册中心注册)
doExportUrl(url, true);
}
return url;
}
如果有注册中心,然后遍历注册中心,首先是获取监控中心,将监控中心添加到url中,然后就是将proxy_key属性放到registryUrl中,其实这个proxy_key 的值是可以设置的,就是告诉dubbo我用什么来进行生成代理,这个对应的就是dubbo spi 自适应特性。doExportUrl(url, true):
private void doExportUrl(URL url, boolean withMetaData) {
// 构建出invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
// 远程暴露
Exporter<?> exporter = PROTOCOL.export(invoker);
exporters.add(exporter);
}
这个跟本地调用是同一个方法,只是参数不同,远程暴露第二个参数为true。
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
,这行就是包装了一下这个invoker,主要是把原始的配置信息跟invoker绑在一块了。可以看下DelegateProviderMetaDataInvoker 这个类。
public class DelegateProviderMetaDataInvoker<T> implements Invoker {
protected final Invoker<T> invoker;
private ServiceConfig metadata;
public DelegateProviderMetaDataInvoker(Invoker<T> invoker,ServiceConfig metadata) {
this.invoker = invoker;
this.metadata = metadata;
}
...
public ServiceConfig getMetadata() {
return metadata;
}
}
我们可以看下这个protocol,这个是自适应扩展类,我们从url中获取protocol的值,咱们invoker里面包的url是registryURL,然后对应的protocol也就是注册中心的protocol,RegistryProtocol这个类。但是在创建扩展实现类的时候dubbo会给我们setter注入与wrapper包装,所以我们拿到的RegistryProtocol最终样子是这样的:
Qos —> Filter —> Listener ----> RegistryProtocol.
我自己debug,么有找到QosProtocolWrapper这个包装类,疑问?哪位大佬可以解答一下?
接下来我们先看下QosProtocolWrapper,这里我们只挑与本篇有关的内容
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//判断是registry
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
// 启动Qos服务器
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
判断是registry协议的话,就启动Qos服务器,然后调用下一个export 方法,也就是ProtocolFilterWrapper的export方法。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
/// registry protocol 就到下一个wapper 包装对象中就可以了
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
如果是registry协议的话就可以进入下一个了,然后不是registry协议就需要绑定一堆filter了,这个我们在说dubbo协议的时候会讲到,这里我们直接进入下一个类ProtocolListenerWrapper的export方法。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// registry 是否是注册中心
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// dubbo export dubbo --->暴露服务 生成exporter
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));// exporter listener
}
这里也是判断了一下是否是registry协议,如果是的话就直接进入下一个RegistryProtocol。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心URL
URL registryUrl = getRegistryUrl(originInvoker);
// 获取服务暴露URL
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 服务暴露,即将服务的exporter写入到缓存map
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry 获取注册中心实例
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册到注册中心
register(registry, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
首先获取注册中心和服务暴露URL,我们来看下doLocalExport方法:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
- 首先获取一个cacheKey,这个cacheKey其实就是我们在服务暴露之前塞进去的一个export属性值(这里是把dynamic,enabled这两个属性移除掉了):
private String getCacheKey(final Invoker<?> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); return key; } private URL getProviderUrl(final Invoker<?> originInvoker) { // 获取URL中的export属性值,即要暴露的服务的URL Object providerURL = originInvoker.getUrl().getAttribute(EXPORT_KEY); if (!(providerURL instanceof URL)) { throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl().getAddress()); } return (URL) providerURL; }
- 接下来就是根据cacheKey去bounds里面找,如果没有找到,就说明之前没有暴露过,我们可以看下bounds这个成员:
private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
- 如果没有暴露过的话,就将invoker封装起来,我们可以看下InvokerDelegete这个静态类与它的父类InvokerWrapper类
// InvokerDelegete 委托类 将provider 的url 进行缓存 public static class InvokerDelegate<T> extends InvokerWrapper<T> { private final Invoker<T> invoker; /** * @param invoker * @param url invoker.getUrl return this value */ public InvokerDelegate(Invoker<T> invoker, URL url) { super(invoker, url); this.invoker = invoker; } // 获取invoker public Invoker<T> getInvoker() { // 如果是 委托类的话,就获取委托类里面的那个 if (invoker instanceof InvokerDelegate) { return ((InvokerDelegate<T>) invoker).getInvoker(); } else { return invoker; } } } public class InvokerWrapper<T> implements Invoker<T> { private final Invoker<T> invoker; private final URL url; public InvokerWrapper(Invoker<T> invoker, URL url) {this.invoker = invoker;this.url = url;} @Override public Class<T> getInterface() {return invoker.getInterface();} @Override public URL getUrl() {return url;} @Override public boolean isAvailable() {return invoker.isAvailable();} @Override public Result invoke(Invocation invocation) throws RpcException {return invoker.invoke(invocation);} @Override public void destroy() { invoker.destroy(); } }
- 可以看出来这个委托类的作用就是把保存了url。接下来就是将protocol.export返回的exporter包装起来缓存到bounds中。我们看下ExporterChangeableWrapper这个成员内部包装类:
private class ExporterChangeableWrapper<T> implements Exporter<T> { private final Invoker<T> originInvoker; private Exporter<T> exporter; private URL subscribeUrl; private URL registerUrl; public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) { this.exporter = exporter; this.originInvoker = originInvoker; } public Invoker<T> getOriginInvoker() { return originInvoker; } @Override public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void setExporter(Exporter<T> exporter) { this.exporter = exporter; } @Override public void unexport() { String key = getCacheKey(this.originInvoker); bounds.remove(key); Registry registry = RegistryProtocol.this.getRegistry(getRegistryUrl(originInvoker)); ... }
- 这个作用就是将原始的invoker与exporter绑在了一起。下面我们就看看这个exporter是怎么得到的。
6.1 服务器启动protocol.export方法
我们讲到exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);/// dubbo protocol
,我们来看一下RegistryProtocol类的private Protocol protocol 成员,这个成员的值是由dubbo spi 扩展技术在实例完成后setter注入进来的,实际上这里protocol注入的是ExtensionLoader.getExtensionLoader(Protocol).getAdaptiveExtension();是自适应的实现类。我们可以看下自适应的实现类的export方法:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
然后invokerDelegete 这个委托类getUrl得到的是protocol=dubbo的url,所以
Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
最后获取的extension是经过dubbo spi setter注入与wrapper包装后的。
Filter -> Listener----->Qos-----> Dubbo
我这里没有debug到Qos包装类,这块有后面在看看?
这里的包装其实是有顺序的,因为在包装的时候会按照@Active对应的order顺序进行逆序排序,可以看下包装这块代码:
6.1.1 ProtocolFilterWrapper
接下来我们看下ProtocolFilterWrapper类的export方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {/// registry protocol 就到下一个wapper 包装对象中就可以了
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
这里不是registry protocol 然后会走下面,我们看下buildInvokerChain 方法,这里buildInvokerChain的三个参数分别是invoker, service.filter,provider:
/**
* build consumer/provider filter chain
* 在构建调用链时方法先获取Filter列表,然后创建与Fitler数量一样多Invoker
* 结点,接着将这些结点串联在一起,构成一个链表,最后将这个链的首结点返回,随后的调用中,将从首结点开始,依次调用各个结点,
* 完成调用后沿调用链返回。这里各个Invoker结点的串联是通过与其关联的invoke 方法来完成的。
*/
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
// 调用buildInvokerChain时会传入invoker参数。
final Invoker<T> next = last;
// 通过循环遍历获取到的Filter,同时创建Invoker结点,每个结点对应一个Filter。此时循环内部定义了next指针。
last = new FilterChainNode<>(originalInvoker, next, filter);
}
}
return last;
}
这个方法实际上是根据dubbo spi 扩展技术自动激活的特性获取到对应的filter们,然后一层一层的包装这个invoker,生成一个过滤调用链,最后到真实的invoker上面。这个Filter我们后期会单独拿出来解析,这里只需要知道它一层层包装了invoker就行。
6.1.2 ProtocolListenerWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {// registry 是否是注册中心
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),// dubbo export dubbo --->暴露服务 生成exporter
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));// exporter listener
}
这里不是REGISTRY_PROTOCOL,然后走了下面使用ListenerExporterWrapper 将服务暴露返回的exporter与自动激活的listener们绑在了一起。我们可以看下这个ListenerExporterWrapper类,
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
// 遍历通知
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
// 通知
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
我们可以看到就是遍历通知listener,告诉他们当前服务暴露完了。这些listener们我们后面单独拿出来解析。
6.1.3 QosProtocolWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//判断是registry
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
// 启动Qos服务器?
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
这里同理不是registry protocol ,直接到了下一层.
6.1.4 DubboProtocol
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// 将invoker封装为了DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将exporter写入到缓存map
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 创建并启动Netty Server
openServer(url);
optimizeSerialization(url);
return exporter;
}
这里首先是根据url生成一个服务的key,创建一个DubboExporter 把invoker,key与缓存exporter的map 绑在一起。将 创建的这个exporter缓存到exporterMap里面。我们可以exporterMap的定义:
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
其中key就是生成的那个serviceKey,然后value是创建的那个exporter。
接着就是调用openServer(url),来打开服务器,我们看下源码:
private void openServer(URL url) {
// find server.
// key格式 ip:暴露协议端口 例如,192.168.59.1:20881
String key = url.getAddress();
//client can export a service which's only for server to invoke
// 表示当前应用是否是provider
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 从缓存map中获取当前key对应的同异步转换对象。
// 从key的值可知,一个应用中每个服务暴露协议都会对应一个同异步转换对象,
// 而一个同异步转换对象会对应一个Netty Server,即一个主机中每个服务暴露
// 协议会对应创建一个Netty Server(面试题)
// DCL
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建同异步转换对象
serverMap.put(key, createServer(url));
}else {
server.reset(url);
}
}
} else {
// server supports reset, use together with override
// 将当前服务的url添加到server中
server.reset(url);
}
}
}
首先是获取地址,这个地址形式是ip:port,获取isserver参数,默认是true,然后从serverMap这个缓存中查找对应的server,如果之前没有创建过,就调用createServer(url) 来创建server,之后把创建的server缓存在serverMap中,我们先看下serverMap这个成员变量
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
其中key是服务器地址,也就是上面url.getAddress();获得的,value就是对应的ProtocolServer。 我们再来看看是怎么创建server的,看下createServer(url)这个方法的源码:
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// todo
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
这个方法前面部分就是设置一些参数,channel.readonly.sent =true,就是服务器关闭的时候指发送只读属性,heartbeat=60*1000 设置默认的心跳时间,获取server,缺省的情况下使用netty,设置Codec 的类型为dubbo。
然后Exchangers.bind(url, requestHandler),其实这个Exchangers 是个门面类,封装了bind与connect两个方法的调用
6.1.5 Exchanger
我们可以看看Exchanger的方法:
我们接着,看下当前调用的bind方法:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//验证
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// 如果没有codec 的话 就设置为exchange
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// exchanger.bind()
return getExchanger(url).bind(url, handler);
}
前面的都是参数校验,我们看下getExchanger(url) 方法:
public static Exchanger getExchanger(URL url) {
// 获取exchanger 缺省header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
这里可以看到从url中获取exchanger ,缺省是header,然后使用dubbo spi 获取到HeaderExchanger,我们看下HeaderExchanger源码:
6.1.6 HeaderExchanger
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 创建一个通信client
//DecodeHandler => HeaderExchangeHandler => ExchangeHandler( handler ) 。
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建一个通信server DecodeHandler << HeaderExchangeHandler << handler
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
这里有三个点,一是将这handler又包装了两层,二是使用Transporters 这个门面类进行bind,再就是创建HeaderExchangeServer 将server进行增强,其实HeaderExchangeServer 这个是专门发送心跳的。 我们先看下Transporters这个类,
6.1.7 Transporters
这个Transporters也是门面类,对外统一了bind 与connect。我们只看下与这次有关的部分
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
//验证
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else { // 多个channal 对 Channel分发 ChannelHandlerDispatcher 循环
handler = new ChannelHandlerDispatcher(handlers);
}
// 真正服务器 进行bind
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() { // 获取transporter
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
首先是参数校验,接着判断handler的个数,如果一个话还好说,多个话就要使用ChannelHandlerDispatcher类来包装了,其实里面就是对多个handler循环调用,接着调用getTransporter获取Transporter扩展点的自适应类。 我们可以稍微看下Transporter 这个扩展点的代码,
@SPI("netty") // 默认是netty4的
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
可以看到bind自适应实现类是根据server 与transporter 这两个参数决定的,server这个参数咱们上面的DubboProtocol类里面createServer 方法中就设置好了,缺省使用netty,我们通过看dubbo spi 配置文件:
netty对应的也就是NettyTransporter这个实现类。
6.1.8 NettyTransporter
/**
* 实现 Transporter 接口,基于 Netty4 的网络传输实现类
*/
public class NettyTransporter implements Transporter {
/**
* 扩展名
*/
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
我们可以看到new了一个NettyServer对象。我们来看下代码
6.1.9 NettyServer
在看NettyServer前先看下它的继承结构
看下NettyServer的构造方法:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
调用的父类的构造 先看下AbstractServer代码:
public abstract class AbstractServer extends AbstractEndpoint implements Server {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
// 线程池
ExecutorService executor;
// 服务地址
private InetSocketAddress localAddress;
//绑定地址
private InetSocketAddress bindAddress;
// 服务器最大可接受连接数
private int accepts;
// 空闲超时时间
private int idleTimeout = 600;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//ip
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); //port
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 服务器最大可接受连接数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen(); // 子类实现, 真正打开服务器
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
// 获取线程池
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
}
我们可以看到AbstractServer的构造前面都是些参数的获取,之后调用doOpen()方法,具体是由子类实现的我们在看看NettyServer的doOpen方法
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
//iothreads 默认是cpu核心数+1 与 32 进行比较,取小的那个 也就是最大不超过32
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
/**
* ChannelOption.SO_REUSEADDR 这个参数表示允许重复使用本地地址和端口,
*
* 比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,
* 使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,
* 比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,
* 而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
* 就无法正常使用该端口。
*/
// 设置线程组
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind 启动Netty Server
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
我们可以看到就是启动netty服务器,然后boss线程是一个,然后work线程是是cpu数+1然后与32做比较,取最小的那个,也就是最大不超过32个线程。 到这里我们的服务就算启动完成了。 我们再回过头来看看HeaderExchangeServer这个维护心跳的
6.1.10 HeaderExchangeServer
public HeaderExchangeServer(RemotingServer server) {
Assert.notNull(server, "server == null");
this.server = server;
// 开启空闲检测任务
startIdleCheckTask(getUrl());
}
private void startIdleCheckTask(URL url) {
if (!server.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
int idleTimeout = getIdleTimeout(url);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;
// init task and start timer.
// 初始化任务并开启定时器
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}
}
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
好了,到这里我们远程服务暴露中启动部分就算结束了,这里我们并没有说太多的细节,我只需要把服务暴露服务器启动这个过程缕顺就可以了,具体的细节方面放到后面分析。
6.2 服务注册
6.1 小结中讲了DubboProtocol一步步到服务器启动的。但是这些只是我们在服务远程暴露的RegistryProtocol#export()方法的一个方法,只是介绍了服务器启动,我们知道服务暴露其实是服务器启动+服务注册,今天我们就接着看看RegistryProtocol#export()方法后面的部分。
6.2.1 RegistryProtocol#export
// 获取注册中心URL
URL registryUrl = getRegistryUrl(originInvoker);
// 获取服务暴露URL
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// 服务暴露,即将服务的exporter写入到缓存map
//export invoker doLocalExport表示本地启动服务不包括去注册中心注册
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry 获取注册中心实例
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册到注册中心
register(registry, registeredProviderUrl);
}
...
然后接着就是从originInvoker里面获取registryUrl,接着就是获取Registry的对象。我们看下这个getRegistry()方法:
protected Registry getRegistry(final URL registryUrl) {
return registryFactory.getRegistry(registryUrl);
}
通过registryFactory 获取Registry,其实这个registryFactory 是RegistryProtocol的一个成员,然后是dubbo spi 自动setter注入进来的。我们来看一下registryFactory接口源代码:
我们可以看到@Adaptive的值是protocol,咱们上面的url中的protocol值正好是zookeeper,就可以找到ZookeeperRegistryFactory,我们看下ZookeeperRegistryFactory继承图:
继承了一个抽象类AbstractRegistryFactory,然后getRegistry()方法就是在这个抽象类中,我们可以看下:
// Registry Collection Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
// interface
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
// export refer
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 从缓存中获取, 有就返回, 没有就创建
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 创建方法由子类实现 模板方法设计模式
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
// 由子类实现的 创建方法, 模板方法设计模式
protected abstract Registry createRegistry(URL url);
前面就是设置参数,移除参数,然后将url转换成一个serviceKey的字符串,之后就是根据这个serviceKey去这个成员map中获取value,如果存在就返回,不存在就调用子类的createRegistry方法来创建Registry,然后再塞进去这个map中缓存。我们看下子类的createRegistry方法实现,这里就是ZookeeperRegistryFactory的实现:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public ZookeeperRegistryFactory() {
this.zookeeperTransporter = ZookeeperTransporter.getExtension();
}
@DisableInject
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
这里就是new了个ZookeeperRegistry对象然后返回了,这里zookeeperTransporter 成员是dubbo spi 创建完成这个对象自动注入的,这个咱们后面再解析。
我们接着RegistryProtocol#export 方法的解析,final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);这行代码就是获取服务提供者的url,接着下面就是获取一个register的参数,缺省是true,
我们再回到RegistryProtocol#export方法中,判断register,这里是true,然后走了register(registryUrl, registeredProviderUrl);
,我们看下register方法,
private void register(Registry registry, URL registeredProviderUrl) {
registry.register(registeredProviderUrl);
}
可以看出来,直接使用registry对象,这其实还是那个ZookeeperRegistry,然后就是调用ZookeeperRegistry#register 方法,这个咱们讲到注册中心部分再说。
7. 整体流程图
服务暴露的核心代码在这里就结束了,敬请期待下一篇文章哈
参考文章
转载自:https://juejin.cn/post/7113081670982434852