likes
comments
collection
share

Netty 网络编程

作者站长头像
站长
· 阅读数 34

Netty 入门

概述

什么是 Netty?

  • 异步:用的多线程,不是异步 IO
  • 基于事件驱动:表示用的 Selector

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali 开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc 框架
  • Dubbo - rpc 框架
  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
  • Zookeeper - 分布式协调框架

Netty 的优势

Netty vs NIO,工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

Netty vs 其它网络应用框架

  • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

创建服务器 / 客户端

  1. 首先创建启动器
  2. 创建NioEventLoopGroup基于NIO服务端实现
  3. childHandler表示添加的处理器都是给SocketChannel用的
  4. ChannelInitializer 仅仅执行一次
  5. 客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  6. new ServerBootstrap().bind 绑定监听端口

服务端

// 1. 启动器,负责组装 netty组件,启动服务器
new ServerBootstrap()
        // BossEventLoop,WorkerEventLoop(selector, thread)
        // 创建一个事件循环线程组,适用于 NIO 实现,可以简单理解为 `线程池 + Selector`
        .group(new NioEventLoopGroup())
        // 指定要使用的服务器 Channel 实现,适用于 NIO 实现
        .channel(NioServerSocketChannel.class) // OIO BIO
        // BOSS 负责处理连接Work(child)负责处理读写,决定了work(child)能执行哪些(handler)
        // 配置服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
        .childHandler(
                // 5. channel代表客户端进行数据读写的通道Initializer初始化,负责添加到别的 handler
                new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 添加具体的 handler
                        // 将 bytebuf 转换为字符串
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 自定义handler
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override// 读事件
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 打印转好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
        // 监听端口
        .bind(8080);

客户端

// 启动类
new Bootstrap()
        // 添加 EventLoop
        .group(new NioEventLoopGroup())
        // 选择客户端 channel 实现
        .channel(NioSocketChannel.class)
        // 添加处理器
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override // 在建立连接后被调用
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 把字符串编码成字节
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // 连接服务器
        .connect(new InetSocketAddress("localhost", 8080))
        .sync()
        .channel()
        // 向服务器发送数据
        .writeAndFlush("hello, world");

流程分析

  1. 先创建启动器类
  2. 添加组件,eventloop(内部就有线程和选择器不断循环,查找事件)
  3. 选择NIOServerSocket 实现
  4. 添加处理器,只有连接事件发生之后才会执行 initChannel初始化方法
  5. 绑定监听端口(服务端就到这里)
  6. (客户端)创建启动器和eventloop
  7. 客户端选择socket事件
  8. 添加处理器,也是等连接建立才会执行初始化方法
  9. 最后连接服务器
  10. 服务器监听到accept事件之后
  11. 最后找处理器处理这个事件
  12. (我们看不懂事线),连接建立后调用初始化方法
  13. 客户端 sync 只有连接之后才会继续执行
  14. channel() 这个是连接对象
  15. 最后就可以读写
  16. 发数据就会走到处理器内部
  17. 进行转为字节数组进行 bytebuf进行发送
  18. 服务端的eventloop就会监听到读事件
  19. 走到了服务器的处理器,进行处理

channel数据传输通道,可读出来可写进。和nio概念一致

handel中的message流动数据,handel 是一个工序,对原始数据进行一道道工序进行处理

pipeline就是流水线,一道工序,进行添加工序。

handel 分为 inbound和outbound 入站和出站 读入就走入站,写出就走出站

eventloop就是线程,相当于工人 一旦某一个工人负责一个事件,那就会负责到底。一个工人是可以管理多个事件的

eventloop既可以执行io操作,也可以进行普通任务,每个工人都有自己的任务队列,依次处理。底层用的单线程的线程池。

任务可以是定时任务也可以是普通任务

组件

EventLoop

EventLoop是事件循环对象

本质是一个单线程执行器,同时维护了Selector,里面有run方法处理 channel 上源源不断的 io 事件。

继承关系:

  • 一条线是继承自 java.util.concurrent.ScheduledExecutorService
  • 因此包含了线程池的所有方法
  • 另一条是继承自 netty 自己的 OrderedEventExecutor
  • 提供了 Boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
  • 提供了 parent方法来看看自己属于哪个 EventLoopGroup
// eventloop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}
// 继承线程池接口
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

EventLoopGroup 事件循环组,我们一般使用这个

EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法俩绑定其中一个 EventLoop,后续这个 channel 上的 io 事件都由同一个 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
  • 实现了 Iterable 接口提供遍历 EventLoop 的能力
  • 另有 next 方法获取集合中下一个 EventLoop

我们平常使用的是下面两个循环组实现类

// 可以实现普通任务,io事件,定时任务
EventLoopGroup nioGroup = new NioEventLoopGroup();
// 普通任务和定时任务
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();

那么我们空参创建对象,线程数是多少呢

下面这段代码,是用来初始化 EventLoopGroup对象的。

如果没有指定线程数,会采用默认的,也就是当前系统的CPU核心数 * 2

/**
 * 构造函数,创建MultithreadEventLoopGroup对象
 * @param nThreads 表示EventLoopGroup中EventLoop的数量,如果为0则使用默认的线程数
 * @param executor 用于执行任务的Executor对象
 * @param args 可选参数列表
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 调用父类的构造函数,初始化EventLoopGroup对象
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

获取当前CPU核数--我的是 16核

NettyRuntime.availableProcessors();

指定线程数量

EventLoopGroup nioGroup = new NioEventLoopGroup(2);

获取下一个线程

// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 获取下一个线程
System.out.println(nioGroup.next()); // 第一次打印第一个
System.out.println(nioGroup.next()); // 打印第二个
System.out.println(nioGroup.next()); // 打印第一个,因为一共就两个
System.out.println(nioGroup.next()); // 打印第二个
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce

执行普通任务

  • 这里的execute和submit都是一样的效果
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行普通任务
nioGroup.next().execute(() -> {
   log.debug("ok");
});
log.debug("main");

定时任务

// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行定时任务 以一定的频率执行
// 参数1:执行的方法
// 参数2:表示第一次启动多久开始
// 参数3:表示间隔多久再次触发
// 参数4:单位
nioGroup.next().scheduleAtFixedRate(() -> {
   log.debug("ok");
}, 3, 5, TimeUnit.SECONDS);
log.debug("main");

IO 事件

下面是服务端,客户端同上面。

一个客户端绑定一个服务的 EventLoop线程。

下面再编写客户端的时候,如果要打上断点实现阻塞,需要将idea的断点卡成单线程的

Netty 网络编程

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        // 这个实际开发是要指定字符类型的,不要默认
                        log.debug(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);

我们来分工细化一下

  1. 我们可以把 eventloop 划分为 boss 和 work
  2. 将accept 和 read 分开处理

那 第一个事件循环组 线程是否可以设置为 1 呢

因为服务器这一有个,它也只会和里面一个 eventloop 进行绑定

new ServerBootstrap()
        // 将 参数1:只处理accept 参数2:处理read
        .group(new NioEventLoopGroup(), new NioEventLoopGroup())

继续分工细化--如果其中一个Nio线程执行中 在读操作时执行太久,会影响其他 channel读操作 最好不要让它占用 work nio线程,所以我们继续细分

  1. 创建独立的事件循环对象,因为不需要进行io所以是普通的
  2. 将下个处理器绑定上。
// 细分2:创建独立的 EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
        .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        .addLast("handler1", new ChannelInboundHandlerAdapter() {
                            @Override
                            // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                // 这个实际开发是要指定字符类型的,不要默认
                                log.debug(buf.toString(Charset.defaultCharset()));
                                // 让消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 指定事件循环组和名称
                        .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                // 这个实际开发是要指定字符类型的,不要默认
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
            }
        })
        .bind(8080);

Netty 网络编程

那么是怎么实现切换事件循环组,也就是换人处理的呢?

如果两个 handler 绑定的是同一个线程,那么直接调用,否则调用的代码封装为一个任务对象。由一下一个 handler的线程调用

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    /**
    *    EventExecutor 是事件循环组
    */
    EventExecutor executor = next.executor();
    
    // 当前 handler 中的线程是否和 eventloop 是同一个线程
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
    // 使用runnable 执行一个事件
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

Channel & 连接 & 关闭问题 & 异步

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入,只是写进入缓冲区
  • writeAndFlush() 方法将数据写入并刷出

connect 连接问题

connect:是异步阻塞,main方法发起,执行是Nio的线程

所以如果在发送数据前,一定要保证已经连接好了,否则数据是发送不出去的

ChannelFuture:带有Future或者 promise都是和异步配套使用,用来处理结果

ChannelFuture channelFuture = new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // connect 是异步阻塞,main方法发起调用,真正执行连接的是Nio线程
        .connect(new InetSocketAddress("localhost", 8080));
//        channelFuture.sync();
// 会无阻塞向下执行获取channel,最后发送数据
Channel channel = channelFuture.channel();
//        log.debug("{}", channel); // 如果没有获取到打印的就是没有连接的 channel,所以需要执行 sync
channel.writeAndFlush("123");

解决连接问题,保证发送数据前一定是正确连接的

  1. sync
// 阻塞当前线程,直到连接建立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
  1. 将执行发送的代码,交给nio线程。
  • addListener(回调对象)方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    // 在nio 线程连接建立好之后,会调用operationComplete
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        Channel channel = channelFuture.channel();
        log.debug("{}", channel);
        channel.writeAndFlush("123");
    }
});

