likes
comments
collection
share

Dubbo源码(三)rpc调用流程

作者站长头像
站长
· 阅读数 29

前言

本章基于dubbo2.7.6版本,分析rpc调用流程。

基于上一章服务暴露与引用(juejin.cn/post/721094…),一切几乎水到渠成。

笔者将一次rpc同步调用拆分为三个阶段:

1)用户代码执行rpc方法,consumer发送rpc请求给provider

2)provider处理rpc请求响应consumer

3)consumer收到响应,返回用户代码

在总结部分整理了rpc调用流程。

发送rpc请求

Dubbo源码(三)rpc调用流程

代理层

InvokerInvocationHandler#invoke:

1)把rpc方法的关键信息,都包装为一个RpcInvocation贯穿Invoker#invoke;

这个RpcInvocation就不细看了,目的无非是将远程调用需要的信息都封装为一个pojo,

类似于我们平常写业务代码的时候aop用的MethodInvocation。

2)执行代理Invoker,一直通到DubboInvoker;

3)result.recreate:如果发生rpc异常,抛出,否则返回rpc方法返回值;

Dubbo源码(三)rpc调用流程

Cluster层

ClusterInterceptor

在上一章提到过,Cluster#join返回的Invoker会被ClusterInterceptor激活扩展点包一层,但是ClusterInterceptor并没有实现Invoker,所以要用适配器模式包一遍。

AbstractCluster内部类InterceptorInvokerNode,负责适配ClusterInterceptor实现到Invoker。

protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    private AbstractClusterInvoker<T> clusterInvoker;
    private ClusterInterceptor interceptor;
    private AbstractClusterInvoker<T> next;
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            // 前置拦截
            interceptor.before(next, invocation);
            // 执行
            asyncResult = interceptor.intercept(next, invocation);
        } catch (Exception e) {
            // ...
            throw e;
        } finally {
            // 后置拦截
            interceptor.after(next, invocation);
        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            // rpc请求完成回调ClusterInterceptor.Listener
            if (interceptor instanceof ClusterInterceptor.Listener) {
                ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                if (t == null) {
                    listener.onMessage(r, clusterInvoker, invocation);
                } else {
                    listener.onError(t, clusterInvoker, invocation);
                }
            }
        });
    }
}

ConsumerContextClusterInterceptor:负责创建和清理rpc上下文,配合实现隐式传参特性。

  • before:发起rpc请求前,创建rpc请求上下文,清除rpc响应上下文
  • after:发起rpc请求完成,还未收到rpc响应,清除rpc请求上下文
  • onMessage:收到rpc响应,且响应成功,创建rpc响应上下文
  • onError:rpc响应失败,什么都不做;
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

    @Override
    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 创建rpc【请求】上下文
        RpcContext.getContext()
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        // 清除rpc【响应】上下文
        RpcContext.removeServerContext();
    }

    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        // 清除rpc【请求】上下文
        RpcContext.removeContext();
    }

    @Override
    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 创建rpc【响应】上下文
        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
    }

    @Override
    public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {

    }
}

其中rpc请求上下文对应RpcContext.LOCAL,rpc响应上下文对应RpcContext.SERVER_LOCAL。

对于服务调用方,

RpcContext.LOCAL的生命周期,是一次rpc请求,用户也可以在rpc方法调用前,主动初始化RpcContext.LOCAL实现隐式传参

RpcContext.SERVER_LOCAL的生命周期很长,是一次rpc响应到下一次rpc请求之前,也就是说用户可以在rpc方法调用结束后一直使用本次rpc调用的响应上下文,直至发生下次rpc调用

Dubbo源码(三)rpc调用流程

RpcContext中的InternalThreadLocal是借鉴了Netty的FastThreadLocal( juejin.cn/post/694493… ,用数组代替hashmap实现的threadlocal,里面javadoc都和FastThreadLocal如出一辙。

Dubbo源码(三)rpc调用流程

ClusterInvoker

AbstractClusterInvoker#invoke:本质上ClusterInvoker是根据一系列条件,从所有服务提供者Invoker中选择其中一个Invoker执行。

Dubbo源码(三)rpc调用流程

1)Directory#list:列出Invocation对应Invokers(路由)

Dubbo源码(三)rpc调用流程

2)AbstractClusterInvoker#initLoadBalance:根据url参数找LoadBalance扩展,默认RandomLoadBalance

Dubbo源码(三)rpc调用流程

3)AbstractClusterInvoker#doInvoke:子类实现,会有不同的集群容错方式

路由

RegistryDirectory#doList:经过RouterChain路由过滤后返回Invoker集合。

