likes
comments
collection
share

水煮Redisson(二十)- 请求的执行过程【拆解】

作者站长头像
站长
· 阅读数 61

前言

这一章节主要研究在Redisson中,是怎么处理一个redis请求的。Redisson的处理核心类是【RedisExecutor】,所有的请求都会走到这里,经由Netty网络框架,与Redis服务进行数据交互。 这里有两个至关重要的回调句柄【future】

  • mainPromise:发起请求时创建,由当前线程持有,并监听等待结果;
  • attemptPromise:在RedisExecutor.execute()方法中创建,由Netty的线程持有,在redis服务返回数据之后,触发成功事件,结束监听。

下面是一个简单示例图:

水煮Redisson(二十)- 请求的执行过程【拆解】

发起请求,返回mainPromise

看一个简单的指令,获取hash缓存key的元素个数。在redisson中,对操作进行了读写分类处理,所有的操作,都会进入到处理类:CommandSyncService。 sizeAsync发送之后,会将mainPromise返回给发起请求的线程。至于如何从mainPromise中取得数据,这里不关注。

    public RFuture<Integer> sizeAsync() {
        return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
    }
	
	    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
		// 记住这个mainPromise,后续还会用到
        RPromise<R> mainPromise,后续还会用到 = createPromise();
        NodeSource source = getNodeSource(key);
        async(true, source, codec, command, params, mainPromise, false);
        return mainPromise;
    }

无论是同步异步,读或写操作,还是lua脚本执行,最终都会走到这个方法,这是Redisson中所有关于redis指令操作的入口。

    public <V, R> void async(boolean readOnlyMode, NodeSource source, Codec codec,
            RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, 
            boolean ignoreRedirect) {
        RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder);
        executor.execute();
    }

执行请求

执行请求,发送数据,注册attemptPromise事件监听,并等待结果 从上个方法中,我们看到了一个神秘的类:RedisExecutor,这里详细介绍一下其中的执行方法。

public void execute() {
    // 如果此操作已经被取消,则直接释放返回
    if (mainPromise.isCancelled()) {
        // 调用Netty中的方法:ReferenceCountUtil.safeRelease(obj);
        // 释放params中的元素
        free();
        return;
    }
    // 这里判断redisson是否关闭,请求的对象是InfinitySemaphoreLatch实例,如果redisson关闭,那么会执行shutdownLatch.close()方法
    // 如果已经close,那么在调用acquire()方法的时候,会直接返回false
    if (!connectionManager.getShutdownLatch().acquire()) {
        free();
        mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
        return;
    }
    // 获取解码器
    codec = getCodec(codec);
    // 获取redis链接,根据参数【readOnlyMode】来判断链接读写类型,如果是读操作,则从slaveBalancer获取;
    // 写操作链接从主节点获取
    RFuture<RedisConnection> connectionFuture = getConnection();

    RPromise<R> attemptPromise = new RedissonPromise<R>();
    // 监听链接的取消事件,如果链接被中断,则取消定时任务
    mainPromiseListener = (r, e) -> {
        if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
            log.debug("Connection obtaining canceled for {}", command);
            // 注意这个timeout,在前面介绍时间轮算法时,捎带介绍了一下,这是Netty中的定时任务,在HashedWheelTimer类中执行
            timeout.cancel();
            if (attemptPromise.cancel(false)) {
                free();
            }
        }
    };
    // 如果重试次数耗尽,则执行一下监听程序,判断是否需要终止任务
    if (attempt == 0) {
        mainPromise.onComplete((r, e) -> {
            if (this.mainPromiseListener != null) {
                this.mainPromiseListener.accept(r, e);
            }
        });
    }
    // 定时重试机制,在此方法中会创建timeoutTask,在task中调用当前方法【execute()】,并指定重试时间间隔
    // timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS);
    scheduleRetryTimeout(connectionFuture, attemptPromise);
    // 注册链接成功的事件监听,链接建立之后,发送指令到redis服务器
    connectionFuture.onComplete((connection, e) -> {
        // 发送指令到redis,返回writeFuture
        sendCommand(attemptPromise, connection);
        // 监听发送指令的结果
        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 发送成功之后,取消原先的timeout定时任务,添加等待结果的定时任务,超时之后,再次调用当前方法【execute()】
                // scheduleResponseTimeout()方法中,timeoutTime是配置文件中的超时时间
                // timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
                checkWriteFuture(writeFuture, attemptPromise, connection);
            }
        });
        // 释放链接
        releaseConnection(attemptPromise, connectionFuture);
    });
    // attemptPromise取消、异常或者成功,都会触发此事件,进而执行方法:checkAttemptPromise
    attemptPromise.onComplete((r, e) -> {
        // 判断是否需要重试,如果不需要,则直接返回结果
        // 调用方法:handleResult(attemptFuture, connectionFuture)
        // 触发结果:mainPromise.trySuccess(res),在最外层调用future.get()拿到结果
        checkAttemptPromise(attemptPromise, connectionFuture);
    });
}

发送指令

注意参数中的attemptPromise,在上一小节的代码中可以看到,注册了onComplete监听,一旦此future结束,就会给客户端发送结果。 被忽略的小细节:如果发生【RedisAskException】异常,说明数据在redis集群中发生了迁移,需要进行重试。

protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
		// 如果数据没有发生迁移,则直接发送
		// 经由Netty网络框架,发送网络数据包到redis服务,channel.writeAndFlush(data);
        writeFuture = connection.send(new CommandData<V, R>(attemptPromise, codec, command, params));

}

数据回传

数据回传时,如何获取attemptPromise句柄?发送数据包到redis服务的时候,会进由CommandsQueue处理,内部sendData方法将整个QueueCommand对象存储到每个客户端channel的CURRENT_COMMAND属性中。

    private void sendData(Channel ch) {
        QueueCommandHolder command = queue.peek();
        if (command != null && command.trySend()) {
            QueueCommand data = command.getCommand();
            // 保存数据到本地内存
            ch.attr(CURRENT_COMMAND).set(data);
            command.getChannelPromise().addListener(listener);
            // 发送数据
            ch.writeAndFlush(data, command.getChannelPromise());
        }
    }

在解码的时候,直接从本地内存获取

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
    // ... 
    decode(ctx, in, data);
}

Redis服务回传处理结果,触发attemptPromise成功事件

结果数据回写, Netty中的长连接读取到了数据,CommandDecoder是底层Netty服务中pipeline的一个handler类,对流入数据进行解码。父类ByteToMessageDecoder的channelRead方法接收到数据后,调用CommandDecoder的decode()方法处理数据。 在下面这个方法中,拿取上面介绍的attemptPromise句柄,触发成功事件,从而通知注册在future上的监听器处理后续逻辑。


protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
    // 注意代码中的data.getPromise().trySuccess(result),这会触发attemptPromise.onComplete事件
    if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
        log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
    }
}

回写结果,触发mainPromise成功事件

attemptPromise触发成功事件之后,调用checkAttemptPromise方法,触发mainPromise的成功事件。mainPromise就是在发起请求的时候,返回给调用线程的那个future。

protected void handleSuccess(RPromise<R> mainPromise, RFuture<RedisConnection> connectionFuture, R res) {
    if (objectBuilder != null) {
        // 根据结果数据的类型,比如map,list,set等,对数据进行封装
        mainPromise.trySuccess((R) tryHandleReference(objectBuilder, res));
    } else {
        mainPromise.trySuccess(res);
    }
}
转载自:https://juejin.cn/post/7275533017312002085
评论
请登录