关闭问题

我们看下面这段代码

我们可以输入内容,发送给服务端,按 q 关闭传输通道

但是我们关闭的时候,想做一些操作,怎么整 直接在close下面写吗,那是不对的,因为close方法是异步的

channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(new Runnable() {
    @Override
    public void run() {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }
}, "input").start();

解决异步问题

添加日志调试

nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  1. 同步解决---由主线程处理关闭操作
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (true) {
        String line = sc.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();
// 添加关闭处理器
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();// 设置同步
log.debug("处理关闭之后....");
  1. 异步解决--由 nio 线程处理
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (true) {
        String line = sc.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();

// 设置关闭回调函数
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        log.debug("处理关闭之后....");
    }
});

优雅的关闭

  • 它会等待所有正在处理的任务完成后,再进行关闭。
NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)

closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        log.debug("处理关闭之后的操作");
        group.shutdownGracefully(); // 优雅的关闭
    }
});

异步

异步:就是一个线程发起连接一个线程去建立连接

思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96个病人

Netty 网络编程

那我们就可以细分一下,四个医生分别处理四个事情

只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍

Netty 网络编程

重点:

  1. 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  2. 异步并没有缩短响应时间,反而有所增加
  3. 合理进行任务拆分,也是利用异步的关键

Future & Promise

处理异步时:经常用到的两个接口, futer 核 promise

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise;不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

jdk future

// 线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = pool.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.debug("执行计算"); // poll 线程
        Thread.sleep(1000);
        return 50;
    }
});
// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程

nio future

// 创建事件循环组
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
// 获取执行事件
EventLoop eventLoop = eventGroup.next();
// 执行方法
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.debug("执行计算"); // nio 线程
        Thread.sleep(1000);
        return 50;
    }
});

同步获取线程返回结果

// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程

异步

future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.debug("等待结果"); // nio 线程
        log.debug("结果是 {}", future.getNow()); // nio 线程
    }
});

promise 获取结果

// 1. 准备 EventLoop 对象
EventLoop loop = new NioEventLoopGroup().next();
// 2. 主动创建 promise 结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
new Thread(new Runnable() {
    @Override
    public void run() {
        // 3. 任意一个线程执行计算,计算完毕使用 promise 填充结果
        log.debug("开始计算...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 填写失败的数据
            promise.setFailure(e);
        }
        // 填写成功的数据
        promise.setSuccess(90);
    }
}).start();
log.debug("等待结果...");
log.debug("结果是:{}", promise.get());

handler & pipeline

ChannelHandle 用来处理 Channel 上的各种事件,分为入站和出站两种。

当所有的ChannelHandle 连在一起就是 pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

