likes
comments
collection
share

Dubbo服务消费者接收响应过程的源码解析

作者站长头像
站长
· 阅读数 42

大家好,我是趣开源,上一篇文章我们讲到了Dubbo服务消费者请求过程的源码解析,今天我们讲一下接收响应过程。强调一下,我们现在说到都是消费者端的源码,生产者的源码后面会讲到。

了解netty的同学都知道netty将业务和网络处理这两块解耦了,接收业务的代码都在ChannelHandler里,如果不熟悉netty,可以给我留言,我会考虑讲解一下netty的源码。

Dubbo服务消费者接收响应过程的源码解析

final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);

上述代码是创建一个NettyClientHandler对象,参数传的是this, 也就是nettyClient对象的本身,大家这里可能会有疑问,我来说明一下,nettyClient本身集成了ChannelHandler接口,所以这里传this是没有问题的。

看一下NettyClientHandler的构造函数的参数也是ChannelHandler

Dubbo服务消费者接收响应过程的源码解析

Dubbo服务消费者接收响应过程的源码解析

上述的channelRead方法就是客户端接收服务端响应的处理逻辑,里面主要是委派给handler.received方法来处理,这里handler就是nettyClient对象,咱们来看一下handler的由来

Dubbo服务消费者接收响应过程的源码解析

Dubbo服务消费者接收响应过程的源码解析

Dubbo服务消费者接收响应过程的源码解析

从上面的几张图片中我们来说一下handler的流程:

NettyClientHandler->MultiMessageHandler->HeartBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DefaultFuture

先获取NettyChannel,然后调用NettyClient.received()

//NettyClientHandler.channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        //调用NettyClient.received()
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

继续调用MultiMessageHandler.received()

//MultiMessageHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        //Response类型的消息,走HeartBeatHandler.received()
        handler.received(channel, message);
    }
}

当消息类型是Response时,走HeartBeatHandler.received()

//HeartBeatHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    //心跳请求
    if (isHeartbeatRequest(message)) {
        //省略部分代码...
    }
    //心跳响应
    if (isHeartbeatResponse(message)) {
        //省略部分代码...
    }
    //继续调用AllChannelHandler.received()
    handler.received(channel, message);
}

最终和服务提供者的处理逻辑一样,调用AllChannelHandler.received()

//AllChannelHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
    //线程池
    ExecutorService executor = getExecutorService();
    try {
        //创建线程,在线程池内异步执行逻辑处理
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //省略部分代码...
    }
}

先获取服务消费者端的线程池,然后创建线程并异步处理。其核心处理逻辑同样在ChannelEventRunnable中。

//ChannelEventRunnable.run()
public void run() {
    if (state == ChannelState.RECEIVED) {
        //当前状态是接收
        try {
            //调用DecodeHandler.received()
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        //其他状态的处理
        //省略...
    }

}

当状态是RECEIVED时,调用DecodeHandler.received()

//DecodeHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
    //省略部分代码...
    //调用HeaderExchangeHandler.received()
    handler.received(channel, message);
}

此时的消息类型是Response,所以走的是处理响应消息的逻辑handleResponse(

//HeaderExchangeHandler.handleResponse()
static void handleResponse(Channel channel, Response response) throws RemotingException {
    //response不为null,且非心跳
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

如果response不为null,且非心跳,则调用DefaultFuture.received();

//DefaultFuture.received()
public static void received(Channel channel, Response response) {
    //调用重载方法received()
    received(channel, response, false);
}

继续调用DefaultFuture的重载方法received()

//DefaultFuture.received()
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        //获取当前Future,并从全局中移除
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //如果是超时,则从时间轮中移除,不再进行超时检查
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // 取消超时检查
                t.cancel();
            }
            //处理返回结果,将结果返回给调用方
            future.doReceived(response);
        } else {
            //省略部分代码...
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

先根据请求编号DefaultFuture的全局容器FUTURES中获取DefaultFuture。然后调用DefaultFuture.doReceived()处理返回结果。

//DefaultFuture.doReceived()
private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    //在发送消息的时候
    //DubboInvoker里返回的是DefaultFuture,并返回AsyncRpcResult给AsyncToSyncInvoker
    //同时,注册了DefaultFuture.whenComplete,调用AsyncRpcResult.complete
    //在AsyncToSyncInvoker中通过asyncRpcResult.get()等待结果返回
    //因此,当执行到此地的complete和completeExceptionally时,就会调用AsyncRpcResult.complete
    //然后触发asyncRpcResult.get()执行结束,并获取此地设置的返回值
    //至此,调用结束

    if (res.getStatus() == Response.OK) {
        //如果响应状态是成功,完成并设定返回值
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        //返回超时异常TimeoutException
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        //返回远程调用异常RemotingException
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

如果看过上一篇文章的话,DefaultFuture看到它我想大家都会熟悉吧,正好是发送请求时HeaderExchangeChannel中用到的,我们来看一下:

Dubbo服务消费者接收响应过程的源码解析

我们先回顾一下服务消费者发送请求的时候的逻辑。

AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
//注册responseFuture.whenComplete,调用asyncRpcResult.complete()
//在AsyncToSyncInvoker调用DubboInvoker之后,如果是同步调用,会调用asyncRpcResult.get()等待结果。
//当responseFuture执行完成后,触发asyncRpcResult.complete()
//然后触发AsyncToSyncInvoker中的asyncRpcResult.get()执行结束,得到最终的Result
asyncRpcResult.subscribeTo(responseFuture);
//保存responseFuture
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;

以上代码中的responseFuture即为上面的DefaultFutureasyncRpcResult注册了DefaultFuturewhenComplete()。同时在AsyncToSyncInvoker调用DubboInvoker之后,调用asyncRpcResult.get()等待结果。当执行完DefaultFuture.doReceived()中的this.complete()this.completeExceptionally()后,将会触发AsyncRpcResult.complete()AsyncRpcResult.completeExceptionally()。最终释放asyncRpcResult.get()并得到返回结果。

至此,服务调用的全过程就全部分析完了!

总结

Dubbo 服务调用的全流程如上图所示。

  1. 服务消费者通过本地Invoker,向服务提供者发送请求,并对请求消息编码。
  2. 服务消费者阻塞等待返回结果。
  3. 服务提供者接收请求,并在处理业务逻辑之前对请求消息解码。
  4. 服务提供者将本次调用的业务处理分发到线程池,并通过Invoker调用接口的实现。
  5. 服务提供者发送响应结果,并在发送到服务消费者之前,对响应结果编码。
  6. 服务消费者接收响应结果,并在处理之前对响应结果解码。
  7. 服务消费者将本次响应分发到线程池,异步处理。
  8. 在异步处理过程中,通过请求编号找到本次调用的DefaultFuture,设置返回结果并唤醒第2步的阻塞。
  9. 业务调用方同步获取到第 8 步设置的响应结果
转载自:https://juejin.cn/post/7229539262910218299
评论
请登录