Dubbo源码(三)rpc调用流程
前言
本章基于dubbo2.7.6版本,分析rpc调用流程。
基于上一章服务暴露与引用(juejin.cn/post/721094…),一切几乎水到渠成。
笔者将一次rpc同步调用拆分为三个阶段:
1)用户代码执行rpc方法,consumer发送rpc请求给provider
2)provider处理rpc请求响应consumer
3)consumer收到响应,返回用户代码
在总结部分整理了rpc调用流程。
发送rpc请求
代理层
InvokerInvocationHandler#invoke:
1)把rpc方法的关键信息,都包装为一个RpcInvocation贯穿Invoker#invoke;
这个RpcInvocation就不细看了,目的无非是将远程调用需要的信息都封装为一个pojo,
类似于我们平常写业务代码的时候aop用的MethodInvocation。
2)执行代理Invoker,一直通到DubboInvoker;
3)result.recreate:如果发生rpc异常,抛出,否则返回rpc方法返回值;
Cluster层
ClusterInterceptor
在上一章提到过,Cluster#join返回的Invoker会被ClusterInterceptor激活扩展点包一层,但是ClusterInterceptor并没有实现Invoker,所以要用适配器模式包一遍。
AbstractCluster内部类InterceptorInvokerNode,负责适配ClusterInterceptor实现到Invoker。
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
private AbstractClusterInvoker<T> clusterInvoker;
private ClusterInterceptor interceptor;
private AbstractClusterInvoker<T> next;
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 前置拦截
interceptor.before(next, invocation);
// 执行
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// ...
throw e;
} finally {
// 后置拦截
interceptor.after(next, invocation);
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// rpc请求完成回调ClusterInterceptor.Listener
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
}
ConsumerContextClusterInterceptor:负责创建和清理rpc上下文,配合实现隐式传参特性。
- before:发起rpc请求前,创建rpc请求上下文,清除rpc响应上下文;
- after:发起rpc请求完成,还未收到rpc响应,清除rpc请求上下文;
- onMessage:收到rpc响应,且响应成功,创建rpc响应上下文;
- onError:rpc响应失败,什么都不做;
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 创建rpc【请求】上下文
RpcContext.getContext()
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// 清除rpc【响应】上下文
RpcContext.removeServerContext();
}
@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
// 清除rpc【请求】上下文
RpcContext.removeContext();
}
@Override
public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 创建rpc【响应】上下文
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}
@Override
public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {
}
}
其中rpc请求上下文对应RpcContext.LOCAL,rpc响应上下文对应RpcContext.SERVER_LOCAL。
对于服务调用方,
RpcContext.LOCAL的生命周期,是一次rpc请求,用户也可以在rpc方法调用前,主动初始化RpcContext.LOCAL实现隐式传参;
RpcContext.SERVER_LOCAL的生命周期很长,是一次rpc响应到下一次rpc请求之前,也就是说用户可以在rpc方法调用结束后一直使用本次rpc调用的响应上下文,直至发生下次rpc调用。
RpcContext中的InternalThreadLocal是借鉴了Netty的FastThreadLocal( juejin.cn/post/694493… ) ,用数组代替hashmap实现的threadlocal,里面javadoc都和FastThreadLocal如出一辙。
ClusterInvoker
AbstractClusterInvoker#invoke:本质上ClusterInvoker是根据一系列条件,从所有服务提供者Invoker中选择其中一个Invoker执行。
1)Directory#list:列出Invocation对应Invokers(路由)
2)AbstractClusterInvoker#initLoadBalance:根据url参数找LoadBalance扩展,默认RandomLoadBalance
3)AbstractClusterInvoker#doInvoke:子类实现,会有不同的集群容错方式
路由
RegistryDirectory#doList:经过RouterChain路由过滤后返回Invoker集合。
需要注意的是,如果reference指定多个group,则不包含路由逻辑,直接返回所有Invoker。
在服务订阅之前,RegistryProtocol会用订阅url构造RouterChain注入RegistryDirectory。
RouterChain构造时会根据订阅url获取RouterFactory激活扩展点,通过RouterFactory#getRouter创建Router对象。
RouterChain#route:循环所有Router,过滤invokers。
注意:invokers可以认为是内存注册表(rpc服务级别),只有注册中心providers列表变更,这里才会更新,rpc期间不强依赖注册中心的远程注册表。
默认情况下会有四个Router:
1)MockInvokersSelector:本地伪装特性,忽略
2)TagRouter:根据tag过滤Invoker
支持三种模式配置tag路由,优先级从高到低:
a)配置中心路由规则;b)RpcContext指定tag;c)reference指定tag
比如reference指定tag为gray。
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setTag("gray");
TagRouter优先会找tag=gray的provider,如果找不到tag=gray的provider,会取tag=null的provider。
3)ServiceRouter:基于rpc服务的路由,需要结合配置中心
4)AppRouter:基于应用的路由,需要结合配置中心
集群容错
FailoverClusterInvoker是默认集群容错实现。
如果发生故障,重新选择Invoker进行远程调用,最多会调用3次(retry=2)。
AbstractClusterInvoker#select:在实际选择invoker前,优先走了一圈粘滞连接特性。
AbstractClusterInvoker#doSelect:
1)如果invokers只有一个,直接返回第一个
2)调用loadbalance执行一次选择,返回invoker
2-1)invoker已经被选过或者invoker不可用,再次进行一次选择
2-2)否则返回loadbalance的选择,结束
3)重新选择invoker,如果仍然为空,兜底返回上次invoker的下一个invoker
AbstractClusterInvoker#reselect逻辑如下:
1)获取未选择过的可用invokers,再次执行loadbalance
2)如果第一类invokers为空,把已选过的可用invokers也考虑进去(对于failover来说,就是之前执行失败的invoker),再次执行loadbalance
负载均衡
当经过路由后还存在多个invokers,往往就需要loadbalance通过特定算法找到其中一个invoker。
默认实现是RandomLoadBalance,随机选择invoker。
需要注意的是,大部分LoadBalance实现都有权重逻辑。
AbstractLoadBalance#getWeight:获取运行时不同provider对应invoker的权重
默认每个invoker权重都是100,基于provider的启动时间和预热时间可能会减少权重。
Protocol层
经过Cluster+Directory筛选,最终剩下一个服务提供者的ProtocolInvoker。
以Dubbo协议为例,最终会进入DubboInvoker
在进入DubboInvoker之前会经过一系列group=consumer的Filter,这里略过。
在AbstractProtocol#refer引用时,主动在DubboInvoker外部包装了AsyncToSyncInvoker。
AsyncToSyncInvoker#invoke调用协议Invoker,待底层协议Invoker返回,如果是同步调用,阻塞等待rpc响应。
DubboInvoker继承自AbstractInvoker抽象类。
AbstractInvoker#invoke做一些通用控制,比如填充RpcInvocation、异常封装到Result等等,底层调用DubboInvoker#doInvoke。
DubboProtocol#doInvoke:
1)getCallbackExecutor:决策处理rpc响应的线程池;
2)ExchangeClient#request:将Invocation、超时时间、处理rpc响应线程池提交给通讯层,得到Future返回;
对于普通同步调用来说,getCallbackExecutor每次会返回一个新的ThreadlessExecutor。
ThreadlessExecutor的javadoc如下:
ThreadlessExecutor不管理任何线程,如果任务被提交到这个executor,不会马上被调度。
直到某个线程调用waitAndDrain方法,该线程立即执行这个任务。
这部分我们到第三阶段再分析。
通讯层
一般情况下,rpc框架业务线程和io线程都是分开的。
为了解决这个问题,一般设计方式都遵循如下规则:
1)【业务线程】为request分配唯一id
2)【业务线程】将request-id和request-future缓存到全局map
3)【定时线程】对于全局map做定时扫描,如果超时,用指定线程池执行future的回调
4)【io线程】发送请求到io线程
5)【io线程】io线程收到响应,从全局map根据响应里的request-id拿到request-future,提交到6线程
6)【rpc响应线程(异步)/业务线程(同步)】执行future的回调
HeaderExchangeChannel#request:这里做了1234步,第四步就是channel#send。
首先Request创建时分配了自增id,作为rpc请求id(1)。
public class Request {
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId;
public Request() {
mId = newId();
}
private static long newId() {
return INVOKE_ID.getAndIncrement();
}
}
DefaultFuture#newFuture:创建DefaultFuture实例,提交超时检测任务(3)。
DefaultFuture构造时将request-id和自己放到全局map中(2)。
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
FUTURES.put(id, this); // 核心
CHANNELS.put(id, channel);
}
序列化
DubboProtocol#initClient:在创建客户端连接时,Dubbo协议序列化方式为dubbo。
最终Request都会走DubboCountCodec进行序列化,序列化细节忽略。
同步调用阻塞等待
DubboInvoker将rpc请求提交到通讯层后,直接返回。
在AsyncToSyncInvoker中由于是同步调用,走AsyncRpcResult#get阻塞等待rpc响应。
对于同步调用AsyncRpcResult#get会主动调用ThreadlessExecutor#waitAndDrain。
ThreadlessExecutor#waitAndDrain:阻塞等待rpc响应任务投递,当响应到来,当前线程执行rpc响应任务。
之所以这个Executor的行为不同,是因为重写了execute方法。
当没有调用waitAndDrain执行任务前,任务会投递到queue中。
至于rpc响应,放到第三阶段继续看。
处理rpc请求
反序列化
同样,反序列化也是走DubboCountCodec,区别在于Request的data部分不是一个普通的RpcInvocation。
rpc请求体会被反序列化为一个DecodeableRpcInvocation,继承自RpcInvocation。
DecodeableRpcInvocation是个特殊的Invocation,特点在于能够自己根据输入流反序列化。
通讯层
AllChannelHandler#received:收到反序列化之后的Request,根据url找业务线程池执行rpc请求。
在暴露阶段,provider对于每个端口开启一个业务线程池,处理业务。
默认采用固定大小200+无队列+日志打印拒绝策略的线程池。
HeaderExchangeHandler#handleRequest:执行ChannelEventRunnable
1)提交到ExchangeHandler实现,执行Invocation调用
2)注册future回调,当业务处理完毕,再将rpc响应提交到io线程,响应客户端
接下来的流程,就是根据Request找到Invoker,经过层层包装,调用到rpc服务实现。
Protocol层
DubboProtocol#requestHandler:
1)getInvoker:根据Invocation找Invoker
2)执行Invoker
找Invoker
DubboProtocol#getInvoker:根据Invocation信息拼接serviceKey,定位到之前暴露的DubboExporter,返回DubboExporter持有的Invoker。
Filter
接下来经过一系列group=provider的Filter。
介绍一个比较关键的ContextFilter。
在进入下游invoker前,将Invocation中的关键信息注入RpcContext请求rpc上下文中。
在rpc服务方法执行完毕后,将RpcContext响应rpc上下文中关键信息,注入rpc响应。
代理层
接下来会来到Rpc服务代理层,暴露阶段会通过ProxyFactory创建rpc服务实现代理,默认javassist实现。
AbstractProxyInvoker#invoke对于目标rpc方法执行包了一层,异步适配、异常处理等等。
对于javassist来说执行动态生成的Wrapper#invokeMethod,动态生成的Wrapper在上一章提到过,不再赘述。
注意:这里proxy实际上是target,即rpc服务实现类实例。
HeaderExchangeHandler#handleRequest:再复述一次
当rpc服务方法执行完毕,future完成后会将response提交到io线程,进入三阶段。
处理rpc响应
反序列化
rpc响应模型Response:
1)mId:对应request-id
2)mResult:DecodeableRpcResult实例
Response.mResult是DecodeableRpcResult实例。
DecodeableRpcResult实现和DecodeableRpcInvocation类似,自己支持反序列化,继承了AppResponse。
通讯层
AllChannelHandler#received:
当前是io线程(netty),根据入参message(Response)获取rpc响应处理线程池,提交ChannelEventRunnable任务到对应线程池。
WrappedChannelHandler#getPreferredExecutorService:
根据请求id找到挂起的DefaultFuture,再根据DefaultFuture找到绑定的线程池。
对于同步调用,这里就是ThreadlessExecutor。
io线程将ChannelEventRunnable,提交到ThreadlessExecutor之后,就能唤醒用户线程,实现同步调用 。
Protocol层
HeaderExchangeHandler继续处理,调用DefaultFuture#received。
DefaultFuture#received:从全局futures中移除挂起请求
1)取消超时检测
2)完成future
DefaultFuture#doReceived:future以正常或异常完成。
AsyncToSyncInvoker调用Result#get从阻塞中返回。
代理层
最后回到rpc服务代理。
InvokerInvocationHandler#invoke:AsyncRpcResult#recreate获取结果。
AsyncRpcResult#recreate:对于同步调用,走底层AppResponse的recreate。
AppResponse#recreate:调用正常,返回反序列化后的result。
总结
rpc流程
一次同步rpc调用流程大致如下:
阶段一(consumer):
1)代理层:用户代码进入rpc服务代理,收集rpc调用必要参数,封装RpcInvocation
2)Cluster层:执行Interceptor,经过路由、容错、负载均衡最终选定一个provider
3)Protocol层:
- 执行Filter
- 构建Request并缓存到内存map,后台线程扫描map校验Request是否超时
- 提交Request到io线程,阻塞等待Response
4)通讯层:Request序列化,发送Request给对端
阶段二(provider):
1)通讯层:Request反序列化,得到RpcInvocation,提交到业务线程池并注册future回调,回调时将rpc返回值封装为Response并序列化提交到io线程
2)Protocol层:通过RpcInvocation找到之前暴露的Exporter,找到Exporter对应的Invoker,执行Filter
3)代理层:调用rpc方法实现
阶段三(consumer):
1)通讯层:将Response反序列化,提交任务到业务线程池
2)Protocol层:根据请求id反向找到挂起的future,取消超时检测,设置future执行结果
3)代理层:解析future返回结果,返回用户代码
一些细节
实现同步rpc调用
rpc调用业务线程和io线程一般是独立的,所以有如下设计:
1)【业务线程】为request分配唯一id
2)【业务线程】将request-id和request-future缓存到全局map
3)【定时线程】对于全局map做定时扫描,如果超时,用指定线程池执行future的回调
4)【io线程】发送请求到io线程
5)【io线程】io线程收到响应,从全局map根据响应里的request-id拿到request-future,提交到6线程
6)【rpc响应线程(异步)/业务线程(同步)】执行future的回调
一般框架底层通讯都会设计成异步,然后同步去适配异步。
不例外,dubbo设计了ThreadlessExecutor。
本质上ThreadlessExecutor是一个阻塞队列适配了ExecutorService。
业务线程会阻塞等待队列非空(queue.take),io线程收到response投递请求到阻塞队列,唤醒业务线程,从而实现同步调用。
负载均衡算法
之前对默认负载均衡算法有一点误解,认为是纯粹的随机算法,实际上包含权重逻辑。
而权重逻辑中会包含warmup减权,默认warmup时长是10分钟,权重是100。
欢迎大家评论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎关注公众号【程序猿阿越】。
转载自:https://juejin.cn/post/7213211041764425788