入站是按照顺序执行的也就是, 会打印1,2,3

出站是按照逆序的方式,会打印6,5,4

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 添加入站处理器
                nioSocketChannel.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("1");
                        super.channelRead(ctx, msg);
                        // 使用通道写入数据并刷新。ctx通道信息,获取上下文的分配器,创建bufer输出数据
                        nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
                    }
                });
                // 同上
                        log.debug("2");
                // 同上
                        log.debug("3");
                //
                // 添加出站处理器
                nioSocketChannel.pipeline().addLast("h4", new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        log.debug("4");
                        super.write(ctx, msg, promise);
                    }
                });
                // 同上
                        log.debug("5");
                //
                // 同上
                        log.debug("6");
                //
            }
        })
        .bind(8080);

出站处理器 当我们用 channel 的方法发送数据,会从尾部开始依次执行出站过滤器 当用方法内部的 ctx channel的方法发送,会从当前位置开始往前找

调试出站入站,handler

public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站操作
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        // 模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
    }

bytebuf

创建了一个默认的 ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10

如果不指定,默认256,扩容 * 2

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
public static void main(String[] args) {
    // 创建默认空间的 bytebuf,空间容量默认256,超过空间容量,会自动扩容
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // read index:0 write index:0 capacity:256
    log(buffer);
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 300; i++) {
        sb.append("a");
    }
    buffer.writeBytes(sb.toString().getBytes());
    log(buffer);
    // read index:0 write index:300 capacity:512
}

// netty 调试 bytebuf 方法
private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

直接内存 & 堆内存

默认是创建的直接内存

创建堆内存

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

创建直接内存

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

直接内存创建和销毁的代价昂贵,但读写效率高(少一次内存复制),适合配合池化功能一起用

直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

下面的代码是在 vm虚拟机参数里设置的

-Dio.netty.allocator.type={unpooled|pooled}

NIO 的缓冲区池化能够有效地减少创建和销毁缓冲区对象的成本,提高程序的性能和吞吐量。

  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现
public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
    System.out.println(buffer.getClass());
    // class io.netty.buffer.PooledUnsafeDirectByteBuf
    /*
     * PooledUnsafeDirectByteBuf
     * Unpooled:非池化
     * Pooled:池化
     * Direct:直接内存
     * Heap:堆内存
     */
}

Bytebuf的组成部分

  • 废弃字节:表示已经读过了
  • 可读字节:未读的数据
  • 可写字节:未写入的数据
  • 可扩容:最大的容量,全部的

Netty 网络编程

butebuf 是读写两个指针,不需要切换读写模式,最开始读写指针都在 0 位置

写读方法

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是 Big Endian

扩容规则是

  • 写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
  • 扩容不能超过 max capacity 会报错

读入

// 只读入一个字节
buffer.readByte()

如果需要重复读取 int 整数 5,怎么办?

可以在 read 前先做个标记 mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());//读入
log(buffer);

这时要重复读取的话,重置到标记位置 reset

buffer.resetReaderIndex();

retain & release(垃圾回收)

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

注意:因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 try finally里 relase,那么就失去了传递性,必须等 bytebuf 完成了它的使命,才可以。

基本规则是,谁是最后使用者,谁负责 release

入站 bytebuf 处理原则
对原始 ByteBuf 不做处理无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象ByteBuf 就没用了,必须 release
不调用 ctx.fireChannelRead(msg) 向后传递必须 release
ByteBuf 没有成功传递到下一个必须 release

出站 ByteBuf 处理原则:由 HeadContext flush 后 release

异常处理原则

  • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

看看释放 bytebuf 的源码

尾部释放

在 io.netty.channel 的类中的内部类 TailContext 下面方法

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}

继续跟进

protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
    this.onUnhandledInboundMessage(msg);
    if (logger.isDebugEnabled()) {
        logger.debug("Discarded message pipeline : {}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
    }
}

这里就看到了 ReferenceCountUtil.release(msg); 用来释放

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

用来判断,如果是这个类型的,代表可以释放,否则返回false

public static boolean release(Object msg) {
    return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

看看头部释放的源码

io.netty.channel下的HeadContext内部类的write方法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    this.unsafe.write(msg, promise);
}

我们跟进 AbstractChannel实现类的 AbstractUnsafe内部类的 write方法

如果是通道是null 表示用计数方式关闭,并处理异常

this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
    try {
        ReferenceCountUtil.release(msg);
    } finally {
        this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause, "write(Object, ChannelPromise)"));
    }

} else {
    int size;
    try {
        msg = AbstractChannel.this.filterOutboundMessage(msg);
        size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable var15) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            this.safeSetFailure(promise, var15);
        }

        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

slice 零拷贝体现之一

作用:减少内存复制

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

切片之后,会对容量进行限制,切多少就是多少。不能再次添加

public class Server {

    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
        // 赋值是个数据
        buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
        // 打印
        log(buffer);
        // 进行切片,这里是没有发生数据复制的

        // 从0 切到 5个,a-e
        ByteBuf f1 = buffer.slice(0, 5);
        // 从5 切到 5个,f-j
        ByteBuf f2 = buffer.slice(5, 5);
        log(f1);
        log(f2);
        System.out.println("===============");
        // 修该第0位也就是 a,发现原bytebuf也修改了
        f1.setByte(0, 'b');
        log(f1);
        log(buffer);
    }

    // netty 调试 bytebuf 方法
    private static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

原 bytebuf 释放之后,切片的bytebuf是不能用了

解决上面的问题

我们用完那个,自动释放它自己的 bytebuf

工具类还是上面的

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
    log(buffer);
    ByteBuf f1 = buffer.slice(0, 5);
    // 让引用数据加1
    f1.retain();
    ByteBuf f2 = buffer.slice(5, 5);
    log(f1);
    log(f2);
    System.out.println("释放原 bytebuf");
    // 让原bytebuf 减1,释放不了真正的内存,没有减到0,不会释放内存
    buffer.release();
    log(f1);
}

duplicate

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的

copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

零拷贝,合并小bytebuf

重新计算了 指针位置,也是比较麻烦, 还是要注意 relase问题哦

