Netty 是如何找到下一个可执行的 ChannelHandler?本文为稀土掘金技术社区首发签约文章,30天内禁止转载
我们知道事件在 ChannelPipeline 双向链表中传播,由于不是所有节点都能够执行该事件,所以它会顺着该双向链表往下找,找到能够执行该事件的节点,比如读事件,它需要找到实现了 channelRead()
的节点。考虑到效率问题,Netty 需要找到一种比较高效的方式来实现。
在 Netty 中有一个类 ChannelHandlerMask,每一个 ChannelHandlerContext 节点都有一个 executionMask 值,在构建 ChannelPipeline 链表新建 ChannelHandlerContext 的时候会通过 ChannelHandlerMask 计算出它的 executionMask 值,然后在 findContextXxx()
的时候根据传入的 mask 与当前 ChannelHandler 进行计算,判断当前 ChannelHandler 是否可执行当前事件,从而找出下一个可执行的 ChannelHandler.
ChannelHandlerMask
Netty 对所有事件都定义了 mask 值:
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
从这些成员变量的命名我们就清楚知道每个 mask 值对应哪个事件。从这些值的定义可以看出,Netty 将所有 inbound、outbound 和 Exception 的每一个事件都用一位 "1" 表示,一共 17 位。
计算 mask 值
在构建 ChannelPipeline 双向链表的时候,我们都会将 ChannelHandler 包装到一个 ChannelHandlerContext 中,在 Netty 中,每一个 ChannelHandlerContext 都会有一个 mask 值与之对应,即 executionMask(定义在 AbstractChannelHandlerContext 中),在我们新建 ChannelHandlerContext 的时候会计算该值:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
// ...
this.executionMask = mask(handlerClass);
}
利用 ChannelHandler 的 Class调用 mask()
计算 executionMask:
static int mask(Class<? extends ChannelHandler> clazz) {
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
先读缓存,如果存在就直接返回,否则调用 mask0()
,该方法比较繁琐,大明哥就只保留一个了:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
// ...
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
// ...
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// ...
}
return mask;
}
该方法有两处 if :
ChannelInboundHandler.class.isAssignableFrom(handlerType)
:判断该类是否为 ChannelInboundHandler。isSkippable()
:该方法是用来判断某个 ChannelHandler 中的某个方法是否需要跳过。就是看对应方法上是否有@Skip
注解。
当 if 条件满足后 mask 值是如何变化的呢?假如一个 ChannelHandler 是 ChannelInboundHandler ,且需要跳过 channelRead()
,则 mask 值计算如下:
int mask = MASK_EXCEPTION_CAUGHT;
mask |= MASK_ALL_INBOUND;
mask &= ~MASK_CHANNEL_READ;
int mask = MASK_EXCEPTION_CAUGHT
初始化,其值为:
0000 0000 0000 0000 0001
第一步 mask |= MASK_ALL_INBOUND
:
0000 0000 0001 1111 1111
第二步 mask &= ~MASK_CHANNEL_READ
:
0000 0000 0001 1101 1111
经过这样列举后,你会发现,恰好是 MASK_CHANNEL_READ
所对应的位置变为 0,所以反推如果一个类重写了某个方法,那其对应的位置就是 1,如:
public class TestNameHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// ...
}
}
这了我们可以猜测他的 executionMask 值为 32,即 0000 0000 0000 0010 0000
,验证如下:
HeadContext 是 ChannelInboundHandler 和 ChannelOutboundHandler 的合体,其值为 0001 1111 1111 1111 1111
,即 131071,而 TailContext 是 ChannelInboundHandler ,其值为 0000 0001 1111 1111
,即 511。
应用 mask 值
ChannelHandlerContext 有了 executionMask 值后,Netty 就可以利用它来找下一个可执行的节点了。在 ChannelPipeline 事件传播中我们会递归调用 fireChannelRead()
来传播事件:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
findContextInbound(MASK_CHANNEL_READ)
寻找下一个实现了 channelRead()
的节点,MASK_CHANNEL_READ
值为 1 << 5 = 0000 0000 0000 0010 0000
。
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
通过 do...while
循环来获取节点,直到 skipContext()
返回 false,则表示找到了可执行的节点。
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
我们以上面的 TestNameHandler 为例。
- ctx.executionMask:
0000 0000 0000 0010 0000
- onlyMask:
0000 0000 0001 1111 1110
,即 MASK_ONLY_INBOUND - mask:
0000 0000 0000 0010 0000
,即 MASK_CHANNEL_READ
上面表达式为:
(0000 0000 0000 0010 0000 & (0000 0000 0001 1111 1110 | 0000 0000 0000 0010 0000) == 0 ||
(0000 0000 0000 0010 0000 & 0000 0000 0000 0010 0000) == 0)
很明显为 false,则就是这个 ChannelHandler 的节点了。
这种方式是不是简单而又高效?不得不惊叹 Netty 设计的细节牛逼之处!!
转载自:https://juejin.cn/post/7407260232662958130