Netty 网络编程
Netty 入门
概述
什么是 Netty?
- 异步:用的多线程,不是异步 IO
- 基于事件驱动:表示用的 Selector
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
- Cassandra - nosql 数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali 开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc 框架
- Dubbo - rpc 框架
- Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
- Zookeeper - 分布式协调框架
Netty 的优势
Netty vs NIO,工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
Netty vs 其它网络应用框架
- Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
创建服务器 / 客户端
- 首先创建启动器
- 创建NioEventLoopGroup基于NIO服务端实现
- childHandler表示添加的处理器都是给SocketChannel用的
- ChannelInitializer 仅仅执行一次
- 客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- new ServerBootstrap().bind 绑定监听端口
服务端
// 1. 启动器,负责组装 netty组件,启动服务器
new ServerBootstrap()
// BossEventLoop,WorkerEventLoop(selector, thread)
// 创建一个事件循环线程组,适用于 NIO 实现,可以简单理解为 `线程池 + Selector`
.group(new NioEventLoopGroup())
// 指定要使用的服务器 Channel 实现,适用于 NIO 实现
.channel(NioServerSocketChannel.class) // OIO BIO
// BOSS 负责处理连接Work(child)负责处理读写,决定了work(child)能执行哪些(handler)
// 配置服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
.childHandler(
// 5. channel代表客户端进行数据读写的通道Initializer初始化,负责添加到别的 handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加具体的 handler
// 将 bytebuf 转换为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 自定义handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印转好的字符串
System.out.println(msg);
}
});
}
})
// 监听端口
.bind(8080);
客户端
// 启动类
new Bootstrap()
// 添加 EventLoop
.group(new NioEventLoopGroup())
// 选择客户端 channel 实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在建立连接后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 把字符串编码成字节
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 向服务器发送数据
.writeAndFlush("hello, world");
流程分析
- 先创建启动器类
- 添加组件,eventloop(内部就有线程和选择器不断循环,查找事件)
- 选择NIOServerSocket 实现
- 添加处理器,只有连接事件发生之后才会执行 initChannel初始化方法
- 绑定监听端口(服务端就到这里)
- (客户端)创建启动器和eventloop
- 客户端选择socket事件
- 添加处理器,也是等连接建立才会执行初始化方法
- 最后连接服务器
- 服务器监听到accept事件之后
- 最后找处理器处理这个事件
- (我们看不懂事线),连接建立后调用初始化方法
- 客户端 sync 只有连接之后才会继续执行
- channel() 这个是连接对象
- 最后就可以读写
- 发数据就会走到处理器内部
- 进行转为字节数组进行 bytebuf进行发送
- 服务端的eventloop就会监听到读事件
- 走到了服务器的处理器,进行处理
channel数据传输通道,可读出来可写进。和nio概念一致
handel中的message流动数据,handel 是一个工序,对原始数据进行一道道工序进行处理
pipeline就是流水线,一道工序,进行添加工序。
handel 分为 inbound和outbound 入站和出站 读入就走入站,写出就走出站
eventloop就是线程,相当于工人 一旦某一个工人负责一个事件,那就会负责到底。一个工人是可以管理多个事件的
eventloop既可以执行io操作,也可以进行普通任务,每个工人都有自己的任务队列,依次处理。底层用的单线程的线程池。
任务可以是定时任务也可以是普通任务
组件
EventLoop
EventLoop是事件循环对象
本质是一个单线程执行器,同时维护了Selector,里面有run方法处理 channel 上源源不断的 io 事件。
继承关系:
- 一条线是继承自 java.util.concurrent.ScheduledExecutorService
- 因此包含了线程池的所有方法
- 另一条是继承自 netty 自己的 OrderedEventExecutor
- 提供了 Boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
- 提供了 parent方法来看看自己属于哪个 EventLoopGroup
// eventloop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
// 继承线程池接口
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
EventLoopGroup 事件循环组,我们一般使用这个
EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法俩绑定其中一个 EventLoop,后续这个 channel 上的 io 事件都由同一个 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
我们平常使用的是下面两个循环组实现类
// 可以实现普通任务,io事件,定时任务
EventLoopGroup nioGroup = new NioEventLoopGroup();
// 普通任务和定时任务
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
那么我们空参创建对象,线程数是多少呢
下面这段代码,是用来初始化 EventLoopGroup对象的。
如果没有指定线程数,会采用默认的,也就是当前系统的CPU核心数 * 2
/**
* 构造函数,创建MultithreadEventLoopGroup对象
* @param nThreads 表示EventLoopGroup中EventLoop的数量,如果为0则使用默认的线程数
* @param executor 用于执行任务的Executor对象
* @param args 可选参数列表
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 调用父类的构造函数,初始化EventLoopGroup对象
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
获取当前CPU核数--我的是 16核
NettyRuntime.availableProcessors();
指定线程数量
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
获取下一个线程
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 获取下一个线程
System.out.println(nioGroup.next()); // 第一次打印第一个
System.out.println(nioGroup.next()); // 打印第二个
System.out.println(nioGroup.next()); // 打印第一个,因为一共就两个
System.out.println(nioGroup.next()); // 打印第二个
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
执行普通任务
- 这里的execute和submit都是一样的效果
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行普通任务
nioGroup.next().execute(() -> {
log.debug("ok");
});
log.debug("main");
定时任务
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行定时任务 以一定的频率执行
// 参数1:执行的方法
// 参数2:表示第一次启动多久开始
// 参数3:表示间隔多久再次触发
// 参数4:单位
nioGroup.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 3, 5, TimeUnit.SECONDS);
log.debug("main");
IO 事件
下面是服务端,客户端同上面。
一个客户端绑定一个服务的 EventLoop线程。
下面再编写客户端的时候,如果要打上断点实现阻塞,需要将idea的断点卡成单线程的
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
我们来分工细化一下
- 我们可以把 eventloop 划分为 boss 和 work
- 将accept 和 read 分开处理
那 第一个事件循环组 线程是否可以设置为 1 呢
因为服务器这一有个,它也只会和里面一个 eventloop 进行绑定
new ServerBootstrap()
// 将 参数1:只处理accept 参数2:处理read
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
继续分工细化--如果其中一个Nio线程执行中 在读操作时执行太久,会影响其他 channel读操作 最好不要让它占用 work nio线程,所以我们继续细分
- 创建独立的事件循环对象,因为不需要进行io所以是普通的
- 将下个处理器绑定上。
// 细分2:创建独立的 EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
// 让消息传递给下一个handler
ctx.fireChannelRead(msg);
}
})
// 指定事件循环组和名称
.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
那么是怎么实现切换事件循环组,也就是换人处理的呢?
如果两个 handler 绑定的是同一个线程,那么直接调用,否则调用的代码封装为一个任务对象。由一下一个 handler的线程调用
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
/**
* EventExecutor 是事件循环组
*/
EventExecutor executor = next.executor();
// 当前 handler 中的线程是否和 eventloop 是同一个线程
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
// 使用runnable 执行一个事件
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
Channel & 连接 & 关闭问题 & 异步
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入,只是写进入缓冲区
- writeAndFlush() 方法将数据写入并刷出
connect 连接问题
connect:是异步阻塞,main方法发起,执行是Nio的线程
所以如果在发送数据前,一定要保证已经连接好了,否则数据是发送不出去的
ChannelFuture:带有Future或者 promise都是和异步配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// connect 是异步阻塞,main方法发起调用,真正执行连接的是Nio线程
.connect(new InetSocketAddress("localhost", 8080));
// channelFuture.sync();
// 会无阻塞向下执行获取channel,最后发送数据
Channel channel = channelFuture.channel();
// log.debug("{}", channel); // 如果没有获取到打印的就是没有连接的 channel,所以需要执行 sync
channel.writeAndFlush("123");
解决连接问题,保证发送数据前一定是正确连接的
- sync
// 阻塞当前线程,直到连接建立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
- 将执行发送的代码,交给nio线程。
- addListener(回调对象)方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在nio 线程连接建立好之后,会调用operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
}
});
关闭问题
我们看下面这段代码
我们可以输入内容,发送给服务端,按 q 关闭传输通道
但是我们关闭的时候,想做一些操作,怎么整 直接在close下面写吗,那是不对的,因为close方法是异步的
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(new Runnable() {
@Override
public void run() {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}
}, "input").start();
解决异步问题
添加日志调试
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
- 同步解决---由主线程处理关闭操作
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 添加关闭处理器
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();// 设置同步
log.debug("处理关闭之后....");
- 异步解决--由 nio 线程处理
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
// 设置关闭回调函数
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理关闭之后....");
}
});
优雅的关闭
- 它会等待所有正在处理的任务完成后,再进行关闭。
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭之后的操作");
group.shutdownGracefully(); // 优雅的关闭
}
});
异步
异步:就是一个线程发起连接一个线程去建立连接
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
个病人
那我们就可以细分一下,四个医生分别处理四个事情
只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍
重点:
- 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加
- 合理进行任务拆分,也是利用异步的关键
Future & Promise
处理异步时:经常用到的两个接口, futer 核 promise
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise;不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
jdk future
// 线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算"); // poll 线程
Thread.sleep(1000);
return 50;
}
});
// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程
nio future
// 创建事件循环组
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
// 获取执行事件
EventLoop eventLoop = eventGroup.next();
// 执行方法
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算"); // nio 线程
Thread.sleep(1000);
return 50;
}
});
同步获取线程返回结果
// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程
异步
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("等待结果"); // nio 线程
log.debug("结果是 {}", future.getNow()); // nio 线程
}
});
promise 获取结果
// 1. 准备 EventLoop 对象
EventLoop loop = new NioEventLoopGroup().next();
// 2. 主动创建 promise 结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
new Thread(new Runnable() {
@Override
public void run() {
// 3. 任意一个线程执行计算,计算完毕使用 promise 填充结果
log.debug("开始计算...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
// 填写失败的数据
promise.setFailure(e);
}
// 填写成功的数据
promise.setSuccess(90);
}
}).start();
log.debug("等待结果...");
log.debug("结果是:{}", promise.get());
handler & pipeline
ChannelHandle 用来处理 Channel 上的各种事件,分为入站和出站两种。
当所有的ChannelHandle 连在一起就是 pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
入站是按照顺序执行的也就是, 会打印1,2,3
出站是按照逆序的方式,会打印6,5,4
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加入站处理器
nioSocketChannel.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
// 使用通道写入数据并刷新。ctx通道信息,获取上下文的分配器,创建bufer输出数据
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
}
});
// 同上
log.debug("2");
// 同上
log.debug("3");
//
// 添加出站处理器
nioSocketChannel.pipeline().addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
// 同上
log.debug("5");
//
// 同上
log.debug("6");
//
}
})
.bind(8080);
出站处理器 当我们用 channel 的方法发送数据,会从尾部开始依次执行出站过滤器 当用方法内部的 ctx channel的方法发送,会从当前位置开始往前找
调试出站入站,handler
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
bytebuf
创建了一个默认的 ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10
如果不指定,默认256,扩容 * 2
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
public static void main(String[] args) {
// 创建默认空间的 bytebuf,空间容量默认256,超过空间容量,会自动扩容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// read index:0 write index:0 capacity:256
log(buffer);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes());
log(buffer);
// read index:0 write index:300 capacity:512
}
// netty 调试 bytebuf 方法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
直接内存 & 堆内存
默认是创建的直接内存
创建堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
创建直接内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
直接内存创建和销毁的代价昂贵,但读写效率高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
下面的代码是在 vm虚拟机参数里设置的
-Dio.netty.allocator.type={unpooled|pooled}
NIO 的缓冲区池化能够有效地减少创建和销毁缓冲区对象的成本,提高程序的性能和吞吐量。
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
System.out.println(buffer.getClass());
// class io.netty.buffer.PooledUnsafeDirectByteBuf
/*
* PooledUnsafeDirectByteBuf
* Unpooled:非池化
* Pooled:池化
* Direct:直接内存
* Heap:堆内存
*/
}
Bytebuf的组成部分
- 废弃字节:表示已经读过了
- 可读字节:未读的数据
- 可写字节:未写入的数据
- 可扩容:最大的容量,全部的
butebuf 是读写两个指针,不需要切换读写模式,最开始读写指针都在 0 位置
写读方法
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是 Big Endian
扩容规则是
- 写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
- 写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
- 扩容不能超过 max capacity 会报错
读入
// 只读入一个字节
buffer.readByte()
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());//读入
log(buffer);
这时要重复读取的话,重置到标记位置 reset
buffer.resetReaderIndex();
retain & release(垃圾回收)
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
注意:因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 try finally里 relase,那么就失去了传递性,必须等 bytebuf 完成了它的使命,才可以。
基本规则是,谁是最后使用者,谁负责 release
入站 bytebuf 处理原则 | |
---|---|
对原始 ByteBuf 不做处理 | 无须 release |
将原始 ByteBuf 转换为其它类型的 Java 对象 | ByteBuf 就没用了,必须 release |
不调用 ctx.fireChannelRead(msg) 向后传递 | 必须 release |
ByteBuf 没有成功传递到下一个 | 必须 release |
出站 ByteBuf 处理原则:由 HeadContext flush 后 release
异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
看看释放 bytebuf 的源码
尾部释放
在 io.netty.channel 的类中的内部类 TailContext 下面方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
继续跟进
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
this.onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
这里就看到了 ReferenceCountUtil.release(msg); 用来释放
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
用来判断,如果是这个类型的,代表可以释放,否则返回false
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
看看头部释放的源码
io.netty.channel下的HeadContext内部类的write方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
this.unsafe.write(msg, promise);
}
我们跟进 AbstractChannel实现类的 AbstractUnsafe内部类的 write方法
如果是通道是null 表示用计数方式关闭,并处理异常
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
try {
ReferenceCountUtil.release(msg);
} finally {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause, "write(Object, ChannelPromise)"));
}
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var15) {
try {
ReferenceCountUtil.release(msg);
} finally {
this.safeSetFailure(promise, var15);
}
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
slice 零拷贝体现之一
作用:减少内存复制
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
切片之后,会对容量进行限制,切多少就是多少。不能再次添加
public class Server {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 赋值是个数据
buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
// 打印
log(buffer);
// 进行切片,这里是没有发生数据复制的
// 从0 切到 5个,a-e
ByteBuf f1 = buffer.slice(0, 5);
// 从5 切到 5个,f-j
ByteBuf f2 = buffer.slice(5, 5);
log(f1);
log(f2);
System.out.println("===============");
// 修该第0位也就是 a,发现原bytebuf也修改了
f1.setByte(0, 'b');
log(f1);
log(buffer);
}
// netty 调试 bytebuf 方法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
原 bytebuf 释放之后,切片的bytebuf是不能用了
解决上面的问题
我们用完那个,自动释放它自己的 bytebuf
工具类还是上面的
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
log(buffer);
ByteBuf f1 = buffer.slice(0, 5);
// 让引用数据加1
f1.retain();
ByteBuf f2 = buffer.slice(5, 5);
log(f1);
log(f2);
System.out.println("释放原 bytebuf");
// 让原bytebuf 减1,释放不了真正的内存,没有减到0,不会释放内存
buffer.release();
log(f1);
}
duplicate
【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
零拷贝,合并小bytebuf
重新计算了 指针位置,也是比较麻烦, 还是要注意 relase问题哦
public static void main(String[] args) throws Exception {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
buf1.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
buf2.writeBytes(new byte[]{1, 2, 3, 4, 5});
// 合并上面两个 buf,不发生复制
// 默认不会自动调整写入的位置,必须带上true
CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
bufs.addComponents(true, buf1, buf2);
log(bufs);
}
Unpooled
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
ByteBuf 优势
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf
双向通信
服务端
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
// 建议使用 ctx.alloc() 创建 ByteBuf
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(buffer);
ctx.writeAndFlush(response);
// 思考:需要释放 buffer 吗
// 从 Socket 读取到字节数据之后,将数据转换成 ByteBuf 对象。
// 在 Netty 的特殊实现下,当该消息被服务端处理完成
// 即当消息被写回客户端之后,Netty 会自动释放其占用的内存。
// 思考:需要释放 response 吗
// response 是我们手动创建的 ByteBuf 对象,而不是从消息中通过解码产生的,因此我们需要手动释放该对象
// 又因为我们向客户端写入数据时使用了 ctx.writeAndFlush(response)
// 该方法会自动在发送完成后将 response 进行释放,因此我们不需要手动释放该对象。
// 但如果是通过 write() 方法写入数据,那么我们需要在写入完成后手动调用
// response.release() 方法对 response 对象进行释放。
}
});
}
}).bind(8080);
}
客户端
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
} finally {
// 释放
ReferenceCountUtil.release(msg);
}
// 思考:需要释放 buffer 吗
// 接收到的 butebuf需要显示释放,避免内存泄露
// 如果不,则会保留到程序退出
}
});
}
}).connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
}
读和写误解
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
服务端
public class TestServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888);
Socket s = ss.accept();
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
// 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
客户端
public class TestClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("localhost", 8888);
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
netty 进阶
在Client中:在入站处理器中,channelActive方法会在客户端连接成功后触发。
- option(ChannelOption.SO_RCVBUF, 10):设置缓冲区大小 = 10
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
// 设置缓冲区大小,就不会一次全部接收了,但是产生了半包问题
.option(ChannelOption.SO_RCVBUF, 10);
看一组黏包现象,我们下面当客户端连接后会发送数据到服务端,可是我们希望循环一次发一次,服务端却一次全部接收了
READ: 160B,一次全部接收了
服务端
@Slf4j
public class Server {
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全关闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
}
客户端
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在连接建立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 循环写入
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
半包问题,就是设置了缓冲区大小,然后客户端发送的数据是
注意:TCP 存在黏包半包,UDP 是不存在的
TCP 应答机制(滑动窗口)
TCP 可靠:发信息到对端,对端必须应答,如果长时间没有应答,会重发信息
,影响吞吐量
滑动窗口解决上面问题
粘包:发送方发送一个完整报文,由于接收方处理不及时,且窗口比较大,就形成粘包
半包:接收方只剩下了一部分,发送方的数据不能全部放下,只能先放一部分,等待窗口移动继续放,形成半包
解决方式:引入窗口
窗口大小决定了无需等待应答而可以继续发送的数据最大值
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
短连接
解决粘包:客户端每发送一次,就关闭连接,也叫做短连接
下面是客户端,服务端还是上面的
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在连接建立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
但是会产生半包,如果服务端设置了 连接的 bytebuf 大小,发送方发的数据过大,每次都接收不全
这里说下,修改容量的方式--ServerBootstrap的类
// 是针对每个连接的配置,实现默认,会1024的容量
// 现在改成最小值,初始值,最大值。最低是十六,内部会取16的整数倍
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
固定长度
客户端和服务端约定好。服务端每次解码10个字节,多的就等下次够十个再解码。
注意顺序:处理器,一定要先解码再打印日志信息。解码处理器在前面
服务端先加入。
// 添加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
客户端---每次都是十个字节一个数据,不足用_补齐
public static void main(String[] args) {
send();
}
/**
* 填充数组为指定长度,并将数组中的元素替换为指定字符对应的字节值。
*
* @param c 要替换的字符
* @param len 数组长度
* @return 填充后的字节数组
*/
public static byte[] fill10Bytes(char c, int len) {
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) 95);
for (int i = 0; i < len; ++i) {
bytes[i] = (byte) c;
}
System.out.println(new String(bytes));
return bytes;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在连接建立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(fill10Bytes(c++, r.nextInt(10) + 1));
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
服务端
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
// 添加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全关闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
缺点:会浪费字节,长度需要决策好
行解码器
换行符解码器,不能指定,默认 \n \r
// 还有一种DelimiterBasedFrameDecoder
// 参数1是最大长度,参数2:是指定分隔符
addLast(new LoggingHandler(LogLevel.DEBUG));
客户端
public static void main(String[] args) {
send();
}
/**
* 使用指定的字符生成一个指定长度的字符串,并在字符串末尾添加换行符。
*
* @param c 要生成的字符
* @param len 字符串长度
* @return 生成的字符串
*/
public static StringBuilder makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; ++i) {
sb.append(c);
}
sb.append("\n");
return sb;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在连接建立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
String s = makeString(c++, r.nextInt(256) + 1).toString();
buffer.writeBytes(s.getBytes());
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
服务端
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
// 添加基于换行符的数据,遇到 \n \r等就会将这条数据截取。每一行最大数据1024
// 超出则丢弃
nioSocketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024));
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全关闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
LTC 解码器
lengthFieldOffset
长度字段偏移量,也就是长度字节开始位置,图中等于0,表示长度字节从 0 开始
lengthFieldLength
长度字段本身是多长,图中等于 2,表示000C,C = 12
lengthAdjustment
长度字段为基准,还有几个字节是内容
initialBytesToStrip
从头剥离几个字节,也就是字段长度记录了内容长度,一共两个字节,解析完想删除这两个字节
就可以根据上面信息,从0开始读,读12个字节拿到 hello world
下面这个有点不一样,它多带了一个头,从第 2 个字节开始读,一共三个长度字段
表示从0开始读,长度字段为 3 个,不需要剥离,跳过两个字节,开始读取12个内容字节
下面方法测试
public static void main(String[] args) throws Exception {
// netty 的测试方法,省去很多
EmbeddedChannel channel = new EmbeddedChannel(
// 参数1:最大长度,参数2:长度字段开始位置,参数3:长度字段本身字节数,参数4:是否跳过字节,参数5:剥离长度
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 1, 4),
new LoggingHandler(LogLevel.DEBUG)
);
// 四个字节的内容长度,实际内容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Hi!");
// 使用 Em的channel写入 bufer
channel.writeInbound(buffer);
// 打印
// .Hello, world,
// 前面的 . 代表我们的附加内容,如果不剥离字段长度,会更多
}
private static void send(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes();
buffer.writeInt(bytes.length); // 先写入内容的长度
buffer.writeByte(1); // 我们写入一个附加内容,前面已经跳过的字节,在参数4位置
buffer.writeBytes(bytes);
}
协议设计与解析
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
- redis 协议
- 要求我们先发送
- *3:表示三个元素
- 每个命令的长度
- $3:表示 set 三个长度
- set
- $4
- name
- $8
- zhangsan
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
final byte[] LINE = new byte[]{13, 10};
try {
Bootstrap bootstrap = new Bootstrap().group(worker).channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes()); // name 四个长度
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$8".getBytes()); // zhangsan 8个长度
buffer.writeBytes(LINE);
buffer.writeBytes("zhangsan".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost",6379).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
http 协议
new HttpServerCodec()
Netty 提供的一个方便的处理器,它将实际上是将 HttpRequestDecoder 和 HttpResponseEncoder 组合在一起,前者用于解码 HTTP 请求,后者用于编码 HTTP 响应。
下面是简单使用
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
NioEventLoopGroup boss = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new HttpServerCodec());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg.getClass());
if (msg instanceof HttpRequest) { // 请求行,请求头(是get请求)
} else if (msg instanceof HttpContent) { // 请求体(不是get)
}
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
我们希望只关注某一种类型的处理
SimpleChannelInboundHandler:只关心特定的消息,request或者httpconent,不是则跳过
- DefaultFullHttpResponse:获取响应对象,进行写回数据
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
NioEventLoopGroup boss = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new HttpServerCodec()); // 即使入栈处理器也是出站处理器
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
// 获取请求
log.debug(httpRequest.uri()); // 请求行
log.debug("{}", httpRequest.headers()); // 请求头
// 获取响应
/*
* version 版本
* status 响应状态
*/
DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);
// 向浏览器写入数据
byte[] bytes = "<h1>Hello World</h1>".getBytes();
response.content().writeBytes(bytes);
// 我们应该在响应头加 响应体长度,否则浏览器会一直在等待响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length); // netty 框架 HttpHeaderNames
// 写回响应
channelHandlerContext.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
// 关闭时,同步关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊... 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
先写编解码器
这里写一个 message 的发送的信息,只是一个类型状态
会有多个子类继承这个 message 类,表示多个状态
public abstract class Message implements Serializable {
/**
* 根据消息类型字节,获得对应的消息 class
*
* @param messageType 消息类型字节
* @return 消息 class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
// 存储序列号
private int sequenceId;
// 存储消息的类型
private int messageType;
public abstract int getMessageType();
// 登录请求消息
public static final int LoginRequestMessage = 0;
// 登录响应消息
public static final int LoginResponseMessage = 1;
// 聊天请求消息
public static final int ChatRequestMessage = 2;
// 聊天响应消息
public static final int ChatResponseMessage = 3;
// 创建群组请求消息
public static final int GroupCreateRequestMessage = 4;
// 创建群组响应消息
public static final int GroupCreateResponseMessage = 5;
// 加入群组请求消息
public static final int GroupJoinRequestMessage = 6;
// 加入群组响应消息
public static final int GroupJoinResponseMessage = 7;
// 退出群组请求消息
public static final int GroupQuitRequestMessage = 8;
// 退出群组响应消息
public static final int GroupQuitResponseMessage = 9;
// 群组聊天请求消息
public static final int GroupChatRequestMessage = 10;
// 群组聊天响应消息
public static final int GroupChatResponseMessage = 11;
// 获取群组成员请求消息
public static final int GroupMembersRequestMessage = 12;
// 获取群组成员响应消息
public static final int GroupMembersResponseMessage = 13;
// 心跳请求消息
public static final int PingMessage = 14;
// 心跳响应消息
public static final int PongMessage = 15;
/**
* 请求类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
/**
* 响应类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
自定义编码解码
readInt 是Netty中ByteBuf类的一个方法,用于从ByteBuf中读取一个int类型的数据。
它返回一个int类型的值,并将读取索引自动增加4(int的字节数)
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
// 编码
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4个字节的魔数,任意
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本
out.writeByte(1);
// 3. 字节的序列化方式 0,jdk 1 JSON
out.writeByte(0);
// 4. 字节指令类型---.getMessageType 拿到当前多态的,类型,每个类都有指定的类型
out.writeByte(msg.getMessageType());
// 5. 4个字节--请求序号
out.writeInt(msg.getSequenceId());
out.writeByte(0xff); // 为了凑数的,因为固定字节数 要 2 的倍数
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容--用jdk序列化
out.writeBytes(bytes);
}
// 解码
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 先读取魔术
int magicNum = in.readInt();
// 读取版本
byte version = in.readByte();
// 读取序列号方式
byte serializerType = in.readByte();
// 读取消息类型 messageType
byte messageType = in.readByte();
// 读取请求序号
int sequenceId = in.readInt();
// 读取无意义的字节,当时为了规范加的
in.readByte();
// 读取长度
int length = in.readInt();
// 获取数据字节
byte[] bytes = new byte[length];
ByteBuf byteBuf = in.readBytes(bytes, 0, length);
// 转为对象
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
Message message = (Message) ois.readObject();
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
// 最后添加到集合中
out.add(message);
}
}
然后我们测试这个处理器
- 1024:最大容量
- 12:偏移量,因为序列号,魔数,类型无效字符等,协议。
- 4:长度字节
- 0:不需要跳过
- 0:因为 decode自己解析,所以不需要
注意要加LengthFieldBasedFrameDecoder
(1024, 12, 4, 0, 0),解决黏包半包
开启了解码器:如果数据不完整,将不会往下走
writeInbound:执行完会调用 release释放 bytebuf
public static void main(String[] args) throws Exception {
// 添加刚刚写的编码解码
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(),
new MessageCodec());
// encode 编码
LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(loginRequestMessage);
// decode 解码
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 写入数据
new MessageCodec().encode(null, loginRequestMessage, buffer);
// 入站
channel.writeInbound(buffer);
}
打印的字节数,可以根据自己定义的协议来,对比,是正确的
注解(安全情况)
sharable 线程安全
我们可以打开处理类的源码查看
@Sharable
表示线程安全,这个处理器可以被共享使用
像 LTC解码器是不可以被共享的,存在线程隐患
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {
那我们自己定义的 handle 可以加共享注解吗
ByteToMessageCodec
;子类文档明确规定,不能加共享注解,否则抛出异常
public class MessageCodec extends ByteToMessageCodec<Message> {
那需要添加怎么解决
使用这个父类即可--MessageToMessageCodec
有一些小变动,需要自行改一下,bytebuf方面
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
开发聊天室
首先,我们协议在 LengthFieldBasedFrameDecoder
直接写,会容易出错我们制作成类
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
// 协议
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
登录
客户端
客户端连接成功之后进行连接建立事件,控制台输入
需要开辟线程,使用 nio 线程会影响其他 io 操作
- 我们构建完对象,进行
writeAndFlush
发送 - 往上继续找上一个
handel
MESSAGE_CODEC
编码,LOGGING_HANDLER
记录日志- ProcotolFrameDecoder是入站的所以不会触发
先看一下下面这个 CountDownLatch
用于同步工具类,协调线程之间的等待时间
countDown()
:每次调用减 1,为零释放所有线程await()
:让调用的线程等待计数器为 0 ,立即返回,否则会阻塞
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
用于在多线程下判断的 boolean 对象
get()
:获取当前布尔值。set(boolean newValue)
:设置布尔值为指定的新值。
AtomicBoolean LOGIN = new AtomicBoolean();
会话管理---登录成功后保存是那个channel
那个用户
将用户和 channel
绑定起来
SessionFactory.getSession().bind(ctx.channel(), username);
注意:handler 没有加 @ChannelHandler.Sharable 会导致其他客户端连接不上
连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
IdleStateHandler 处理器解决以上问题
- 第一个参数: 表示读空闲时间,即在指定的时间间隔内如果没有读取到数据,则会触发userEventTriggered方法。
- 第二个参数: 表示写空闲时间,即在指定的时间间隔内如果没有写入数据,则会触发userEventTriggered方法。
- 第三个参数: 表示读写空闲时间,即在指定的时间间隔内如果既没有读取到数据也没有写入数据,则会触发userEventTriggered方法。
用心跳机制,保证客户端正常链接,处理空闲问题
优化
序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
/**
* 扩展序列化
*/
public interface Serializer {
// 反序列化
<T> T deserialize(Class<T> tClass, byte[] bytes);
// 序列化
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
String s = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(s, tClass);
}
@Override
public <T> byte[] serialize(T object) {
String toJson = new Gson().toJson(object);
return toJson.getBytes(StandardCharsets.UTF_8);
}
}
}
}
注意反序列化,获得的对象
参数调优
// 客户端通过 option 配置参数,给 SocketChannel 配置参数
new Bootstrap().option()
// 服务端通过 option 配置参数,给 ServerSocketChannel 配置参数
new ServerBootstrap().option()
// 服务端通过 childOptio 配置参数,给 SocketChannel 配置参数
new ServerBootstrap().childOption()
客户端超时连接
客户端不一定等到 5秒后抛出异常,它有可能判断服务器没有开启,根本连接不上,直接抛出网络编程异常
bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间,netty 不会用的。
全部代码
client
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 线程计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 登录状态--默认未登录
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
// bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
// 初始化操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断是不是,读空闲时间太长或者 写时间空闲时间太长
// 5s 没用收到 channel 的数据会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 触发特殊事件 IdleState
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 写空闲
if (event.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
});
// 建立连接之后触发 active
ch.pipeline().addLast("client_handel", new ChannelInboundHandlerAdapter() {
// 接受响应信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.debug("{}", msg);
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage message = (LoginResponseMessage) msg;
// 如果登录成功设置为 true
if (message.isSuccess()) {
LOGIN.set(true);
}
// 让计数器 - 1,让线程继续执行,唤醒
WAIT_FOR_LOGIN.countDown();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 在这里要创建线程去实现,控制台输入,负责向服务器发送信息
new Thread(() -> {
Scanner sc = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = sc.nextLine();
System.out.println("请输入密码:");
String password = sc.nextLine();
// 构建消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送信息
ctx.writeAndFlush(message);
System.out.println("等待后续操作....");
try {
// 响应回来,继续执行
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败,关闭 channel,返回
log.debug("{}", LOGIN.get());
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
// 打印菜单,登录成功
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]"); // 发送
System.out.println("gsend [group name] [content]"); // 向群聊发送
System.out.println("gcreate [group name] [m1,m2,m3...]"); // 创建群,并且拉人
System.out.println("gmembers [group name]"); // 查看群里成员
System.out.println("gjoin [group name]"); // 加入聊天群
System.out.println("gquit [group name]"); // 退出聊天群
System.out.println("quit");
System.out.println("==================================");
Scanner command = new Scanner(System.in);
// 用户输入命令--因为命令都是以空格分割的
String[] s = command.nextLine().split(" ");
switch (s[0]) { // 根据命令格式,0 表示命令类型
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
HashSet<String> strings = new HashSet<>(Arrays.asList(s[2].split(",")));
strings.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], strings));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
});
}
});
log.debug("登录");
Channel channel = bootstrap.connect("localhost", 8081).sync().channel();
channel.closeFuture().sync();
} catch (
Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
config 包
public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}
protocol
@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
/* ordinal 参数会根据排序进行对比,比如 java是一个那么就会转换 0 以此类推 */
out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组,序列化, 使用
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 反序列化
// 根据个数拿到对应的序列化算法,找到反序列化的算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];
// 确定具体消息类型
Class<?> aClass = Message.getMessageClass(messageType);
Object message = algorithm.deserialize(aClass, bytes);
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
/**
* 扩展序列化
*/
public interface Serializer {
// 反序列化
<T> T deserialize(Class<T> tClass, byte[] bytes);
// 序列化
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
String s = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(s, tClass);
}
@Override
public <T> byte[] serialize(T object) {
String toJson = new Gson().toJson(object);
return toJson.getBytes(StandardCharsets.UTF_8);
}
}
}
}
server.handler
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
// 获取接收者者
String to = chatRequestMessage.getTo();
// 根据用户名获取 channel
Channel channel = SessionFactory.getSession().getChannel(to);
// 为空,表示不在线
if (channel != null) {
// 发送信息
channel.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(), chatRequestMessage.getContent()));
} else {
channelHandlerContext.writeAndFlush(new ChatResponseMessage(false, "对方用户不在线"));
}
}
}
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
// 根据群名,获取所有通道
List<Channel> channel = GroupSessionFactory.getGroupSession()
.getMembersChannel(groupChatRequestMessage.getGroupName());
// 循环发送信息
for (Channel channel1 : channel) {
channel1.writeAndFlush(new GroupChatResponseMessage(groupChatRequestMessage.getFrom(),
groupChatRequestMessage.getContent()));
}
}
}
/**
* 处理创建群聊
*/
@ChannelHandler.Sharable
public class GroupCreateRequestHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
// 将要创建的群聊名称
String groupName = groupCreateRequestMessage.getGroupName();
// 创建群并且要拉入的群成员
Set<String> members = groupCreateRequestMessage.getMembers();
// 获取群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 创建群
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 发送拉群消息
List<Channel> channelList = groupSession.getMembersChannel(groupName);
for (Channel channel : channelList) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
} else {
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "群已经存在"));
}
}
}
@ChannelHandler.Sharable // 因为没有状态信息,多线程下是安全的
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
// 判断登录状态
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
}
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
// 当连接断开触发 inactive 事件
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 正常断开", ctx.channel());
}
// 用户异常断开
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 异常断开", ctx.channel());
}
}
backlog 连接
服务器在三次连接建立之后,将半连接队列转移到全连接队列
因为服务器 accept 连接量特别大,所以将建立成功的信息放到全连接队列。以此处理。
accept 是发生在连接之前的。
-
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
-
sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
-
accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值,假设linux配置 100,程序配置 200 ,取100为准
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
ChannelOption.SO_BACKLOG, 2
,设置全连接队列容量大小
new ServerBootstrap()
.group(new NioEventLoopGroup())
// 配置让全链接队列最大存放 2个
.option(ChannelOption.SO_BACKLOG, 2)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler());
}
}).bind(8424);
断点在 nioeventloop 下面代码中,即可调试
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
超过队列大小,将会抛出异常
系统参数
- ulimit -n:一个进程可以打开的文件操作数
- TCP_NODELAY:设置 true 不需要延迟,不需要算法,建议 true
- 属于 SocketChannal 参数
- 属于操作系统参数
- SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 属于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
内存缓冲区
io 只能是直接内存,ctx获取的 bytebuf
三种线程模型
单线程模型
异步非阻塞io,所有的操作都是由一个nio线程处理,适合特别小的程序
一个单线程负荷过度,客户端向服务端超时,服务端的超时处理会处理,最后卡死就会宕机
多线程模型
分成reactor 单线程和reactor 多线程 单线程负责接受新来的请求 处理逻辑交给线程池处理
由一组nio线程组成
主从线程模型
由一组线程池接受请求,一组线程池处理io
netty 周期周期
- 处理器添加
- channel 注册
- channel 连接
- channel 读取完毕
- channel 断开
- channel 移除
- 处理器移除
/**
* 定义入站,指定的http请求
*/
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
// 获取channel
Channel channel = ctx.channel();
// 显示客户端远程地址
System.out.println(channel.remoteAddress());
/*
/0:0:0:0:0:0:0:1:5220
/0:0:0:0:0:0:0:1:5220
/0:0:0:0:0:0:0:1:5221
/0:0:0:0:0:0:0:1:5221
会打印四次,原因是有两次并不是请求request,和图标等
还可以用 curl ip地址, 进行干净的请求
*/
// 将发送信息存到容器中
ByteBuf buf = ctx.alloc().buffer().writeBytes("hello netty".getBytes());
// 构建 httpresponse
DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
// 定义响应数据类型和长度
httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
// 刷到客户端
ctx.writeAndFlush(httpResponse);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 注册");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 移除");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 连接");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 断开");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 读取完毕");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("channel 用户时间触发");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 可写更改");
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("channel 发生异常");
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("处理器添加");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("处理器移除");
super.handlerRemoved(ctx);
}
}
转载自:https://juejin.cn/post/7254096495183298597