public static void main(String[] args) throws Exception {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
    buf1.writeBytes(new byte[]{6, 7, 8, 9, 10});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
    buf2.writeBytes(new byte[]{1, 2, 3, 4, 5});
    // 合并上面两个 buf,不发生复制
    // 默认不会自动调整写入的位置,必须带上true
    CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
    bufs.addComponents(true, buf1, buf2);
    log(bufs);
}

Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

双向通信

服务端

public static void main(String[] args) throws Exception {
    new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buffer = (ByteBuf) msg;
                            System.out.println(buffer.toString(Charset.defaultCharset()));

                            // 建议使用 ctx.alloc() 创建 ByteBuf
                            ByteBuf response = ctx.alloc().buffer();
                            response.writeBytes(buffer);
                            ctx.writeAndFlush(response);

                            // 思考:需要释放 buffer 吗
                            // 从 Socket 读取到字节数据之后,将数据转换成 ByteBuf 对象。
                            // 在 Netty 的特殊实现下,当该消息被服务端处理完成
                            // 即当消息被写回客户端之后,Netty 会自动释放其占用的内存。
                            // 思考:需要释放 response 吗
                            // response 是我们手动创建的 ByteBuf 对象,而不是从消息中通过解码产生的,因此我们需要手动释放该对象
                            // 又因为我们向客户端写入数据时使用了 ctx.writeAndFlush(response)
                            // 该方法会自动在发送完成后将 response 进行释放,因此我们不需要手动释放该对象。
                            // 但如果是通过 write() 方法写入数据,那么我们需要在写入完成后手动调用 
                            // response.release() 方法对 response 对象进行释放。
                        }
                    });
                }
            }).bind(8080);
}

客户端

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap().group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            try {
                                ByteBuf buffer = (ByteBuf) msg;
                                System.out.println(buffer.toString(Charset.defaultCharset()));
                            } finally {
                                // 释放
                                ReferenceCountUtil.release(msg);
                            }

                            // 思考:需要释放 buffer 吗
                            // 接收到的 butebuf需要显示释放,避免内存泄露
                            // 如果不,则会保留到程序退出
                        }
                    });
                }
            }).connect("localhost", 8080).sync().channel();
    channel.closeFuture().addListener(future -> {
        group.shutdownGracefully();
    });
    new Thread(() -> {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String line = scanner.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }).start();
}

读和写误解

我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

服务端

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

netty 进阶

在Client中:在入站处理器中,channelActive方法会在客户端连接成功后触发。

  • option(ChannelOption.SO_RCVBUF, 10):设置缓冲区大小 = 10
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
                    // 设置缓冲区大小,就不会一次全部接收了,但是产生了半包问题
                    .option(ChannelOption.SO_RCVBUF, 10);

看一组黏包现象,我们下面当客户端连接后会发送数据到服务端,可是我们希望循环一次发一次,服务端却一次全部接收了

READ: 160B,一次全部接收了

服务端

@Slf4j
public class Server {
    public void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) {
                    nioSocketChannel.pipeline().addLast(new LoggingHandler());
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
            // 关闭时,同步关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error...", e);
        } finally {
            boss.shutdownGracefully(); // 安全关闭
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Server().start();
    }
}

客户端

public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在连接建立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // 循环写入
                        for (int i = 0; i < 10; i++) {
                            ByteBuf buffer = ctx.alloc().buffer(16);
                            buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                            ctx.writeAndFlush(buffer);
                        }
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

半包问题,就是设置了缓冲区大小,然后客户端发送的数据是

Netty 网络编程

注意:TCP 存在黏包半包,UDP 是不存在的

TCP 应答机制(滑动窗口)

TCP 可靠:发信息到对端,对端必须应答,如果长时间没有应答,会重发信息,影响吞吐量

滑动窗口解决上面问题

粘包:发送方发送一个完整报文,由于接收方处理不及时,且窗口比较大,就形成粘包

半包:接收方只剩下了一部分,发送方的数据不能全部放下,只能先放一部分,等待窗口移动继续放,形成半包

解决方式:引入窗口

窗口大小决定了无需等待应答而可以继续发送的数据最大值

Netty 网络编程

窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

  • 图中深色的部分即要发送的数据,高亮的部分即窗口
  • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
  • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
  • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

短连接

解决粘包:客户端每发送一次,就关闭连接,也叫做短连接

下面是客户端,服务端还是上面的

public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            send();
        }
    }

private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在连接建立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer(16);
                        buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                        ctx.writeAndFlush(buffer);
                        ctx.close();
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

但是会产生半包,如果服务端设置了 连接的 bytebuf 大小,发送方发的数据过大,每次都接收不全

这里说下,修改容量的方式--ServerBootstrap的类

// 是针对每个连接的配置,实现默认,会1024的容量
// 现在改成最小值,初始值,最大值。最低是十六,内部会取16的整数倍
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));

固定长度

客户端和服务端约定好。服务端每次解码10个字节,多的就等下次够十个再解码。

注意顺序:处理器,一定要先解码再打印日志信息。解码处理器在前面

服务端先加入。

// 添加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));

客户端---每次都是十个字节一个数据,不足用_补齐

public static void main(String[] args) {
    send();
}

    /**
     * 填充数组为指定长度,并将数组中的元素替换为指定字符对应的字节值。
     *
     * @param c   要替换的字符
     * @param len 数组长度
     * @return 填充后的字节数组
     */
    public static byte[] fill10Bytes(char c, int len) {
        byte[] bytes = new byte[10];
        Arrays.fill(bytes, (byte) 95);

        for (int i = 0; i < len; ++i) {
            bytes[i] = (byte) c;
        }

        System.out.println(new String(bytes));
        return bytes;
    }

private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在连接建立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = '0';
                        Random r = new Random();
                        for (int i = 0; i < 10; i++) {
                            buffer.writeBytes(fill10Bytes(c++, r.nextInt(10) + 1));
                        }
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

服务端

public void start() {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                // 添加定长解码器,长度和客户端约定好
                nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
        // 关闭时,同步关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        boss.shutdownGracefully(); // 安全关闭
        worker.shutdownGracefully();
    }
}