需要注意的是,如果reference指定多个group,则不包含路由逻辑,直接返回所有Invoker。

Dubbo源码(三)rpc调用流程

在服务订阅之前,RegistryProtocol会用订阅url构造RouterChain注入RegistryDirectory。

Dubbo源码(三)rpc调用流程

RouterChain构造时会根据订阅url获取RouterFactory激活扩展点,通过RouterFactory#getRouter创建Router对象。

Dubbo源码(三)rpc调用流程

RouterChain#route:循环所有Router,过滤invokers。

注意:invokers可以认为是内存注册表(rpc服务级别),只有注册中心providers列表变更,这里才会更新,rpc期间不强依赖注册中心的远程注册表。

Dubbo源码(三)rpc调用流程

默认情况下会有四个Router:

1)MockInvokersSelector:本地伪装特性,忽略

2)TagRouter:根据tag过滤Invoker

支持三种模式配置tag路由,优先级从高到低:

a)配置中心路由规则;b)RpcContext指定tag;c)reference指定tag

比如reference指定tag为gray。

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setTag("gray");

TagRouter优先会找tag=gray的provider,如果找不到tag=gray的provider,会取tag=null的provider。

Dubbo源码(三)rpc调用流程

3)ServiceRouter:基于rpc服务的路由,需要结合配置中心

Dubbo源码(三)rpc调用流程

4)AppRouter:基于应用的路由,需要结合配置中心

Dubbo源码(三)rpc调用流程

集群容错

FailoverClusterInvoker是默认集群容错实现。

如果发生故障,重新选择Invoker进行远程调用,最多会调用3次(retry=2)。

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#select:在实际选择invoker前,优先走了一圈粘滞连接特性。

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#doSelect:

1)如果invokers只有一个,直接返回第一个

2)调用loadbalance执行一次选择,返回invoker

2-1)invoker已经被选过或者invoker不可用,再次进行一次选择

2-2)否则返回loadbalance的选择,结束

3)重新选择invoker,如果仍然为空,兜底返回上次invoker的下一个invoker

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#reselect逻辑如下:

1)获取未选择过的可用invokers,再次执行loadbalance

2)如果第一类invokers为空,把已选过的可用invokers也考虑进去(对于failover来说,就是之前执行失败的invoker),再次执行loadbalance

Dubbo源码(三)rpc调用流程

负载均衡

当经过路由后还存在多个invokers,往往就需要loadbalance通过特定算法找到其中一个invoker。

默认实现是RandomLoadBalance,随机选择invoker。

需要注意的是,大部分LoadBalance实现都有权重逻辑。

Dubbo源码(三)rpc调用流程

AbstractLoadBalance#getWeight:获取运行时不同provider对应invoker的权重

默认每个invoker权重都是100,基于provider的启动时间和预热时间可能会减少权重。

Dubbo源码(三)rpc调用流程

Protocol层

经过Cluster+Directory筛选,最终剩下一个服务提供者的ProtocolInvoker。

以Dubbo协议为例,最终会进入DubboInvoker

在进入DubboInvoker之前会经过一系列group=consumer的Filter,这里略过。

在AbstractProtocol#refer引用时,主动在DubboInvoker外部包装了AsyncToSyncInvoker。

Dubbo源码(三)rpc调用流程

AsyncToSyncInvoker#invoke调用协议Invoker,待底层协议Invoker返回,如果是同步调用,阻塞等待rpc响应。

Dubbo源码(三)rpc调用流程

DubboInvoker继承自AbstractInvoker抽象类。

AbstractInvoker#invoke做一些通用控制,比如填充RpcInvocation、异常封装到Result等等,底层调用DubboInvoker#doInvoke。

Dubbo源码(三)rpc调用流程

DubboProtocol#doInvoke:

1)getCallbackExecutor:决策处理rpc响应的线程池;

2)ExchangeClient#request:将Invocation、超时时间、处理rpc响应线程池提交给通讯层,得到Future返回;

Dubbo源码(三)rpc调用流程

对于普通同步调用来说,getCallbackExecutor每次会返回一个新的ThreadlessExecutor

Dubbo源码(三)rpc调用流程

ThreadlessExecutor的javadoc如下:

Dubbo源码(三)rpc调用流程

ThreadlessExecutor不管理任何线程,如果任务被提交到这个executor,不会马上被调度。

直到某个线程调用waitAndDrain方法,该线程立即执行这个任务。

这部分我们到第三阶段再分析。

通讯层

一般情况下,rpc框架业务线程和io线程都是分开的。

为了解决这个问题,一般设计方式都遵循如下规则:

1)【业务线程】为request分配唯一id

2)【业务线程】将request-id和request-future缓存到全局map

