Netty编解码器与TCP粘包拆包
Netty编解码器
- Netty的处理器可以分为两类:入站处理器与出站处理器。
- 入站处理器的顶层是channelInboundHandler,出站处理器的顶层是channelOutboundHandler。
- 数据处理时常用的各种编解码器本质上都是处理器。
- 编解码器:无论我们向网络中写入的数据是什么类型(int、char、String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的﹔将数据由原本的形式转换为字节流的操作成为编码(encode),将数据由字节转换为它原本的格式或是其他格式的操作成为解码(decode),编解码统一称为codec,
- 编码:本质上是一种出站处理器;因此,编码一定是一种channelOutboundHandler。
- 解码∶本质上是一种入站处理器;因此,解码一定是一种channelInboundHandler。
- 在Netty中,编码器通常以xxxEncoder命名;解码器通常以xxxDecoder命名。只是通常,但是不一定。
ByteToMessageDecoder
一般用于自定义解码器。
- ChannelInboundHandlerAdapter是一个从一个ByteBuf解码为另一个消息类型的流式解码器。例如,这里有一个实现,它从输入的ByteBuf中读取所有可读字节并创建一个新的ByteBuf。
public class SquareDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readBytes(in.readableBytes())); } }
- 帧检测
- 通常情况下,应该通过添加DelimiterBasedFrameDecoder、FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder或LineBasedFrameDecoder等来提前在管道中处理帧检测。
- 如果需要自定义帧解码器,则需要在使用ByteToMessageDecoder实现帧解码器时要小心。通过检查ByteBuf.readableBytes(),确保缓冲区中有足够的字节来形成完整的帧。如果没有足够的字节来形成完整的帧,则应返回而不修改读取器索引,以允许更多字节到达。
- 比如要解码整型(4字节),但是只收到3个字节,当然要等待后续字节。
- 要在不修改读取器索引的情况下检查完整的帧,请使用ByteBuf.getInt(int)等方法。在使用ByteBuf.getInt(int)等方法时必须使用读取器索引。例如,调用in.getInt(0)假定帧从缓冲区的开头开始,这并不总是正确的。应该使用in.getInt(in.readerIndex())代替。
- 这里的帧不是数据链路层的帧。
- 封装成帧是数据链路层的功能,它是指在一段数据的前后分别添加首部和尾部,构成了一个帧。MAC帧是一种特定的帧格式,它是以太网中使用的帧类型,它的首部包含了目的地址、源地址、类型、校验和等信息。MAC帧的封装也是在数据链路层完成的,它是数据链路层的一种协议数据单元(PDU)。
- Netty所说的帧是应用层的。
- 这里说的帧是指应用层的数据在传输过程中被分割成的一个个数据块,它通常有一个固定的长度或者有一个特定的分隔符来标识。帧解码器的作用是将接收到的字节流重新组合成完整的帧,以便应用层处理。
- 简单说就是消息的边界要确定,从哪到哪是一条完整的消息。
- 确定边界至关重要的,接受方不知道要收多少数据,如果读多了就会容易死锁。
- 信道没有多的消息,接受方一直在等,又因为,发送方也在等待接收响应,双方等待即是死锁。
- 所以应用层也要将消息封装成帧。
- 确定消息的大小常用方式:
- 在结尾加特殊字段。
- 在开头用几个字节表示消息的大小。
- 使用固定长度的帧,每个帧的长度都是一样的,不需要额外的信息来标识帧的边界。这种方式简单高效,但是可能会浪费一些空间或者需要填充一些无用的数据。
- 使用长度字段和类型字段来标识帧的大小和内容,这种方式可以支持多种类型的消息,但是需要额外的空间来存储长度和类型信息。
- 使用转义字符来标识帧的开始和结束,如果数据中出现了转义字符,就用另一个转义字符来表示它。这种方式可以避免使用特殊字段或者固定长度,但是需要对数据进行转义和反转义的操作。
- 这其实就是粘包与拆包
- 粘包是指发送方发送的多个数据包被TCP协议合并成一个大的数据包发送给接收方,导致接收方无法区分数据包的边界。
- 拆包是指发送方发送的一个数据包被TCP协议拆分成多个小的数据包发送给接收方,导致接收方无法获取完整的数据包。粘包和拆包的原因是TCP协议是面向字节流的,没有消息保护边界,而且为了提高传输效率。
- 所以应用层要采取一些措施,来确定消息的边界,于是有了编解码。
- 注意事项
- 注意,ByteToMessageDecoder的子类不得标记为@Sharable。
- Decoder是有状态的,当然不能共享,不然会有线程安全问题。
- 一些方法,如ByteBuf.readBytes(int),如果未释放返回的缓冲区或将其添加到out List中,将导致内存泄漏。使用派生缓冲区(如ByteBuf.readSlice(int))以避免泄漏内存。
- 注意,ByteToMessageDecoder的子类不得标记为@Sharable。
- 该类主要是实现decode()方法:
- 将byte解码成long:
public class ByteMessageToLongDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { System.out.println("将字节解码成long类型"); System.out.println(in.readableBytes());//没有被读过的字节数 if (in.readableBytes() >= 8) { //long类型是8字节,小于8肯定消息不全 out.add(in.readLong()); } } }
- 将byte解码成long:
MessageToByteEncoder
- 和ByteToMessageDecoder大同小异,只不过它是出站处理器。
- 但是该类需要指定范型。
- 将long编码成byte
//需要将范型指定为Long public class LongMessageToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("将long编码成byte"); out.writeLong(msg); } }
- 如果将范型指定为Long类型,但你的消息不是Long类型,那么该消息将会跳过该编码处理器!
- acceptOutboundMessage(msg)可以自定义,默认是类型匹配。
- 如果将范型指定为Long类型,但你的消息不是Long类型,那么该消息将会跳过该编码处理器!
- 将long编码成byte
编解码器要点
- 无论是编码器还是解码器,其所接收的消息类型必须要与待处理的参数类型一致,否则该编码器或解码器并不会被执行。就是上一个Handler所生成的消息类型要和当前的处理的消息类型要匹配(要理解数据的流向,入站是从前往后,出站是从后往前,一般情况下)。
- 在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够。
if (in.readableBytes() >= 8) { //long类型是8字节,小于8肯定消息不全 out.add(in.readLong()); }
Netty中常用的编解码器
ReplayingDecoder
-
可以反复执行的,直到,消息完整为止。
-
ReplayingDecoder是ByteToMessageDecoder的一种专门变体,它使得在阻塞I/O模式下实现非阻塞解码器成为可能。
-
ReplayingDecoder与ByteToMessageDecoder最大的区别在于它允许您实现decode()和decodeLast()方法,就像所有必需的字节已经接收到一样,而不是检查所需字节的可用性。
-
例如,以下是使用ByteToMessageDecoder实现的整数头帧解码器:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { //演示了消息头用int表示消息的大小,先解析消息大小,之后交给下一个解码器,解码消息内容 if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
-
使用ReplayingDecoder,上述代码可以简化如下:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
-
这是如何工作的?
- ReplayingDecoder传递了一个特殊的ByteBuf实现,当缓冲区中没有足够的数据时,会抛出某种类型的错误。在上面的IntegerHeaderFrameDecoder中,您假设调用buf.readInt()时缓冲区中将有4个或更多字节。如果缓冲区中实际上有4个字节,它将返回您期望的整数头。否则,将引发错误并将控制返回给ReplayingDecoder。 如果ReplayingDecoder捕获到错误,则它将将缓冲区的readerIndex倒回到“初始”位置(即缓冲区的开头),并在更多数据接收到缓冲区时再次调用decode(...)方法。
- 请注意,为避免为每次抛出错误创建新错误并填充其堆栈跟踪的开销,ReplayingDecoder始终抛出相同的缓存错误实例。
-
限制
- 以简单为代价,ReplayingDecoder强制您遵守一些限制:
- 某些缓冲区操作被禁止。
- skipBytes(int) :跳过指定数量的字节,因为这可能会导致数据丢失或错误的解码。
- readBytes(int) :读取指定数量的字节,因为这可能会导致数据不足或多余。
- readBytes(ByteBuf, int, int) :读取指定数量的字节到另一个ByteBuf,因为这可能会导致数据不足或多余。
- readBytes(byte[], int, int) :读取指定数量的字节到一个字节数组,因为这可能会导致数据不足或多余。
- readBytes(ByteBuffer) :读取指定数量的字节到一个ByteBuffer,因为这可能会导致数据不足或多余。
- readBytes(OutputStream, int) :读取指定数量的字节到一个输出流,因为这可能会导致数据不足或多余。
- 如果网络速度很慢且消息格式复杂,则性能可能会更差。在这种情况下,您的解码器可能需要一遍又一遍地解码消息的同一部分。
- 比如,网络很慢,int消息迟迟收不到4个字节,尽管已经执行了很多遍。
- 您必须记住,decode(...)方法可能会被调用多次以解码单个消息。例如,以下代码将不起作用:
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>();//类的成员变量 @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); //就是第一个整数已经到了,调用一次offer,但是第二个没到,那么就又重新从第一个整数开始 // 该断言将间歇性地失败,因为 values.offer() 可能会被调用多次! assert values.size() == 2; out.add(values.poll() + values.poll()); } }
- 正确的实现如下,您还可以利用下一节中详细介绍的“checkpoint”功能。
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { // 恢复自上次部分解码以来可能已更改的变量状态。那我觉得把values变成方法的局部变量好像也可以 values.clear(); // 消息包含2个整数。 values.offer(buf.readInt()); values.offer(buf.readInt()); // 现在我们知道这个断言永远不会失败。 assert values.size() == 2; out.add(values.poll() + values.poll()); } }
-
提高性能
-
幸运的是,复杂解码器实现的性能可以通过checkpoint()方法显著提高。
-
checkpoint()方法更新缓冲区的“初始”位置,以便ReplayingDecoder将缓冲区的readerIndex回溯到您调用checkpoint()方法的最后位置。这样可以避免重复解码已经解码过的数据,提高性能。
- checkpoint()方法的原理是在ReplayingDecoder内部维护了一个checkpoint变量,用来记录缓冲区的“初始”位置。当缓冲区中没有足够的数据时,ReplayingDecoder会抛出一个特殊的Error,并将缓冲区的readerIndex重置为checkpoint变量的值。当缓冲区中有足够的数据时,ReplayingDecoder会调用decode()方法,并在适当的时机调用checkpoint()方法来更新checkpoint变量的值。
-
使用枚举类型调用checkpoint(T)
-
尽管可以使用checkpoint()方法并自己管理解码器的状态,但管理解码器状态的最简单方法是创建表示解码器当前状态的枚举类型,并在状态更改时调用checkpoint(T)方法。根据要解码的消息的复杂程度,您可以拥有尽可能多的状态:
public enum MyDecoderState { READ_LENGTH, READ_CONTENT; } //范型是枚举 public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> { private int length; public IntegerHeaderFrameDecoder() { //设置初始状态 super(MyDecoderState.READ_LENGTH); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { switch (state()) { case READ_LENGTH: length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ByteBuf frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("不应该到达此处。"); } } }
-
使用无参数调用checkpoint()管理解码器状态的另一种替代方法是自行管理它。
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { private boolean readLength; private int length; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (!readLength) { length = buf.readInt(); readLength = true; checkpoint(); } if (readLength) { ByteBuf frame = buf.readBytes(length); readLength = false; checkpoint(); out.add(frame); } } }
-
-
在管道中用另一个解码器替换解码器
- 如果要编写协议多路复用器,则可能需要将ReplayingDecoder(协议检测器)替换为另一个ReplayingDecoder、ByteToMessageDecoder或MessageToMessageDecoder(实际协议解码器)。
- 仅仅通过调用ChannelPipeline.replace(ChannelHandler,String,ChannelHandler)是无法实现这一点的,需要一些额外的步骤:
public class FirstDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) { ... //解码第一条消息 Object firstMessage = ...; //添加第二个解码器 ctx.pipeline().addLast("second", new SecondDecoder()); if (buf.isReadable()) { //将剩余数据交给第二个解码器 out.add(firstMessage); out.add(buf.readBytes(super.actualReadableBytes())); } else { //没有需要交给第二个解码器的数据 out.add(firstMessage); } //移除第一个解码器 ctx.pipeline().remove(this); } }
MessageToMessageDecoder<I>
- 就是将具体类型转成另外的具体类型,所以要指定范型,表明接收的消息类型。
- 将String类型解码成它的长度
//指定穿过来的类型是String public class StringToIntegerDecoder extends MessageToMessageDecoder<String> { @Override public void decode(ChannelHandlerContext ctx, String message, List<Object> out) { out.add(message.length()); } }
- 将String类型解码成它的长度
LineBasedFrameDecoder
- 基于行的解码器。
- 这是一个用于将接收到的ByteBuf按行分割的解码器。
- 同时支持"\n"和"\r\n"两种行尾符号。 该字节流被期望是使用UTF-8字符编码或ASCII编码的(其中一个编码方式即可)。
- 当前的实现方式是先将字节转换成字符,然后与'\n'或'\r'这类低ASCII字符进行比较。
- UTF-8编码不使用低ASCII范围内的[0..0x7F]字节值来表示多字节的字符编码,因此该实现方式可以完全支持UTF-8编码。
- 低ASCII字符指的是ASCII字符集中的前128个字符,其编码范围是0x00到0x7F。这些字符包括英文字母、数字、标点符号等常见字符,其编码使用一个字节就可以表示。
- UTF-8是一种Unicode字符编码标准,它可以表示所有Unicode字符集中的字符。UTF-8编码使用1至4个字节来表示一个字符,每个字节的范围是0x00到0xFF(也就是0至255)。对于单字节字符(例如英文字符和数字),UTF-8编码使用ASCII码中的相同值,因此它们的字节范围是0x00到0x7F(也就是0至127)。而对于多字节字符,UTF-8编码使用高位字节和低位字节来表示字符,其中高位字节的范围通常是0xC0到0xFF(也就是192至255),低位字节的范围通常是0x80到0xBF(也就是128至191)。因此,UTF-8编码不使用低ASCII范围内的[0..0x7F]字节值来表示多字节字符编码。
- LineBasedFrameDecoder的实现方式,它通过将字节流转换为字符流(假设是UTF-8编码或ASCII编码),然后将字符与低ASCII字符(例如'\n'和'\r')进行比较来查找行尾。由于UTF-8编码不使用低ASCII范围内的字节来表示多字节字符,因此LineBasedFrameDecoder可以通过这种实现方式完全支持UTF-8编码。
FixedLengthFrameDecoder
- 固定长度的解码器。
DelimiterBasedFrameDecoder
- 根据分隔符来解析帧。
LengthFieldBasedFrameDecoder
- 解码器根据消息中长度字段的值动态拆分接收到的 ByteBuf,就是在消息的开头标识消息的长度。
- LengthFieldBasedFrameDecoder的6个参数的含义是:
- maxFrameLength:发送的数据帧最大长度,如果超过这个长度,会抛出TooLongFrameException异常
- lengthFieldOffset:长度字段在发送的字节数组中的偏移量,即长度字段开始的位置
- lengthFieldLength:长度字段占用的字节数,即长度字段的长度
- lengthAdjustment:长度调节值,在总长被定义为包含包头长度时,修正信息长度
- initialBytesToStrip:从解码帧中去除的字节数,即获取真正的内容之前需要跳过的字节数
- failFast:true表示读取到长度字段超过maxFrameLength时就抛出异常,false表示只有真正读取完长度字段表示的字节后才抛出异常,默认为true。
- 用两个字节表示消息长度,不剥离头部:
- 读完两个字节后,根据得到的长度,连续读对应长度的字节。
- 2字节长度字段在偏移量为0的位置,剥离头部:
- 偏移量为0的2个字节长度字段,不要剥离头部,长度字段表示整个消息的长度:
- 在一个5字节的头部后面有3字节长度字段,不要剥离头部:
- 字节长度字段在5字节头部的开始,不剥离头部:
- 在4字节头部中间偏移1字节的位置有2字节长度字段,剥离第一个头部字段和长度字段:
- 在4字节头部中间偏移1的位置有2字节长度字段,去剥离第一个头部字段和长度字段,长度字段表示整个消息的长度:
要确认长度表示的含义,有时候会包含头,有时候不包含。
自定义协议小样例
- 服务器如下:
record Data(int length, byte[] content) { } public class CustomProtocolServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new CustomServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); }catch (Exception exception) { exception.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } class CustomDecoder extends ReplayingDecoder<Void> { private int length; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (length == 0) { length = in.readInt(); } if (in.readableBytes() < length) {//数据没全,设置checkpoint,下次再来 checkpoint(); return; } byte[] content = new byte[length]; in.readBytes(content); Data data = new Data(length, content); out.add(data); length = 0; } } class CustomEncoder extends MessageToByteEncoder<Data> { @Override protected void encode(ChannelHandlerContext ctx, Data msg, ByteBuf out) throws Exception { out.writeInt(msg.length()); out.writeBytes(msg.content()); } } class CustomHandler extends SimpleChannelInboundHandler<Data> { @Override protected void channelRead0(ChannelHandlerContext ctx, Data msg) throws Exception { System.out.println(msg);//业务代码 } } class CustomServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new CustomDecoder()) .addLast(new CustomEncoder()) .addLast(new CustomHandler()); } }
- 客户端如下:
public class CustomProtocolClient { public static void main(String[] args) { EventLoopGroup eventLoopGroup = new NioEventLoopGroup();//客户端只需要一个事件循环组 try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new CustomClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { eventLoopGroup.shutdownGracefully(); } } } class CustomClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new CustomDecoder()) .addLast(new CustomEncoder()) .addLast(new ClientHandler()); } } class ClientHandler extends SimpleChannelInboundHandler<Data> { @Override protected void channelRead0(ChannelHandlerContext ctx, Data msg) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { IntStream.range(0, 10).forEach(i -> { //发10次 byte[] bytes = "hello".getBytes(CharsetUtil.UTF_8); Data data = new Data(bytes.length, bytes); ctx.writeAndFlush(data); }); } }
转载自:https://juejin.cn/post/7233046023244906552