public static void main(String[] args) {
    new Server().start();
}

缺点:会浪费字节,长度需要决策好

行解码器

换行符解码器,不能指定,默认 \n \r

// 还有一种DelimiterBasedFrameDecoder
// 参数1是最大长度,参数2:是指定分隔符
addLast(new LoggingHandler(LogLevel.DEBUG));

客户端

public static void main(String[] args) {
    send();
}

/**
 * 使用指定的字符生成一个指定长度的字符串,并在字符串末尾添加换行符。
 *
 * @param c   要生成的字符
 * @param len 字符串长度
 * @return 生成的字符串
 */
public static StringBuilder makeString(char c, int len) {
    StringBuilder sb = new StringBuilder(len + 2);
    for (int i = 0; i < len; ++i) {
        sb.append(c);
    }
    sb.append("\n");
    return sb;
}

private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在连接建立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = '0';
                        Random r = new Random();
                        for (int i = 0; i < 10; i++) {
                            String s = makeString(c++, r.nextInt(256) + 1).toString();
                            buffer.writeBytes(s.getBytes());
                        }
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

服务端

public void start() {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                // 添加基于换行符的数据,遇到 \n \r等就会将这条数据截取。每一行最大数据1024
                // 超出则丢弃
                nioSocketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024));
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
        // 关闭时,同步关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        boss.shutdownGracefully(); // 安全关闭
        worker.shutdownGracefully();
    }
}

public static void main(String[] args) {
    new Server().start();
}

LTC 解码器

lengthFieldOffset 长度字段偏移量,也就是长度字节开始位置,图中等于0,表示长度字节从 0 开始

lengthFieldLength长度字段本身是多长,图中等于 2,表示000C,C = 12

lengthAdjustment长度字段为基准,还有几个字节是内容

initialBytesToStrip 从头剥离几个字节,也就是字段长度记录了内容长度,一共两个字节,解析完想删除这两个字节

就可以根据上面信息,从0开始读,读12个字节拿到 hello world

Netty 网络编程

下面这个有点不一样,它多带了一个头,从第 2 个字节开始读,一共三个长度字段

Netty 网络编程

表示从0开始读,长度字段为 3 个,不需要剥离,跳过两个字节,开始读取12个内容字节 Netty 网络编程

下面方法测试

public static void main(String[] args) throws Exception {
    // netty 的测试方法,省去很多
    EmbeddedChannel channel = new EmbeddedChannel(
            // 参数1:最大长度,参数2:长度字段开始位置,参数3:长度字段本身字节数,参数4:是否跳过字节,参数5:剥离长度
            new LengthFieldBasedFrameDecoder(
                    1024, 0, 4, 1, 4),
            new LoggingHandler(LogLevel.DEBUG)
    );
    // 四个字节的内容长度,实际内容
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    send(buffer, "Hello, world");
    send(buffer, "Hi!");
    // 使用 Em的channel写入 bufer
    channel.writeInbound(buffer);

    // 打印
    // .Hello, world,
    // 前面的 . 代表我们的附加内容,如果不剥离字段长度,会更多
}

private static void send(ByteBuf buffer, String content) {
    byte[] bytes = content.getBytes();
    buffer.writeInt(bytes.length); // 先写入内容的长度
    buffer.writeByte(1); // 我们写入一个附加内容,前面已经跳过的字节,在参数4位置
    buffer.writeBytes(bytes);
}

协议设计与解析

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

  • redis 协议
  • 要求我们先发送
  • *3:表示三个元素
  • 每个命令的长度
  • $3:表示 set 三个长度
  • set
  • $4
  • name
  • $8
  • zhangsan