3)【定时线程】对于全局map做定时扫描,如果超时,用指定线程池执行future的回调

4)【io线程】发送请求到io线程

5)【io线程】io线程收到响应,从全局map根据响应里的request-id拿到request-future,提交到6线程

6)【rpc响应线程(异步)/业务线程(同步)】执行future的回调

HeaderExchangeChannel#request:这里做了1234步,第四步就是channel#send。

Dubbo源码(三)rpc调用流程

首先Request创建时分配了自增id,作为rpc请求id(1)。

public class Request {
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private final long mId;
    public Request() {
        mId = newId();
    }
    private static long newId() {
        return INVOKE_ID.getAndIncrement();
    }
}

DefaultFuture#newFuture:创建DefaultFuture实例,提交超时检测任务(3)。

Dubbo源码(三)rpc调用流程

DefaultFuture构造时将request-id和自己放到全局map中(2)。

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : 
        channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    FUTURES.put(id, this); // 核心
    CHANNELS.put(id, channel);
}

序列化

DubboProtocol#initClient:在创建客户端连接时,Dubbo协议序列化方式为dubbo。

Dubbo源码(三)rpc调用流程

最终Request都会走DubboCountCodec进行序列化,序列化细节忽略。

Dubbo源码(三)rpc调用流程

同步调用阻塞等待

DubboInvoker将rpc请求提交到通讯层后,直接返回。

在AsyncToSyncInvoker中由于是同步调用,走AsyncRpcResult#get阻塞等待rpc响应。

Dubbo源码(三)rpc调用流程

对于同步调用AsyncRpcResult#get会主动调用ThreadlessExecutor#waitAndDrain。

Dubbo源码(三)rpc调用流程

ThreadlessExecutor#waitAndDrain:阻塞等待rpc响应任务投递,当响应到来,当前线程执行rpc响应任务。

Dubbo源码(三)rpc调用流程

之所以这个Executor的行为不同,是因为重写了execute方法。

当没有调用waitAndDrain执行任务前,任务会投递到queue中。

至于rpc响应,放到第三阶段继续看。

Dubbo源码(三)rpc调用流程

处理rpc请求

反序列化

同样,反序列化也是走DubboCountCodec,区别在于Request的data部分不是一个普通的RpcInvocation。

rpc请求体会被反序列化为一个DecodeableRpcInvocation,继承自RpcInvocation。

Dubbo源码(三)rpc调用流程

DecodeableRpcInvocation是个特殊的Invocation,特点在于能够自己根据输入流反序列化。

Dubbo源码(三)rpc调用流程

通讯层

AllChannelHandler#received:收到反序列化之后的Request,根据url找业务线程池执行rpc请求。

Dubbo源码(三)rpc调用流程

在暴露阶段,provider对于每个端口开启一个业务线程池,处理业务。

Dubbo源码(三)rpc调用流程

默认采用固定大小200+无队列+日志打印拒绝策略的线程池。

Dubbo源码(三)rpc调用流程

HeaderExchangeHandler#handleRequest:执行ChannelEventRunnable

1)提交到ExchangeHandler实现,执行Invocation调用

2)注册future回调,当业务处理完毕,再将rpc响应提交到io线程,响应客户端

Dubbo源码(三)rpc调用流程

接下来的流程,就是根据Request找到Invoker,经过层层包装,调用到rpc服务实现。

Dubbo源码(三)rpc调用流程

Protocol层

DubboProtocol#requestHandler:

1)getInvoker:根据Invocation找Invoker

2)执行Invoker

Dubbo源码(三)rpc调用流程

找Invoker

DubboProtocol#getInvoker:根据Invocation信息拼接serviceKey,定位到之前暴露的DubboExporter,返回DubboExporter持有的Invoker。

Dubbo源码(三)rpc调用流程

Filter

接下来经过一系列group=provider的Filter。

介绍一个比较关键的ContextFilter

在进入下游invoker前,将Invocation中的关键信息注入RpcContext请求rpc上下文中。

Dubbo源码(三)rpc调用流程

在rpc服务方法执行完毕后,将RpcContext响应rpc上下文中关键信息,注入rpc响应。

Dubbo源码(三)rpc调用流程

代理层

接下来会来到Rpc服务代理层,暴露阶段会通过ProxyFactory创建rpc服务实现代理,默认javassist实现。

Dubbo源码(三)rpc调用流程

AbstractProxyInvoker#invoke对于目标rpc方法执行包了一层,异步适配、异常处理等等。

Dubbo源码(三)rpc调用流程

对于javassist来说执行动态生成的Wrapper#invokeMethod,动态生成的Wrapper在上一章提到过,不再赘述。

