Redisson 同异步之间的互相切换
问题:redisson 这个 redis 客户端,底层也是用的netty,那就比较好奇了:netty是异步的,上层是同步的,要拿结果的,同时呢,redis协议也不可能按照redisson的要求,在请求和响应里携带请求id,那,它是怎么实现同步转异步的呢,异步结果回来后,又是怎么把结果对应上的呢?
有位前辈提出了这个问题,我也是非常好奇 redisson 是怎么做到的,看了之后也许对同异步又新的认识,带着这个问题,于是乎我就去 redisson 的源码去看了。
前辈博客: www.cnblogs.com/grey-wolf/p…
正常我们在 redis的客户端返回:
redis> HSET test1 1
(integer) 2
使用 redisson 代码如下:
RMap<Integer, Integer> map = client.getMap("test1");
// 异步执行
RFuture<Integer> future = map.getAsync(1);
future.onComplete((number, throwable) -> {
// 得到结果后的处理
if (number == null) {
System.out.println(throwable);
}
System.out.println(number);
});
// 同步执行
map.get(1);
先看异步,看源码还得断点,调用栈如下:
读思路
调用栈虽然有点长,但是我们看到 CommandDecoder 这个方法,肯定从接收消息来的。刚刚我们调用的 map.getAsync(1); 这段代码的时候肯定是发了一条请求到 redis 服务端的,那么这个请求很大可能就是发生这条消息的返回。
CommandDecoder:
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 从 channel 的 map 拿出来一个 QueueCommand
// 那么刚刚就有人存了这个 QueueCommand 进来
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
if (state() == null) {
state(new State());
}
// ...
decode(ctx, in, data);
// ...
}
// 真正的解析消息
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
if (code == '$') {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state());
}
// 已经得到结果结果是2,符合预期
handleResult(data, parts, result, false);
}
// 处理结果
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor) {
// ...
completeResponse(data, result);
// ...
}
// 发生结果
protected void completeResponse(CommandData<Object, Object> data, Object result) {
if (data != null) {
// 发生给监听结果的人
data.getPromise().trySuccess(result);
}
}
DefaultPromise 通知监听器:
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for(int i = 0; i < size; ++i) {
notifyListener0(this, a[i]);
}
}
结果:
处理结果(调用回我们自己的方法):
QueueCommand 的值:
我们可以看出来这个 command 就是我们刚刚发生给redis客户端的命令。
CommandData 的主要参数
public class CommandData<T, R> implements QueueCommand {
final RPromise<R> promise;
final RedisCommand<T> command;
final Object[] params;
final Codec codec;
final MultiDecoder<Object> messageDecoder;
}
DefaultPromise 的主要参数:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
private static final DefaultPromise.CauseHolder CANCELLATION_CAUSE_HOLDER = new DefaultPromise.CauseHolder(ThrowableUtil.unknownStackTrace(new CancellationException(), DefaultPromise.class, "cancel(...)"));
private static final StackTraceElement[] CANCELLATION_STACK;
private volatile Object result;
private final EventExecutor executor;
private Object listeners; // 监听器列表
private short waiters;
private boolean notifyingListeners;
}
先理一下读思路:
- 首先,接收到消息的时候,我们从 channel 那里尝试去取出 CommandData
- 如果有,带着 CommandData 一起去解析消息
- 解析出结果后,通知 CommandData 的所有监听器(调用我们定义的方法),通知完就删除监听器
思路清晰了,现在找到从什么地方把 CommandData 放进去 channel 和 监听器什么时候放进去 就可以了。
写思路
什么时候把 CommandData 放进去 channel 的?
其实 CURRENT_COMMAND 这个key就只有一个出处:
CommandsQueue :
public class CommandsQueue extends ChannelDuplexHandler {
// 我们要找的key
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");
// 多个命令是时候用
private final Queue<QueueCommandHolder> queue = new ConcurrentLinkedQueue<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.getCommand() == data) {
super.write(ctx, msg, promise);
} else {
// 把当前命令包装成 QueueCommandHolder ,进入队列
queue.add(new QueueCommandHolder(data, promise));
// 发生消息
sendData(ctx.channel());
}
} else {
super.write(ctx, msg, promise);
}
}
// 发送消息
private void sendData(Channel ch) {
QueueCommandHolder command = queue.peek();
// 如果这次不止一个,特殊操作
if (command != null && command.trySend()) {
QueueCommand data = command.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
for (Object channel : cd.getParams()) {
ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
}
}
} else {
// 第一次,或者写只有一个cmd
// 把当前 cmd 放在 channel key为 CURRENT_COMMAND
ch.attr(CURRENT_COMMAND).set(data);
}
command.getChannelPromise().addListener(listener);
ch.writeAndFlush(data, command.getChannelPromise());
}
}
}
注:ChannelDuplexHandler 是一个双向的 handler ,消息进出都会经过它。所以它在写 write() 时候的时候把当前 cmd 放到了 channel 上面。
什么时候我们的监听器放进去(RedissonPromise)?
再看 dome 代码:
// 加入异步监听
future.onComplete((number, throwable) -> {
if (number == null) {
System.out.println(throwable);
}
System.out.println(number);
});
RedissonPromise:
@Override
public void onComplete(BiConsumer<? super T, ? super Throwable> action) {
promise.addListener(f -> {
if (!f.isSuccess()) {
action.accept(null, f.cause());
return;
}
action.accept((T) f.getNow(), null);
});
}
再整理一下写的思路:
- CommandsQueue 会监听 write 方法
- 如果是 cmd命令就当前 cmd 命令放到 channel 和 放到 cmd队列中去
结合读写思路
- CommandsQueue 会监听 write 方法
- 如果是 cmd命令就当前 cmd 命令放到 channel 和 放到 cmd队列中去
- 接收到消息的时候,我们从 channel 那里尝试去取出 CommandData
- 如果有,带着 CommandData 一起去解析消息
- 解析出结果后,通知 CommandData 的所有监听器(调用我们定义的方法),通知完就删除监听器
问题
如果我们和 redis 突然失联了怎么办?(断线重连) 把当前队列清空,以免污染重新链接
CommandsQueue.channelInactive() 客户端断开时
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
while (true) {
// 把当前队列清空
QueueCommandHolder command = queue.poll();
if (command == null) {
break;
}
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Channel has been closed! Can't write command: "
+ LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel()));
}
super.channelInactive(ctx);
}
同步怎么做的?
RMap<Integer, Integer> map = client.getMap("test1");
// 同步执行
map.get(1);
其实很简单,就不断循环 RFuture ,直到它有返回为止。
@Override
public <V> V get(RFuture<V> future) {
// ...
future.await();
// ...
}
public Promise<V> await() throws InterruptedException {
// 已经完成
if (this.isDone()) {
return this;
} else if (Thread.interrupted()) {
throw new InterruptedException(this.toString());
} else {
this.checkDeadLock();
synchronized(this) {
// 如果没有完成就不返回
while(!this.isDone()) {
this.incWaiters();
try {
// 停止一下
this.wait();
} finally {
this.decWaiters();
}
}
return this;
}
}
}
// 32767次循环就停止,不等了直接抛出异常
private void incWaiters() {
if (this.waiters == 32767) {
throw new IllegalStateException("too many waiters: " + this);
} else {
++this.waiters;
}
}
转载自:https://juejin.cn/post/6958658105609715719