public static void main(String[] args) {  
    NioEventLoopGroup worker = new NioEventLoopGroup();
        final byte[] LINE = new byte[]{13, 10};
        try {
            Bootstrap bootstrap = new Bootstrap().group(worker).channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) {
                    nioSocketChannel.pipeline().addLast(new LoggingHandler());
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ByteBuf buffer = ctx.alloc().buffer();
                            buffer.writeBytes("*3".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$3".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("set".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$4".getBytes()); // name 四个长度
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("name".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$8".getBytes()); // zhangsan 8个长度
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("zhangsan".getBytes());
                            buffer.writeBytes(LINE);
                            ctx.writeAndFlush(buffer);
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            System.out.println(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost",6379).sync();
            // 关闭时,同步关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error...", e);
        } finally {
            worker.shutdownGracefully();
        }
}

http 协议

new HttpServerCodec()

Netty 提供的一个方便的处理器,它将实际上是将 HttpRequestDecoder 和 HttpResponseEncoder 组合在一起,前者用于解码 HTTP 请求,后者用于编码 HTTP 响应。

下面是简单使用

public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    NioEventLoopGroup boss = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
                nioSocketChannel.pipeline().addLast(new HttpServerCodec());
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("{}", msg.getClass());
                        if (msg instanceof HttpRequest) { // 请求行,请求头(是get请求)

                        } else if (msg instanceof HttpContent) { // 请求体(不是get) 

                        }
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
        // 关闭时,同步关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

我们希望只关注某一种类型的处理

SimpleChannelInboundHandler:只关心特定的消息,request或者httpconent,不是则跳过

  • DefaultFullHttpResponse:获取响应对象,进行写回数据
public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    NioEventLoopGroup boss = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
                nioSocketChannel.pipeline().addLast(new HttpServerCodec()); // 即使入栈处理器也是出站处理器
                nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {

                    @Override
                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
                        // 获取请求
                        log.debug(httpRequest.uri()); // 请求行
                        log.debug("{}", httpRequest.headers()); // 请求头

                        // 获取响应
                        /*
                         * version 版本
                         * status 响应状态
                         */
                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);
                        // 向浏览器写入数据
                        byte[] bytes = "<h1>Hello World</h1>".getBytes();
                        response.content().writeBytes(bytes);
                        // 我们应该在响应头加 响应体长度,否则浏览器会一直在等待响应内容
                        response.headers().setInt(CONTENT_LENGTH, bytes.length); // netty 框架 HttpHeaderNames
                        // 写回响应
                        channelHandlerContext.writeAndFlush(response);
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
        // 关闭时,同步关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊... 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

先写编解码器

这里写一个 message 的发送的信息,只是一个类型状态

会有多个子类继承这个 message 类,表示多个状态

public abstract class Message implements Serializable {

    /**
     * 根据消息类型字节,获得对应的消息 class
     *
     * @param messageType 消息类型字节
     * @return 消息 class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }
    // 存储序列号
    private int sequenceId;
    // 存储消息的类型
    private int messageType;

    public abstract int getMessageType();

    // 登录请求消息
    public static final int LoginRequestMessage = 0;
    // 登录响应消息
    public static final int LoginResponseMessage = 1;
    // 聊天请求消息
    public static final int ChatRequestMessage = 2;
    // 聊天响应消息
    public static final int ChatResponseMessage = 3;
    // 创建群组请求消息
    public static final int GroupCreateRequestMessage = 4;
    // 创建群组响应消息
    public static final int GroupCreateResponseMessage = 5;
    // 加入群组请求消息
    public static final int GroupJoinRequestMessage = 6;
    // 加入群组响应消息
    public static final int GroupJoinResponseMessage = 7;
    // 退出群组请求消息
    public static final int GroupQuitRequestMessage = 8;
    // 退出群组响应消息
    public static final int GroupQuitResponseMessage = 9;
    // 群组聊天请求消息
    public static final int GroupChatRequestMessage = 10;
    // 群组聊天响应消息
    public static final int GroupChatResponseMessage = 11;
    // 获取群组成员请求消息
    public static final int GroupMembersRequestMessage = 12;
    // 获取群组成员响应消息
    public static final int GroupMembersResponseMessage = 13;
    // 心跳请求消息
    public static final int PingMessage = 14;
    // 心跳响应消息
    public static final int PongMessage = 15;
    /**
     * 请求类型 byte 值
     */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    /**
     * 响应类型 byte 值
     */
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
        messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
        messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
        messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
        messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
        messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
        messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
        messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
        messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
        messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
        messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
        messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

自定义编码解码

readInt 是Netty中ByteBuf类的一个方法,用于从ByteBuf中读取一个int类型的数据。

它返回一个int类型的值,并将读取索引自动增加4(int的字节数)

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    // 编码
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4个字节的魔数,任意
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本
        out.writeByte(1);
        // 3. 字节的序列化方式 0,jdk 1 JSON
        out.writeByte(0);
        // 4. 字节指令类型---.getMessageType 拿到当前多态的,类型,每个类都有指定的类型
        out.writeByte(msg.getMessageType());
        // 5. 4个字节--请求序号
        out.writeInt(msg.getSequenceId());
        out.writeByte(0xff); // 为了凑数的,因为固定字节数 要 2 的倍数
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容--用jdk序列化
        out.writeBytes(bytes);
    }

    // 解码
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        // 先读取魔术
        int magicNum = in.readInt();
        // 读取版本
        byte version = in.readByte();
        // 读取序列号方式
        byte serializerType = in.readByte();
        // 读取消息类型 messageType
        byte messageType = in.readByte();
        // 读取请求序号
        int sequenceId = in.readInt();
        // 读取无意义的字节,当时为了规范加的
        in.readByte();
        // 读取长度
        int length = in.readInt();
        // 获取数据字节
        byte[] bytes = new byte[length];
        ByteBuf byteBuf = in.readBytes(bytes, 0, length);
        // 转为对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
        Message message = (Message) ois.readObject();
        log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        // 最后添加到集合中
        out.add(message);
    }
}

然后我们测试这个处理器

  • 1024:最大容量
  • 12:偏移量,因为序列号,魔数,类型无效字符等,协议。
  • 4:长度字节
  • 0:不需要跳过
  • 0:因为 decode自己解析,所以不需要

注意要加LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),解决黏包半包

开启了解码器:如果数据不完整,将不会往下走

writeInbound:执行完会调用 release释放 bytebuf

public static void main(String[] args) throws Exception {
    // 添加刚刚写的编码解码
    EmbeddedChannel channel = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new LoggingHandler(),
            new MessageCodec());

    // encode 编码
    LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan", "123");
    channel.writeOutbound(loginRequestMessage);

    // decode 解码
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // 写入数据
    new MessageCodec().encode(null, loginRequestMessage, buffer);
    // 入站
    channel.writeInbound(buffer);
}

打印的字节数,可以根据自己定义的协议来,对比,是正确的

注解(安全情况)

sharable 线程安全

我们可以打开处理类的源码查看

@Sharable 表示线程安全,这个处理器可以被共享使用

像 LTC解码器是不可以被共享的,存在线程隐患

@Sharable
public class LoggingHandler extends ChannelDuplexHandler {

那我们自己定义的 handle 可以加共享注解吗

ByteToMessageCodec;子类文档明确规定,不能加共享注解,否则抛出异常

public class MessageCodec extends ByteToMessageCodec<Message> {

那需要添加怎么解决

使用这个父类即可--MessageToMessageCodec

有一些小变动,需要自行改一下,bytebuf方面

public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {

开发聊天室

首先,我们协议在 LengthFieldBasedFrameDecoder直接写,会容易出错我们制作成类

public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {

    // 协议
    public ProcotolFrameDecoder() {
        this(1024, 12, 4, 0, 0);
    }

    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}

登录

客户端

客户端连接成功之后进行连接建立事件,控制台输入

需要开辟线程,使用 nio 线程会影响其他 io 操作

  • 我们构建完对象,进行writeAndFlush发送
  • 往上继续找上一个handel
  • MESSAGE_CODEC 编码,LOGGING_HANDLER 记录日志
  • ProcotolFrameDecoder是入站的所以不会触发

先看一下下面这个 CountDownLatch

用于同步工具类,协调线程之间的等待时间

  • countDown():每次调用减 1,为零释放所有线程
  • await():让调用的线程等待计数器为 0 ,立即返回,否则会阻塞
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);

用于在多线程下判断的 boolean 对象