注意:这里proxy实际上是target,即rpc服务实现类实例。

Dubbo源码(三)rpc调用流程

HeaderExchangeHandler#handleRequest:再复述一次

当rpc服务方法执行完毕,future完成后会将response提交到io线程,进入三阶段。

Dubbo源码(三)rpc调用流程

处理rpc响应

反序列化

rpc响应模型Response:

1)mId:对应request-id

2)mResult:DecodeableRpcResult实例

Dubbo源码(三)rpc调用流程

Response.mResult是DecodeableRpcResult实例。

DecodeableRpcResult实现和DecodeableRpcInvocation类似,自己支持反序列化,继承了AppResponse。

Dubbo源码(三)rpc调用流程

通讯层

AllChannelHandler#received:

当前是io线程(netty),根据入参message(Response)获取rpc响应处理线程池,提交ChannelEventRunnable任务到对应线程池。

Dubbo源码(三)rpc调用流程

WrappedChannelHandler#getPreferredExecutorService:

根据请求id找到挂起的DefaultFuture,再根据DefaultFuture找到绑定的线程池。

对于同步调用,这里就是ThreadlessExecutor

Dubbo源码(三)rpc调用流程

io线程将ChannelEventRunnable,提交到ThreadlessExecutor之后,就能唤醒用户线程,实现同步调用

Dubbo源码(三)rpc调用流程

Protocol层

HeaderExchangeHandler继续处理,调用DefaultFuture#received。

Dubbo源码(三)rpc调用流程

DefaultFuture#received:从全局futures中移除挂起请求

1)取消超时检测

2)完成future

Dubbo源码(三)rpc调用流程

DefaultFuture#doReceived:future以正常或异常完成。

Dubbo源码(三)rpc调用流程

AsyncToSyncInvoker调用Result#get从阻塞中返回。

Dubbo源码(三)rpc调用流程

代理层

最后回到rpc服务代理。

InvokerInvocationHandler#invoke:AsyncRpcResult#recreate获取结果。

Dubbo源码(三)rpc调用流程

AsyncRpcResult#recreate:对于同步调用,走底层AppResponse的recreate。

Dubbo源码(三)rpc调用流程

AppResponse#recreate:调用正常,返回反序列化后的result。

Dubbo源码(三)rpc调用流程

总结

rpc流程

一次同步rpc调用流程大致如下:

Dubbo源码(三)rpc调用流程

阶段一(consumer):

1)代理层:用户代码进入rpc服务代理,收集rpc调用必要参数,封装RpcInvocation

2)Cluster层:执行Interceptor,经过路由、容错、负载均衡最终选定一个provider

3)Protocol层:

  • 执行Filter
  • 构建Request并缓存到内存map,后台线程扫描map校验Request是否超时
  • 提交Request到io线程,阻塞等待Response

4)通讯层:Request序列化,发送Request给对端

阶段二(provider):

1)通讯层:Request反序列化,得到RpcInvocation,提交到业务线程池并注册future回调,回调时将rpc返回值封装为Response并序列化提交到io线程

2)Protocol层:通过RpcInvocation找到之前暴露的Exporter,找到Exporter对应的Invoker,执行Filter

3)代理层:调用rpc方法实现

阶段三(consumer):

1)通讯层:将Response反序列化,提交任务到业务线程池

2)Protocol层:根据请求id反向找到挂起的future,取消超时检测,设置future执行结果

3)代理层:解析future返回结果,返回用户代码

一些细节

实现同步rpc调用

rpc调用业务线程和io线程一般是独立的,所以有如下设计:

1)【业务线程】为request分配唯一id

2)【业务线程】将request-id和request-future缓存到全局map

3)【定时线程】对于全局map做定时扫描,如果超时,用指定线程池执行future的回调

4)【io线程】发送请求到io线程

5)【io线程】io线程收到响应,从全局map根据响应里的request-id拿到request-future,提交到6线程

6)【rpc响应线程(异步)/业务线程(同步)】执行future的回调

一般框架底层通讯都会设计成异步,然后同步去适配异步。

不例外,dubbo设计了ThreadlessExecutor。

本质上ThreadlessExecutor是一个阻塞队列适配了ExecutorService。

Dubbo源码(三)rpc调用流程

业务线程会阻塞等待队列非空(queue.take),io线程收到response投递请求到阻塞队列,唤醒业务线程,从而实现同步调用。

负载均衡算法

之前对默认负载均衡算法有一点误解,认为是纯粹的随机算法,实际上包含权重逻辑。

而权重逻辑中会包含warmup减权,默认warmup时长是10分钟,权重是100。

Dubbo源码(三)rpc调用流程

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。