Dubbo服务消费者请求过程的源码解析
今天我们分析一下Dubbo消费者是怎样调用远程的生产者的。
在服务调用过程中,看似只有请求和响应两步。但实际上,服务调用过程包含四步,如下图所示的服务调用过程包含服务消费者发送请求、服务提供者接收请求、服务提供者发送响应结果和服务消费者接收响应结果。
服务调用的流程如下:
- 服务消费者调用接口时,底层使用
Invoker
,通过Client
向服务提供者发送请求消息; - 服务提供者接收请求,并处理业务逻辑,底层会通过服务端的
Invoker
调用接口实现; - 服务提供者处理完业务逻辑后,发送响应数据;
- 服务消费者接收响应数据,并返回给调用方。
注入到Spring容器的invoker是MockClusterInvoker类,这是我们的入口,从下图可以看出
InvokerInvocationHandler.invoke()
的核心逻辑是调用了MockClusterInvoker.invoke()
,并调用Result.recreate()
获取返回结果
//MockClusterInvoker.invoke()
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//获取mock配置值,默认为false
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//非mock,调用远程服务
//默认为FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//强制执行mock接口
result = doMockInvoke(invocation, null);
} else {
//省略其他逻辑代码...
}
return result;
}
默认情况下,走的是正常的远程调用逻辑,即FailoverClusterInvoker.invoke()
。invoke()
方法是在其父类AbstractClusterInvoker
中实现的。
//AbstractClusterInvoker.invoke()
public Result invoke(final Invocation invocation) throws RpcException {
//检查是否已销毁
checkWhetherDestroyed();
//给invocation设置attachments.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
//获取可用的invokers
//list的底层是调用的Directory.list
//所有的invokers都保存在RouterChain.invokers里
//所以Directory.list最终也是从RouterChain.invokers获取的
List<Invoker<T>> invokers = list(invocation);
//初始化负载均衡
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
//为异步调用添加InvocationId
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//调用子类的模板方法doInvoke(),当前对象为FailoverClusterInvoker
return doInvoke(invocation, invokers, loadbalance);
}
以上代码中,有 3 处关键点:
list(invocation)
:获取所有可用的Invoker
;initLoadBalance(invokers, invocation)
:初始化负载均衡器(LoadBalance
),用于选择Invoker
时的负载均衡;doInvoke(invocation, invokers, loadbalance)
:调用子类的模板方法doInvoke()
,此方法中实现了负载均衡和远程服务调用。
以上代码的开头,当没有可用的服务时,抛出异常。这个是日常调试过程中比较常见的问题,比如依赖的服务提供者正在部署,或者服务挂了等,都会在这里跑出异常。
代码的最后,调用routerChain.route()
从routerChain
容器中获取Invoker
集合。在获取的过程中,根据路由规则,筛选出符合条件的Invoker
集合
//AsyncToSyncInvoker.invoke()
@Override
public Result invoke(Invocation invocation) throws RpcException {
//调用DubboInvoker.invoke()
Result asyncResult = invoker.invoke(invocation);
try {
//如果是同步调用,等待返回结果
//默认是同步调用
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
//调用超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
//远程调用异常
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
以上代码共两步,第一步调用DubboInvoker.invoke()
,并得到一个Result
;第二步调用Result.get()
方法,同步等待返回结果。invoke()
方法在其父类AbstractInvoker
中实现。
//DubboInvoker.doInvoke()
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
//获取ExchangeClient,用于通讯
//ExchangeClient表示客户端,包装了与远程通讯的逻辑
//此处的ExchangeClient为ReferenceCountExchangeClient,装饰了HeaderExchangeClient
//因此,最终调用的是HeaderExchangeClient
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//判断是否单向通讯
//单向通讯,即只发送请求,不关心结果
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
//如果是单向通讯,调用的是currentClient.send(),直接返回结果
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
//如果是双向通讯,调用的是currentClient.request()
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
//注册responseFuture.whenComplete,调用asyncRpcResult.complete()
//在AsyncToSyncInvoker调用DubboInvoker之后,如果是同步调用,会调用asyncRpcResult.get()等待结果。
//当responseFuture执行完成后,触发asyncRpcResult.complete()
//然后触发AsyncToSyncInvoker中的asyncRpcResult.get()执行结束,得到最终的Result
asyncRpcResult.subscribeTo(responseFuture);
//保存responseFuture
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
以上代码,先获取一个ExchangeClient
;然后根据通讯方式判断处理逻辑,如果是单向通讯(即不关心返回结果),则调用ExchangeClient.send()
,如果是双向通讯(即需要等待返回结果),则调用ExchangeClient.request()
。
这里重点看双向通讯的逻辑,我们知道,Netty中的网络通讯都是异步的,那么 Dubbo 怎么实现的同步调用呢?这里是关键。这里的实现方式使用的核心组件是CompletableFuture
,此组件的核心功能之一是当执行complete()
方法后,解除get()
的阻塞并获取结果。
结合以上代码,完整的异步转同步的逻辑如下:
- 实例化一个
AsyncRpcResult
对象,将来作为返回结果,返回给AsyncToSyncInvoker
; - 调用
ExchangeClient.request()
,并返回CompletableFuture
; - asyncRpcResult 注册
CompletableFuture
的whenComplete
事件; - 将asyncRpcResult 返回给
AsyncToSyncInvoker
; - 在
AsyncToSyncInvoker
中调用AsyncRpcResult.get()
,阻塞等待结果; - 服务端返回响应数据后(此过程在下文中分析),调用
CompletableFuture.complete()
,触发asyncRpcResult.complete()
,最终解除AsyncRpcResult.get()
的阻塞,并获取返回结果。
//HeaderExchangeChannel.request()
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 创建Request对象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//最终,返回的是DefaultFuture,DefaultFuture继承自CompletableFuture
//此处的channel为NettyClient
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
//此处的channel为NettyClient
//调用的是NettyClient的父类AbstractPeer.send()
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
以上代码中,干了三件事:
- 初始化
Request
对象,包装请求数据; - 初始化
DefaultFuture
对象,DefaultFuture
继承自CompletableFuture
,最终返回给AsyncToSyncInvoker
,用于上文说的异步转同步,以及超时异常的处理; - 调用
NettyClient.send()
发送请求。
上述代码中channel.send(req)的chanel是HeaderExchangeChannel, HeaderExchangeClient中依赖了HeaderExchangeChannel,也依赖了HeaderExchangeHandler, HeaderExchangeHandler很重要,响应接收返回值的时候有用到
今天的请求过程就讲解到这里,dubbo是一款很优秀的框架,值得我们去细细研究,源码要反复阅读,毕竟熟能生巧。
转载自:https://juejin.cn/post/7229217442425389114