  • get():获取当前布尔值。
  • set(boolean newValue):设置布尔值为指定的新值。
AtomicBoolean LOGIN = new AtomicBoolean();

会话管理---登录成功后保存是那个channel那个用户

将用户和 channel 绑定起来

SessionFactory.getSession().bind(ctx.channel(), username);

注意:handler 没有加 @ChannelHandler.Sharable 会导致其他客户端连接不上

连接假死

原因

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

IdleStateHandler 处理器解决以上问题

  • 第一个参数: 表示读空闲时间,即在指定的时间间隔内如果没有读取到数据,则会触发userEventTriggered方法。
  • 第二个参数: 表示写空闲时间,即在指定的时间间隔内如果没有写入数据,则会触发userEventTriggered方法。
  • 第三个参数: 表示读写空闲时间,即在指定的时间间隔内如果既没有读取到数据也没有写入数据,则会触发userEventTriggered方法。

用心跳机制,保证客户端正常链接,处理空闲问题

优化

序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
/**
 * 扩展序列化
 */
public interface Serializer {
    // 反序列化
    <T> T deserialize(Class<T> tClass, byte[] bytes);

    // 序列化
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {
        Java {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(object);
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失败", e);
                }
            }
        },

        Json {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                String s = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(s, tClass);
            }

            @Override
            public <T> byte[] serialize(T object) {
                String toJson = new Gson().toJson(object);
                return toJson.getBytes(StandardCharsets.UTF_8);
            }
        }
    }
}

注意反序列化,获得的对象

参数调优

// 客户端通过 option 配置参数,给 SocketChannel 配置参数
new Bootstrap().option() 
// 服务端通过 option 配置参数,给 ServerSocketChannel 配置参数
new ServerBootstrap().option()
// 服务端通过 childOptio 配置参数,给 SocketChannel 配置参数
new ServerBootstrap().childOption()

客户端超时连接

客户端不一定等到 5秒后抛出异常,它有可能判断服务器没有开启,根本连接不上,直接抛出网络编程异常

bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);

SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间,netty 不会用的。

全部代码

Netty 网络编程

client

NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// 线程计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 登录状态--默认未登录
AtomicBoolean LOGIN = new AtomicBoolean(false);


try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
//            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        // 初始化操作
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ProcotolFrameDecoder());
//                    ch.pipeline().addLast(LOGGING_HANDLER);
            ch.pipeline().addLast(MESSAGE_CODEC);
            // 用来判断是不是,读空闲时间太长或者 写时间空闲时间太长
            // 5s 没用收到 channel 的数据会触发一个 IdleState#WRITER_IDLE 事件
            ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
            // ChannelDuplexHandler 可以同时作为入站和出站处理器
            ch.pipeline().addLast(new ChannelDuplexHandler() {
                // 触发特殊事件 IdleState
                @Override
                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    // 写空闲
                    if (event.state() == IdleState.WRITER_IDLE) {
                        ctx.writeAndFlush(new PingMessage());
                    }
                }
            });
            // 建立连接之后触发 active
            ch.pipeline().addLast("client_handel", new ChannelInboundHandlerAdapter() {

                // 接受响应信息
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                            log.debug("{}", msg);
                    if (msg instanceof LoginResponseMessage) {
                        LoginResponseMessage message = (LoginResponseMessage) msg;
                        // 如果登录成功设置为 true
                        if (message.isSuccess()) {
                            LOGIN.set(true);
                        }
                        // 让计数器 - 1,让线程继续执行,唤醒
                        WAIT_FOR_LOGIN.countDown();
                    }

                }

                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    // 在这里要创建线程去实现,控制台输入,负责向服务器发送信息
                    new Thread(() -> {
                        Scanner sc = new Scanner(System.in);
                        System.out.println("请输入用户名:");
                        String username = sc.nextLine();
                        System.out.println("请输入密码:");
                        String password = sc.nextLine();
                        // 构建消息对象
                        LoginRequestMessage message = new LoginRequestMessage(username, password);
                        // 发送信息
                        ctx.writeAndFlush(message);
                        System.out.println("等待后续操作....");
                        try {
                            // 响应回来,继续执行
                            WAIT_FOR_LOGIN.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 如果登录失败,关闭 channel,返回
                        log.debug("{}", LOGIN.get());
                        if (!LOGIN.get()) {
                            ctx.channel().close();
                            return;
                        }
                        // 打印菜单,登录成功
                        while (true) {
                            System.out.println("==================================");
                            System.out.println("send [username] [content]"); // 发送
                            System.out.println("gsend [group name] [content]"); // 向群聊发送
                            System.out.println("gcreate [group name] [m1,m2,m3...]"); // 创建群,并且拉人
                            System.out.println("gmembers [group name]"); // 查看群里成员
                            System.out.println("gjoin [group name]"); // 加入聊天群
                            System.out.println("gquit [group name]"); // 退出聊天群
                            System.out.println("quit");
                            System.out.println("==================================");
                            Scanner command = new Scanner(System.in);
                            // 用户输入命令--因为命令都是以空格分割的
                            String[] s = command.nextLine().split(" ");
                            switch (s[0]) { // 根据命令格式,0 表示命令类型
                                case "send":
                                    ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
                                    break;
                                case "gsend":
                                    ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
                                    break;
                                case "gcreate":
                                    HashSet<String> strings = new HashSet<>(Arrays.asList(s[2].split(",")));
                                    strings.add(username);
                                    ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], strings));
                                    break;
                                case "gmembers":
                                    ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                    break;
                                case "gjoin":
                                    ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
                                    break;
                                case "gquit":
                                    ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
                                    break;
                                case "quit":
                                    ctx.channel().close();
                                    return;
                            }

                        }
                    }, "system in").start();
                }
            });
        }
    });
    log.debug("登录");
    Channel channel = bootstrap.connect("localhost", 8081).sync().channel();
    channel.closeFuture().sync();
} catch (
        Exception e) {
    log.error("client error", e);
} finally {
    group.shutdownGracefully();
}

config 包

public abstract class Config {
    static Properties properties;
    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if(value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if(value == null) {
            return Serializer.Algorithm.Java;
        } else {
            return Serializer.Algorithm.valueOf(value);
        }
    }
}

