likes
comments
collection
share

Dubbo服务消费者请求过程的源码解析

作者站长头像
站长
· 阅读数 34

今天我们分析一下Dubbo消费者是怎样调用远程的生产者的。

在服务调用过程中,看似只有请求和响应两步。但实际上,服务调用过程包含四步,如下图所示的服务调用过程包含服务消费者发送请求、服务提供者接收请求、服务提供者发送响应结果和服务消费者接收响应结果。

Dubbo服务消费者请求过程的源码解析

服务调用的流程如下:

  1. 服务消费者调用接口时,底层使用Invoker,通过Client向服务提供者发送请求消息;
  2. 服务提供者接收请求,并处理业务逻辑,底层会通过服务端的Invoker调用接口实现;
  3. 服务提供者处理完业务逻辑后,发送响应数据;
  4. 服务消费者接收响应数据,并返回给调用方。

注入到Spring容器的invoker是MockClusterInvoker类,这是我们的入口,从下图可以看出

Dubbo服务消费者请求过程的源码解析

InvokerInvocationHandler.invoke()的核心逻辑是调用了MockClusterInvoker.invoke(),并调用Result.recreate()获取返回结果

//MockClusterInvoker.invoke()
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    //获取mock配置值,默认为false
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //非mock,调用远程服务
        //默认为FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //强制执行mock接口
        result = doMockInvoke(invocation, null);
    } else {
        //省略其他逻辑代码...
    }
    return result;
}

默认情况下,走的是正常的远程调用逻辑,即FailoverClusterInvoker.invoke()invoke()方法是在其父类AbstractClusterInvoker中实现的。

Dubbo服务消费者请求过程的源码解析

Dubbo服务消费者请求过程的源码解析

//AbstractClusterInvoker.invoke()
public Result invoke(final Invocation invocation) throws RpcException {
    //检查是否已销毁
    checkWhetherDestroyed();

    //给invocation设置attachments.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    //获取可用的invokers
    //list的底层是调用的Directory.list
    //所有的invokers都保存在RouterChain.invokers里
    //所以Directory.list最终也是从RouterChain.invokers获取的
    List<Invoker<T>> invokers = list(invocation);
    //初始化负载均衡
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    //为异步调用添加InvocationId
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //调用子类的模板方法doInvoke(),当前对象为FailoverClusterInvoker
    return doInvoke(invocation, invokers, loadbalance);
}

以上代码中,有 3 处关键点:

  1. list(invocation):获取所有可用的Invoker
  2. initLoadBalance(invokers, invocation):初始化负载均衡器(LoadBalance),用于选择Invoker时的负载均衡;
  3. doInvoke(invocation, invokers, loadbalance):调用子类的模板方法doInvoke(),此方法中实现了负载均衡和远程服务调用。

Dubbo服务消费者请求过程的源码解析

以上代码的开头,当没有可用的服务时,抛出异常。这个是日常调试过程中比较常见的问题,比如依赖的服务提供者正在部署,或者服务挂了等,都会在这里跑出异常。

代码的最后,调用routerChain.route()routerChain容器中获取Invoker集合。在获取的过程中,根据路由规则,筛选出符合条件的Invoker集合

Dubbo服务消费者请求过程的源码解析

//AsyncToSyncInvoker.invoke()
@Override
public Result invoke(Invocation invocation) throws RpcException {
    //调用DubboInvoker.invoke()
    Result asyncResult = invoker.invoke(invocation);

    try {
        //如果是同步调用,等待返回结果
        //默认是同步调用
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();

        if (t instanceof TimeoutException) {
            //调用超时异常
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            //远程调用异常
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

以上代码共两步,第一步调用DubboInvoker.invoke(),并得到一个Result;第二步调用Result.get()方法,同步等待返回结果。invoke()方法在其父类AbstractInvoker中实现。

Dubbo服务消费者请求过程的源码解析

//DubboInvoker.doInvoke()
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    //获取ExchangeClient,用于通讯
    //ExchangeClient表示客户端,包装了与远程通讯的逻辑
    //此处的ExchangeClient为ReferenceCountExchangeClient,装饰了HeaderExchangeClient
    //因此,最终调用的是HeaderExchangeClient
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //判断是否单向通讯
        //单向通讯,即只发送请求,不关心结果
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        //如果是单向通讯,调用的是currentClient.send(),直接返回结果
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            //如果是双向通讯,调用的是currentClient.request()
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            //注册responseFuture.whenComplete,调用asyncRpcResult.complete()
            //在AsyncToSyncInvoker调用DubboInvoker之后,如果是同步调用,会调用asyncRpcResult.get()等待结果。
            //当responseFuture执行完成后,触发asyncRpcResult.complete()
            //然后触发AsyncToSyncInvoker中的asyncRpcResult.get()执行结束,得到最终的Result
            asyncRpcResult.subscribeTo(responseFuture);
            //保存responseFuture
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

以上代码,先获取一个ExchangeClient;然后根据通讯方式判断处理逻辑,如果是单向通讯(即不关心返回结果),则调用ExchangeClient.send(),如果是双向通讯(即需要等待返回结果),则调用ExchangeClient.request()

这里重点看双向通讯的逻辑,我们知道,Netty中的网络通讯都是异步的,那么 Dubbo 怎么实现的同步调用呢?这里是关键。这里的实现方式使用的核心组件是CompletableFuture,此组件的核心功能之一是当执行complete()方法后,解除get()的阻塞并获取结果。

结合以上代码,完整的异步转同步的逻辑如下:

  1. 实例化一个AsyncRpcResult对象,将来作为返回结果,返回给AsyncToSyncInvoker
  2. 调用ExchangeClient.request(),并返回CompletableFuture
  3. asyncRpcResult 注册CompletableFuturewhenComplete事件;
  4. 将asyncRpcResult 返回给AsyncToSyncInvoker
  5. AsyncToSyncInvoker中调用AsyncRpcResult.get(),阻塞等待结果;
  6. 服务端返回响应数据后(此过程在下文中分析),调用CompletableFuture.complete(),触发asyncRpcResult.complete(),最终解除AsyncRpcResult.get()的阻塞,并获取返回结果。

Dubbo服务消费者请求过程的源码解析

//HeaderExchangeChannel.request()
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // 创建Request对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    //最终,返回的是DefaultFuture,DefaultFuture继承自CompletableFuture
    //此处的channel为NettyClient
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        //此处的channel为NettyClient
        //调用的是NettyClient的父类AbstractPeer.send()
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

以上代码中,干了三件事:

  1. 初始化Request对象,包装请求数据;
  2. 初始化DefaultFuture对象,DefaultFuture继承自CompletableFuture,最终返回给AsyncToSyncInvoker,用于上文说的异步转同步,以及超时异常的处理;
  3. 调用NettyClient.send()发送请求。

上述代码中channel.send(req)的chanel是HeaderExchangeChannel, HeaderExchangeClient中依赖了HeaderExchangeChannel,也依赖了HeaderExchangeHandler, HeaderExchangeHandler很重要,响应接收返回值的时候有用到

Dubbo服务消费者请求过程的源码解析

今天的请求过程就讲解到这里,dubbo是一款很优秀的框架,值得我们去细细研究,源码要反复阅读,毕竟熟能生巧。

转载自:https://juejin.cn/post/7229217442425389114
评论
请登录