Dubbo服务消费者请求过程的源码解析
今天我们分析一下Dubbo消费者是怎样调用远程的生产者的。
在服务调用过程中,看似只有请求和响应两步。但实际上,服务调用过程包含四步,如下图所示的服务调用过程包含服务消费者发送请求、服务提供者接收请求、服务提供者发送响应结果和服务消费者接收响应结果。

服务调用的流程如下:
- 服务消费者调用接口时,底层使用
Invoker,通过Client向服务提供者发送请求消息; - 服务提供者接收请求,并处理业务逻辑,底层会通过服务端的
Invoker调用接口实现; - 服务提供者处理完业务逻辑后,发送响应数据;
 - 服务消费者接收响应数据,并返回给调用方。
 
注入到Spring容器的invoker是MockClusterInvoker类,这是我们的入口,从下图可以看出

InvokerInvocationHandler.invoke()的核心逻辑是调用了MockClusterInvoker.invoke(),并调用Result.recreate()获取返回结果
//MockClusterInvoker.invoke()
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    //获取mock配置值,默认为false
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //非mock,调用远程服务
        //默认为FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //强制执行mock接口
        result = doMockInvoke(invocation, null);
    } else {
        //省略其他逻辑代码...
    }
    return result;
}
默认情况下,走的是正常的远程调用逻辑,即FailoverClusterInvoker.invoke()。invoke()方法是在其父类AbstractClusterInvoker中实现的。


//AbstractClusterInvoker.invoke()
public Result invoke(final Invocation invocation) throws RpcException {
    //检查是否已销毁
    checkWhetherDestroyed();
    //给invocation设置attachments.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    //获取可用的invokers
    //list的底层是调用的Directory.list
    //所有的invokers都保存在RouterChain.invokers里
    //所以Directory.list最终也是从RouterChain.invokers获取的
    List<Invoker<T>> invokers = list(invocation);
    //初始化负载均衡
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    //为异步调用添加InvocationId
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //调用子类的模板方法doInvoke(),当前对象为FailoverClusterInvoker
    return doInvoke(invocation, invokers, loadbalance);
}
以上代码中,有 3 处关键点:
list(invocation):获取所有可用的Invoker;initLoadBalance(invokers, invocation):初始化负载均衡器(LoadBalance),用于选择Invoker时的负载均衡;doInvoke(invocation, invokers, loadbalance):调用子类的模板方法doInvoke(),此方法中实现了负载均衡和远程服务调用。

以上代码的开头,当没有可用的服务时,抛出异常。这个是日常调试过程中比较常见的问题,比如依赖的服务提供者正在部署,或者服务挂了等,都会在这里跑出异常。
代码的最后,调用routerChain.route()从routerChain容器中获取Invoker集合。在获取的过程中,根据路由规则,筛选出符合条件的Invoker集合

//AsyncToSyncInvoker.invoke()
@Override
public Result invoke(Invocation invocation) throws RpcException {
    //调用DubboInvoker.invoke()
    Result asyncResult = invoker.invoke(invocation);
    try {
        //如果是同步调用,等待返回结果
        //默认是同步调用
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            //调用超时异常
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            //远程调用异常
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
以上代码共两步,第一步调用DubboInvoker.invoke(),并得到一个Result;第二步调用Result.get()方法,同步等待返回结果。invoke()方法在其父类AbstractInvoker中实现。

//DubboInvoker.doInvoke()
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    //获取ExchangeClient,用于通讯
    //ExchangeClient表示客户端,包装了与远程通讯的逻辑
    //此处的ExchangeClient为ReferenceCountExchangeClient,装饰了HeaderExchangeClient
    //因此,最终调用的是HeaderExchangeClient
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //判断是否单向通讯
        //单向通讯,即只发送请求,不关心结果
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        //如果是单向通讯,调用的是currentClient.send(),直接返回结果
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            //如果是双向通讯,调用的是currentClient.request()
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            //注册responseFuture.whenComplete,调用asyncRpcResult.complete()
            //在AsyncToSyncInvoker调用DubboInvoker之后,如果是同步调用,会调用asyncRpcResult.get()等待结果。
            //当responseFuture执行完成后,触发asyncRpcResult.complete()
            //然后触发AsyncToSyncInvoker中的asyncRpcResult.get()执行结束,得到最终的Result
            asyncRpcResult.subscribeTo(responseFuture);
            //保存responseFuture
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
以上代码,先获取一个ExchangeClient;然后根据通讯方式判断处理逻辑,如果是单向通讯(即不关心返回结果),则调用ExchangeClient.send(),如果是双向通讯(即需要等待返回结果),则调用ExchangeClient.request()。
这里重点看双向通讯的逻辑,我们知道,Netty中的网络通讯都是异步的,那么 Dubbo 怎么实现的同步调用呢?这里是关键。这里的实现方式使用的核心组件是CompletableFuture,此组件的核心功能之一是当执行complete()方法后,解除get()的阻塞并获取结果。
结合以上代码,完整的异步转同步的逻辑如下:
- 实例化一个
AsyncRpcResult对象,将来作为返回结果,返回给AsyncToSyncInvoker; - 调用
ExchangeClient.request(),并返回CompletableFuture; - asyncRpcResult 注册
CompletableFuture的whenComplete事件; - 将asyncRpcResult 返回给
AsyncToSyncInvoker; - 在
AsyncToSyncInvoker中调用AsyncRpcResult.get(),阻塞等待结果; - 服务端返回响应数据后(此过程在下文中分析),调用
CompletableFuture.complete(),触发asyncRpcResult.complete(),最终解除AsyncRpcResult.get()的阻塞,并获取返回结果。 

//HeaderExchangeChannel.request()
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // 创建Request对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    //最终,返回的是DefaultFuture,DefaultFuture继承自CompletableFuture
    //此处的channel为NettyClient
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        //此处的channel为NettyClient
        //调用的是NettyClient的父类AbstractPeer.send()
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
以上代码中,干了三件事:
- 初始化
Request对象,包装请求数据; - 初始化
DefaultFuture对象,DefaultFuture继承自CompletableFuture,最终返回给AsyncToSyncInvoker,用于上文说的异步转同步,以及超时异常的处理; - 调用
NettyClient.send()发送请求。 
上述代码中channel.send(req)的chanel是HeaderExchangeChannel, HeaderExchangeClient中依赖了HeaderExchangeChannel,也依赖了HeaderExchangeHandler, HeaderExchangeHandler很重要,响应接收返回值的时候有用到

今天的请求过程就讲解到这里,dubbo是一款很优秀的框架,值得我们去细细研究,源码要反复阅读,毕竟熟能生巧。
转载自:https://juejin.cn/post/7229217442425389114