protocol

@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
@ChannelHandler.Sharable
/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        /* ordinal 参数会根据排序进行对比,比如 java是一个那么就会转换 0 以此类推 */
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组,序列化, 使用
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        // 反序列化
        // 根据个数拿到对应的序列化算法,找到反序列化的算法
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];
        // 确定具体消息类型
        Class<?> aClass = Message.getMessageClass(messageType);
        Object message = algorithm.deserialize(aClass, bytes);
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {

    public ProcotolFrameDecoder() {
        this(1024, 12, 4, 0, 0);
    }

    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}
/**
 * 扩展序列化
 */
public interface Serializer {
    // 反序列化
    <T> T deserialize(Class<T> tClass, byte[] bytes);

    // 序列化
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {
        Java {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(object);
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失败", e);
                }
            }
        },

        Json {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                String s = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(s, tClass);
            }

            @Override
            public <T> byte[] serialize(T object) {
                String toJson = new Gson().toJson(object);
                return toJson.getBytes(StandardCharsets.UTF_8);
            }
        }
    }
}

server.handler

@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
        // 获取接收者者
        String to = chatRequestMessage.getTo();
        // 根据用户名获取 channel
        Channel channel = SessionFactory.getSession().getChannel(to);
        // 为空,表示不在线
        if (channel != null) {
            // 发送信息
            channel.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(), chatRequestMessage.getContent()));
        } else {
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false, "对方用户不在线"));
        }
    }
}
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
        // 根据群名,获取所有通道
        List<Channel> channel = GroupSessionFactory.getGroupSession()
                .getMembersChannel(groupChatRequestMessage.getGroupName());
        // 循环发送信息
        for (Channel channel1 : channel) {
            channel1.writeAndFlush(new GroupChatResponseMessage(groupChatRequestMessage.getFrom(),
                    groupChatRequestMessage.getContent()));
        }
    }
}
/**
 * 处理创建群聊
 */
@ChannelHandler.Sharable
public class GroupCreateRequestHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
        // 将要创建的群聊名称
        String groupName = groupCreateRequestMessage.getGroupName();
        // 创建群并且要拉入的群成员
        Set<String> members = groupCreateRequestMessage.getMembers();
        // 获取群管理器
        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        // 创建群
        Group group = groupSession.createGroup(groupName, members);
        if (group == null) {
            // 发送拉群消息
            List<Channel> channelList = groupSession.getMembersChannel(groupName);
            for (Channel channel : channelList) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
            }
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
        } else {
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "群已经存在"));
        }
    }
}
@ChannelHandler.Sharable // 因为没有状态信息,多线程下是安全的
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        // 判断登录状态
        boolean login = UserServiceFactory.getUserService().login(username, password);
        LoginResponseMessage message;
        if (login) {
            SessionFactory.getSession().bind(ctx.channel(), username);
            message = new LoginResponseMessage(true, "登录成功");
        } else {
            message = new LoginResponseMessage(false, "用户名或密码不正确");
        }
        ctx.writeAndFlush(message);
    }
}
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {

    // 当连接断开触发 inactive 事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 正常断开", ctx.channel());
    }

    // 用户异常断开
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 异常断开", ctx.channel());
    }
}

backlog 连接

Netty 网络编程 服务器在三次连接建立之后,将半连接队列转移到全连接队列

因为服务器 accept 连接量特别大,所以将建立成功的信息放到全连接队列。以此处理。

accept 是发生在连接之前的。

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值,假设linux配置 100,程序配置 200 ,取100为准
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
  • ChannelOption.SO_BACKLOG, 2,设置全连接队列容量大小
new ServerBootstrap()
.group(new NioEventLoopGroup())
// 配置让全链接队列最大存放 2个
.option(ChannelOption.SO_BACKLOG, 2)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new LoggingHandler());
    }
}).bind(8424);

断点在 nioeventloop 下面代码中,即可调试

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

超过队列大小,将会抛出异常

系统参数

  • ulimit -n:一个进程可以打开的文件操作数
  • TCP_NODELAY:设置 true 不需要延迟,不需要算法,建议 true
  • 属于 SocketChannal 参数
  • 属于操作系统参数
  • SO_SNDBUF & SO_RCVBUF
  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

内存缓冲区

io 只能是直接内存,ctx获取的 bytebuf

三种线程模型

单线程模型

异步非阻塞io,所有的操作都是由一个nio线程处理,适合特别小的程序

一个单线程负荷过度,客户端向服务端超时,服务端的超时处理会处理,最后卡死就会宕机

多线程模型

分成reactor 单线程和reactor 多线程 单线程负责接受新来的请求 处理逻辑交给线程池处理

由一组nio线程组成

主从线程模型

由一组线程池接受请求,一组线程池处理io

netty 周期周期

  1. 处理器添加
  2. channel 注册
  3. channel 连接
  4. channel 读取完毕
  5. channel 断开
  6. channel 移除
  7. 处理器移除
/**
 * 定义入站,指定的http请求
 */
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        // 获取channel
        Channel channel = ctx.channel();
        // 显示客户端远程地址
        System.out.println(channel.remoteAddress());
        /*
          /0:0:0:0:0:0:0:1:5220
          /0:0:0:0:0:0:0:1:5220
          /0:0:0:0:0:0:0:1:5221
          /0:0:0:0:0:0:0:1:5221
          会打印四次,原因是有两次并不是请求request,和图标等
          还可以用 curl ip地址, 进行干净的请求
         */


        // 将发送信息存到容器中
        ByteBuf buf = ctx.alloc().buffer().writeBytes("hello netty".getBytes());
        // 构建 httpresponse
        DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
        // 定义响应数据类型和长度
        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
        // 刷到客户端
        ctx.writeAndFlush(httpResponse);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 注册");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 移除");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 连接");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 断开");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 读取完毕");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("channel 用户时间触发");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 可写更改");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("channel 发生异常");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("处理器添加");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("处理器移除");
        super.handlerRemoved(ctx);
    }
}
转载自:https://juejin.cn/post/7254096495183298597
评论
请登录