likes
comments
collection
share

Netty 是如何找到下一个可执行的 ChannelHandler?本文为稀土掘金技术社区首发签约文章,30天内禁止转载

作者站长头像
站长
· 阅读数 30

我们知道事件在 ChannelPipeline 双向链表中传播,由于不是所有节点都能够执行该事件,所以它会顺着该双向链表往下找,找到能够执行该事件的节点,比如读事件,它需要找到实现了 channelRead() 的节点。考虑到效率问题,Netty 需要找到一种比较高效的方式来实现。

Netty 是如何找到下一个可执行的 ChannelHandler?本文为稀土掘金技术社区首发签约文章,30天内禁止转载

在 Netty 中有一个类 ChannelHandlerMask,每一个 ChannelHandlerContext 节点都有一个 executionMask 值,在构建 ChannelPipeline 链表新建 ChannelHandlerContext 的时候会通过 ChannelHandlerMask 计算出它的 executionMask 值,然后在 findContextXxx() 的时候根据传入的 mask 与当前 ChannelHandler 进行计算,判断当前 ChannelHandler 是否可执行当前事件,从而找出下一个可执行的 ChannelHandler.

ChannelHandlerMask

Netty 对所有事件都定义了 mask 值:

    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;
    
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

从这些成员变量的命名我们就清楚知道每个 mask 值对应哪个事件。从这些值的定义可以看出,Netty 将所有 inbound、outbound 和 Exception 的每一个事件都用一位 "1" 表示,一共 17 位。

计算 mask 值

在构建 ChannelPipeline 双向链表的时候,我们都会将 ChannelHandler 包装到一个 ChannelHandlerContext 中,在 Netty 中,每一个 ChannelHandlerContext 都会有一个 mask 值与之对应,即 executionMask(定义在 AbstractChannelHandlerContext 中),在我们新建 ChannelHandlerContext 的时候会计算该值:

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        // ...
        this.executionMask = mask(handlerClass);
    }

利用 ChannelHandler 的 Class调用 mask() 计算 executionMask:

    static int mask(Class<? extends ChannelHandler> clazz) {
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

先读缓存,如果存在就直接返回,否则调用 mask0(),该方法比较繁琐,大明哥就只保留一个了:

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_INBOUND;

                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                // ...
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                // ...
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // ...
        }

        return mask;
    }

该方法有两处 if :

  • ChannelInboundHandler.class.isAssignableFrom(handlerType):判断该类是否为 ChannelInboundHandler。
  • isSkippable():该方法是用来判断某个 ChannelHandler 中的某个方法是否需要跳过。就是看对应方法上是否有 @Skip 注解。

当 if 条件满足后 mask 值是如何变化的呢?假如一个 ChannelHandler 是 ChannelInboundHandler ,且需要跳过 channelRead() ,则 mask 值计算如下:

int mask = MASK_EXCEPTION_CAUGHT;
    mask |= MASK_ALL_INBOUND;
    mask &= ~MASK_CHANNEL_READ;

int mask = MASK_EXCEPTION_CAUGHT 初始化,其值为:

0000 0000 0000 0000 0001

第一步 mask |= MASK_ALL_INBOUND

0000 0000 0001 1111 1111

第二步 mask &= ~MASK_CHANNEL_READ

0000 0000 0001 1101 1111

经过这样列举后,你会发现,恰好是 MASK_CHANNEL_READ 所对应的位置变为 0,所以反推如果一个类重写了某个方法,那其对应的位置就是 1,如:

public class TestNameHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // ...
    }
}

这了我们可以猜测他的 executionMask 值为 32,即 0000 0000 0000 0010 0000,验证如下:

Netty 是如何找到下一个可执行的 ChannelHandler?本文为稀土掘金技术社区首发签约文章,30天内禁止转载

HeadContext 是 ChannelInboundHandler 和 ChannelOutboundHandler 的合体,其值为 0001 1111 1111 1111 1111,即 131071,而 TailContext 是 ChannelInboundHandler ,其值为 0000 0001 1111 1111,即 511。

应用 mask 值

ChannelHandlerContext 有了 executionMask 值后,Netty 就可以利用它来找下一个可执行的节点了。在 ChannelPipeline 事件传播中我们会递归调用 fireChannelRead() 来传播事件:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

findContextInbound(MASK_CHANNEL_READ) 寻找下一个实现了 channelRead() 的节点,MASK_CHANNEL_READ 值为 1 << 5 = 0000 0000 0000 0010 0000

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

通过 do...while 循环来获取节点,直到 skipContext() 返回 false,则表示找到了可执行的节点。

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

我们以上面的 TestNameHandler 为例。

  • ctx.executionMask: 0000 0000 0000 0010 0000
  • onlyMask:0000 0000 0001 1111 1110,即 MASK_ONLY_INBOUND
  • mask:0000 0000 0000 0010 0000,即 MASK_CHANNEL_READ

上面表达式为:

(0000 0000 0000 0010 0000 & (0000 0000 0001 1111 1110 | 0000 0000 0000 0010 0000) == 0 ||
                (0000 0000 0000 0010 0000 & 0000 0000 0000 0010 0000) == 0)

很明显为 false,则就是这个 ChannelHandler 的节点了。

这种方式是不是简单而又高效?不得不惊叹 Netty 设计的细节牛逼之处!!

转载自:https://juejin.cn/post/7407260232662958130
评论
请登录