likes
comments
collection
share

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

作者站长头像
站长
· 阅读数 6

👈👈👈 欢迎点赞收藏关注哟

一. 前言

之前说了,很多开源框架都是基于Netty做的底层通信,这一篇就以 RocketMQ 作为案例,来看看这些框架是怎么玩好 Netty的。

整个文章会以3个步骤来思考:

  • Netty 在初始化的时候做了什么?
  • Netty 在接收数据的时候做了什么?
  • 接收到的数据又是什么样的

二. RocketMQ 是怎么用

2.1 Spring 启动时对 Netty 做了什么 ?

首先说结论, RocketMQ 对 Netty 的最终使用类是 NettyRemotingClient ,当请求最终走到这里涉及几步 :

  • S1 : Spring 启动时初始化通过 afterPropertiesSet属性填充后执行)方法触发 Start 方法
  • S2 : 在 start 方法中执行 Bootstrap 的创建
  • S3 : 在创建 Topic 和 构建 Route 的环节执行 connect 操作

关于RocketMQ那些相关的流程不是本篇重点,就不深究了。从第二步开始:

// >>>> Bootstrap 的初始化过程--------------------------------
Bootstrap handler = 
// 1. eventLoopGroupWorker 做的是可上层传入的设计,但是实际上都是初始化的时候创建的
this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
   
     // 2. option 的配置省略,主要是超时时间 ,Buf 大小等等
    .option(ChannelOption.TCP_NODELAY, true)
     //......
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
        
            // 3. 重中之重,这里就是后续的处理逻辑链路,可以看到有加密解密一大堆东西,以及最重要的处理类
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(
                defaultEventExecutorGroup,
                new NettyEncoder(),
                new NettyDecoder(),
                new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                new NettyConnectManageHandler(),
                new NettyClientHandler());
        }
    });
    
    
// >>>> 连接开启 --------------------------------
private Channel createChannel(final String addr){
    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));                   
}

有了上文的基础,可以大致看出处理流程了,其中最重要的就是为 pipeline 配置的处理链表,也就是说 :

  • Spring 启动时触发 bootstrap 即可
  • 为自己的 Netty 链路准备对应的处理链,对数据进行解析和翻译
  • 在处理链的最后一步传入到业务Handler 中

2.2 Handler 链表的处理

Netty 中通过一个 ChannelPiple 来处理整个 Handler 链表,在这个环节中涉及到以下对象 :

  • 接口 ChannelHandlerContext : 描述的是处理链(Pipeline)中的一个环节或者节点
    • channel() : 表示的是该上下文所属的 channel
    • handler() : 当前处理的 channelHandler 对象
    • executor(): 获取与该 ChannelHandlerContext 关联的 EventExecutor
    • fireChannelRead(Object msg): 触发 Channel 读取操作,将消息传递到下一个 ChannelHandler

S1 : 链表的流转 (用链表做解析和翻译

  • S1 : 上一个 Handler 进行业务处理,处理完成后调用 ctx.fireChannelRead
  • S2 : AbstractChannelHandlerContext 会触发 invokeChannelRead
  • S3 : 在其中调用 next.invokeChannelRead(m); 触发下一个
  • S4 : 代理方法会调用 ((ChannelInboundHandler) handler()).channelRead(this, msg); 进行实际的 Read 读取

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

这个就不多说了,链表结构一目了然。

S2 : 接收的是数据样式

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

2.3 接收后的数据流转

下面就开始深入学习 :

S1 : 底层的数据处理 -- DefaultEventExecutor (你接受到的只是一堆 Byte

一开始数据是保存到一个 Buf 对象中的。一般是 PooledUnsafeDirectByteBuf,即为缓冲池

  • PooledUnsafeDirectByteBuf : 用于在 Netty 中进行内存管理,可以将数据存储在直接缓冲区内存中,提高数据访问速度
    • 一般情况下的通信,底层都是通过 Byte 进行数据传输的
    • 而当 byte 数据到 Netty 后 , 数据首先被放在上述缓冲区对象中
  • 为什么需要缓冲池 ?
      1. 提高效率 :Byte 不是最小的传输单元,在网络中数据会以包的形式进行传输
      1. 解耦 : 当多个线程进行 NIO 处理时,可以放在各自的缓冲区
      1. 安全 : 缓冲区提供了内存管理的能力,可以避免内存溢出
  • 关键点 : 数据一次性是接收不完的
    • Netty 会对数据进行切分,然后把每个小块写入Buffer中 ,用于发送或者接收
    • 如果接收到的数据大小超过了Buffer的容量,会触发一个OutOfBufferSpace异常
    • 发送方 :通过将大块数据切分成多个小块,然后将每个小块写入Buffer中进行发送
    • 接收方 :将读取到的数据追加到一个汇总Buffer中,然后根据需要对汇总Buffer中的数据进行处理

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

S2 : 对数据进行转换 -- ByteToMessageDecoder (把 Byte 转换为业务可读的对象

RocketMQ 中会触发一个 ByteToMessageDecoder 的 Handler ,该 Handler 会进行 Byte 到 实际对象的转换 到了这里 Message 对象已经变成 RemotingCommand ,也就是变成了一个可读对象

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

S3 : 最终业务处理 -- NettyClientHandler (见下文)

2.4 接收到数据怎么处理 ?

最终上文的 pipeline 会传到最终的业务处理类 NettyClientHandler

  • 👉 S1 : 接收到消息后 C- NettyRemotingAbstract # processMessageReceived
    • 首先调用 processRequestCommand 对 请求进行处理
    • 同样 ,Netty 是双向的,在这里也可以对 Response 进行处理 (processResponseCommand)
  • 👉 S2 : 创建一个线程 C- NettyRemotingAbstract # processRequestCommand
    • 在这个流程中所有直接创建了一个 Runner 对象
    • 在把这个 Runner 对象封装成 RequestTask
    • Pair<NettyRequestProcessor, ExecutorService> 对象的线程池进行 submit 执行
  • 👉 S3 : 调用到实际的业务逻辑 C- ClientRemotingProcessor # processRequest
    • 其中会对很多的请求类型进行处理
  • 👉 S4 : 业务处理的起点
    • 最后在 ClientRemotingProcessor # consumeMessageDirectly
    • 在后面就是 RocketMQ 中的处理了,此篇不关注

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

阶段总结

每次写源码其实反响一般,这一篇已经尽量讲流程,忽略细节,不知道效果怎么样。。。

面试官 :你知道哪些生产组件用了 Netty ? 怎么用的 ?

其实和案例里面的没什么区别,无非是自定义了一些专属的 Handler ,当数据传到最终的 NettyClientHandler 后,已经是可读的数据了,进行 Topic ,Route 的处理就行了。

当然其中还有很多小细节和优化,不在这一篇的分析范围内。

下文预告

Netty 生产级的使用一篇文章是说不清楚的。所以这个系列未来还有2-3篇的基础和最后的应用案例。