水煮Redisson(二十)- 请求的执行过程【拆解】
前言
这一章节主要研究在Redisson中,是怎么处理一个redis请求的。Redisson的处理核心类是【RedisExecutor】,所有的请求都会走到这里,经由Netty网络框架,与Redis服务进行数据交互。 这里有两个至关重要的回调句柄【future】
- mainPromise:发起请求时创建,由当前线程持有,并监听等待结果;
- attemptPromise:在RedisExecutor.execute()方法中创建,由Netty的线程持有,在redis服务返回数据之后,触发成功事件,结束监听。
下面是一个简单示例图:
发起请求,返回mainPromise
看一个简单的指令,获取hash缓存key的元素个数。在redisson中,对操作进行了读写分类处理,所有的操作,都会进入到处理类:CommandSyncService。 sizeAsync发送之后,会将mainPromise返回给发起请求的线程。至于如何从mainPromise中取得数据,这里不关注。
public RFuture<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
// 记住这个mainPromise,后续还会用到
RPromise<R> mainPromise,后续还会用到 = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, false);
return mainPromise;
}
无论是同步异步,读或写操作,还是lua脚本执行,最终都会走到这个方法,这是Redisson中所有关于redis指令操作的入口。
public <V, R> void async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, RPromise<R> mainPromise,
boolean ignoreRedirect) {
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder);
executor.execute();
}
执行请求
执行请求,发送数据,注册attemptPromise事件监听,并等待结果 从上个方法中,我们看到了一个神秘的类:RedisExecutor,这里详细介绍一下其中的执行方法。
public void execute() {
// 如果此操作已经被取消,则直接释放返回
if (mainPromise.isCancelled()) {
// 调用Netty中的方法:ReferenceCountUtil.safeRelease(obj);
// 释放params中的元素
free();
return;
}
// 这里判断redisson是否关闭,请求的对象是InfinitySemaphoreLatch实例,如果redisson关闭,那么会执行shutdownLatch.close()方法
// 如果已经close,那么在调用acquire()方法的时候,会直接返回false
if (!connectionManager.getShutdownLatch().acquire()) {
free();
mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
return;
}
// 获取解码器
codec = getCodec(codec);
// 获取redis链接,根据参数【readOnlyMode】来判断链接读写类型,如果是读操作,则从slaveBalancer获取;
// 写操作链接从主节点获取
RFuture<RedisConnection> connectionFuture = getConnection();
RPromise<R> attemptPromise = new RedissonPromise<R>();
// 监听链接的取消事件,如果链接被中断,则取消定时任务
mainPromiseListener = (r, e) -> {
if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
log.debug("Connection obtaining canceled for {}", command);
// 注意这个timeout,在前面介绍时间轮算法时,捎带介绍了一下,这是Netty中的定时任务,在HashedWheelTimer类中执行
timeout.cancel();
if (attemptPromise.cancel(false)) {
free();
}
}
};
// 如果重试次数耗尽,则执行一下监听程序,判断是否需要终止任务
if (attempt == 0) {
mainPromise.onComplete((r, e) -> {
if (this.mainPromiseListener != null) {
this.mainPromiseListener.accept(r, e);
}
});
}
// 定时重试机制,在此方法中会创建timeoutTask,在task中调用当前方法【execute()】,并指定重试时间间隔
// timeout = connectionManager.newTimeout(retryTimerTask, retryInterval, TimeUnit.MILLISECONDS);
scheduleRetryTimeout(connectionFuture, attemptPromise);
// 注册链接成功的事件监听,链接建立之后,发送指令到redis服务器
connectionFuture.onComplete((connection, e) -> {
// 发送指令到redis,返回writeFuture
sendCommand(attemptPromise, connection);
// 监听发送指令的结果
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 发送成功之后,取消原先的timeout定时任务,添加等待结果的定时任务,超时之后,再次调用当前方法【execute()】
// scheduleResponseTimeout()方法中,timeoutTime是配置文件中的超时时间
// timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
checkWriteFuture(writeFuture, attemptPromise, connection);
}
});
// 释放链接
releaseConnection(attemptPromise, connectionFuture);
});
// attemptPromise取消、异常或者成功,都会触发此事件,进而执行方法:checkAttemptPromise
attemptPromise.onComplete((r, e) -> {
// 判断是否需要重试,如果不需要,则直接返回结果
// 调用方法:handleResult(attemptFuture, connectionFuture)
// 触发结果:mainPromise.trySuccess(res),在最外层调用future.get()拿到结果
checkAttemptPromise(attemptPromise, connectionFuture);
});
}
发送指令
注意参数中的attemptPromise,在上一小节的代码中可以看到,注册了onComplete监听,一旦此future结束,就会给客户端发送结果。 被忽略的小细节:如果发生【RedisAskException】异常,说明数据在redis集群中发生了迁移,需要进行重试。
protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
// 如果数据没有发生迁移,则直接发送
// 经由Netty网络框架,发送网络数据包到redis服务,channel.writeAndFlush(data);
writeFuture = connection.send(new CommandData<V, R>(attemptPromise, codec, command, params));
}
数据回传
数据回传时,如何获取attemptPromise句柄?发送数据包到redis服务的时候,会进由CommandsQueue处理,内部sendData方法将整个QueueCommand对象存储到每个客户端channel的CURRENT_COMMAND属性中。
private void sendData(Channel ch) {
QueueCommandHolder command = queue.peek();
if (command != null && command.trySend()) {
QueueCommand data = command.getCommand();
// 保存数据到本地内存
ch.attr(CURRENT_COMMAND).set(data);
command.getChannelPromise().addListener(listener);
// 发送数据
ch.writeAndFlush(data, command.getChannelPromise());
}
}
在解码的时候,直接从本地内存获取
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
// ...
decode(ctx, in, data);
}
Redis服务回传处理结果,触发attemptPromise成功事件
结果数据回写, Netty中的长连接读取到了数据,CommandDecoder是底层Netty服务中pipeline的一个handler类,对流入数据进行解码。父类ByteToMessageDecoder的channelRead方法接收到数据后,调用CommandDecoder的decode()方法处理数据。 在下面这个方法中,拿取上面介绍的attemptPromise句柄,触发成功事件,从而通知注册在future上的监听器处理后续逻辑。
protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
// 注意代码中的data.getPromise().trySuccess(result),这会触发attemptPromise.onComplete事件
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
}
}
回写结果,触发mainPromise成功事件
attemptPromise触发成功事件之后,调用checkAttemptPromise方法,触发mainPromise的成功事件。mainPromise就是在发起请求的时候,返回给调用线程的那个future。
protected void handleSuccess(RPromise<R> mainPromise, RFuture<RedisConnection> connectionFuture, R res) {
if (objectBuilder != null) {
// 根据结果数据的类型,比如map,list,set等,对数据进行封装
mainPromise.trySuccess((R) tryHandleReference(objectBuilder, res));
} else {
mainPromise.trySuccess(res);
}
}
转载自:https://juejin.cn/post/7275533017312002085