Dubbo服务消费者接收响应过程的源码解析
大家好,我是趣开源,上一篇文章我们讲到了Dubbo服务消费者请求过程的源码解析,今天我们讲一下接收响应过程。强调一下,我们现在说到都是消费者端的源码,生产者的源码后面会讲到。
了解netty的同学都知道netty将业务和网络处理这两块解耦了,接收业务的代码都在ChannelHandler里,如果不熟悉netty,可以给我留言,我会考虑讲解一下netty的源码。
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
上述代码是创建一个NettyClientHandler对象,参数传的是this, 也就是nettyClient对象的本身,大家这里可能会有疑问,我来说明一下,nettyClient本身集成了ChannelHandler接口,所以这里传this是没有问题的。
看一下NettyClientHandler的构造函数的参数也是ChannelHandler
上述的channelRead方法就是客户端接收服务端响应的处理逻辑,里面主要是委派给handler.received方法来处理,这里handler就是nettyClient对象,咱们来看一下handler的由来
从上面的几张图片中我们来说一下handler的流程:
NettyClientHandler->MultiMessageHandler->HeartBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DefaultFuture
先获取NettyChannel
,然后调用NettyClient.received()
。
//NettyClientHandler.channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
//调用NettyClient.received()
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
继续调用MultiMessageHandler.received()
。
//MultiMessageHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
//Response类型的消息,走HeartBeatHandler.received()
handler.received(channel, message);
}
}
当消息类型是Response
时,走HeartBeatHandler.received()
。
//HeartBeatHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
//心跳请求
if (isHeartbeatRequest(message)) {
//省略部分代码...
}
//心跳响应
if (isHeartbeatResponse(message)) {
//省略部分代码...
}
//继续调用AllChannelHandler.received()
handler.received(channel, message);
}
最终和服务提供者的处理逻辑一样,调用AllChannelHandler.received()
。
//AllChannelHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
//线程池
ExecutorService executor = getExecutorService();
try {
//创建线程,在线程池内异步执行逻辑处理
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//省略部分代码...
}
}
先获取服务消费者端的线程池,然后创建线程并异步处理。其核心处理逻辑同样在ChannelEventRunnable
中。
//ChannelEventRunnable.run()
public void run() {
if (state == ChannelState.RECEIVED) {
//当前状态是接收
try {
//调用DecodeHandler.received()
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
//其他状态的处理
//省略...
}
}
当状态是RECEIVED
时,调用DecodeHandler.received()
。
//DecodeHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
//省略部分代码...
//调用HeaderExchangeHandler.received()
handler.received(channel, message);
}
此时的消息类型是Response
,所以走的是处理响应消息的逻辑handleResponse(
//HeaderExchangeHandler.handleResponse()
static void handleResponse(Channel channel, Response response) throws RemotingException {
//response不为null,且非心跳
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
如果response
不为null
,且非心跳,则调用DefaultFuture.received()
;
//DefaultFuture.received()
public static void received(Channel channel, Response response) {
//调用重载方法received()
received(channel, response, false);
}
继续调用DefaultFuture
的重载方法received()
。
//DefaultFuture.received()
public static void received(Channel channel, Response response, boolean timeout) {
try {
//获取当前Future,并从全局中移除
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
//如果是超时,则从时间轮中移除,不再进行超时检查
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// 取消超时检查
t.cancel();
}
//处理返回结果,将结果返回给调用方
future.doReceived(response);
} else {
//省略部分代码...
}
} finally {
CHANNELS.remove(response.getId());
}
}
先根据请求编号从DefaultFuture
的全局容器FUTURES
中获取DefaultFuture
。然后调用DefaultFuture.doReceived()
处理返回结果。
//DefaultFuture.doReceived()
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
//在发送消息的时候
//DubboInvoker里返回的是DefaultFuture,并返回AsyncRpcResult给AsyncToSyncInvoker
//同时,注册了DefaultFuture.whenComplete,调用AsyncRpcResult.complete
//在AsyncToSyncInvoker中通过asyncRpcResult.get()等待结果返回
//因此,当执行到此地的complete和completeExceptionally时,就会调用AsyncRpcResult.complete
//然后触发asyncRpcResult.get()执行结束,并获取此地设置的返回值
//至此,调用结束
if (res.getStatus() == Response.OK) {
//如果响应状态是成功,完成并设定返回值
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
//返回超时异常TimeoutException
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
//返回远程调用异常RemotingException
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
如果看过上一篇文章的话,DefaultFuture看到它我想大家都会熟悉吧,正好是发送请求时HeaderExchangeChannel中用到的,我们来看一下:
我们先回顾一下服务消费者发送请求的时候的逻辑。
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
//注册responseFuture.whenComplete,调用asyncRpcResult.complete()
//在AsyncToSyncInvoker调用DubboInvoker之后,如果是同步调用,会调用asyncRpcResult.get()等待结果。
//当responseFuture执行完成后,触发asyncRpcResult.complete()
//然后触发AsyncToSyncInvoker中的asyncRpcResult.get()执行结束,得到最终的Result
asyncRpcResult.subscribeTo(responseFuture);
//保存responseFuture
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
以上代码中的responseFuture
即为上面的DefaultFuture
,asyncRpcResult
注册了DefaultFuture
的whenComplete()
。同时在AsyncToSyncInvoker
调用DubboInvoker
之后,调用asyncRpcResult.get()
等待结果。当执行完DefaultFuture.doReceived()
中的this.complete()
或this.completeExceptionally()
后,将会触发AsyncRpcResult.complete()
或AsyncRpcResult.completeExceptionally()
。最终释放asyncRpcResult.get()
并得到返回结果。
至此,服务调用的全过程就全部分析完了!
总结
Dubbo 服务调用的全流程如上图所示。
- 服务消费者通过本地
Invoker
,向服务提供者发送请求,并对请求消息编码。 - 服务消费者阻塞等待返回结果。
- 服务提供者接收请求,并在处理业务逻辑之前对请求消息解码。
- 服务提供者将本次调用的业务处理分发到线程池,并通过
Invoker
调用接口的实现。 - 服务提供者发送响应结果,并在发送到服务消费者之前,对响应结果编码。
- 服务消费者接收响应结果,并在处理之前对响应结果解码。
- 服务消费者将本次响应分发到线程池,异步处理。
- 在异步处理过程中,通过请求编号找到本次调用的
DefaultFuture
,设置返回结果并唤醒第2步的阻塞。 - 业务调用方同步获取到第 8 步设置的响应结果
转载自:https://juejin.cn/post/7229539262910218299