netty4-读取ByteBuf数据时如何预留操作空间避免中途扩容
PS:禁止拷贝形式转载,转载请以URL形式
1 简介
背景: 业务需要实现如上图Tcp Server 读取Tcp client 的数据然后通过 Udp client 发送出去 。因为Udp 无序、分片、最大限制65535-28=65507字节以及核心业务要求,所以需要对Tcp client 发送的数据进行二次操作后在进行Udp 发包。
过程: 使用netty4 实现了一版程序,发现实现的程序在解码、自定义handle二次操作ByteBuf 时会产生多次无谓的复制以及扩容。
结果: 最终发现Netty
AbstractNioByteChannel.NioByteUnsafe.read()
读取时通过
RecvByteBufAllocator.allocate()
分配读取ByteBuf大小,并且在
NioSocketChannel.doReadBytes()
写入时会尽量把ByteBuf 写满没有预留多余的空间导致 每一次对读取的ByteBuf添加额外的业务数据都可能会涉及到扩容从而降低性能,本编文章记录如何解决该问题并优化
2 参考
《Netty》从零开始学netty源码(五十六)之RecvByteBufAllocator
Netty - 内存大小预测器 RecvByteBufAllocator 源码分析 (包含客户端Channel读消息处理)
3 环境
java:1.8
netty:4.1.90.Final
4 实现
PS:当前我项目实际使用的是AdaptiveRecvByteBufAllocator
,所以下列涉及RecvByteBufAllocator
实际都为AdaptiveRecvByteBufAllocator
,其他字节分配器原理大差不差大家触类旁通。
4.1 关键源码
4.2 解决方案
根据关键源码可以分析处,关键点AdaptiveRecvByteBufAllocator
的处理。假设我们要额外添加10字节大小的业务数据,需要考虑解决如下
- 我们在记录
AdaptiveRecvByteBufAllocator.lastBytesRead(int bytes)
时,对bytes 提前加10,把后面最终业务操作提前记录。 - 我们在写入
AdaptiveRecvByteBufAllocator.attemptedBytesRead(int bytes)
时,对bytes提前减10,不要把数据写入进业务空间里面。
但是AdaptiveRecvByteBufAllocator.lastBytesRead()
使用了super.lastBytesRead()
导致没有办法使用简单的wraper包装进行处理,最终使用拷贝完整的AdaptiveRecvByteBufAllocator
代码重新命名为CustomAdaptiveRecvByteBufAllocator
对上述两个方法进行处理。
4.2 实现代码
精简版本:和源代码AdaptiveRecvByteBufAllocator
相比只用改下述部分
public void lastBytesRead(int bytes) {
//如果大于0,证明实际读取到了数据需要加10
int b = bytes > 0 ? bytes + 10 : bytes;
if (bytes == this.attemptedBytesRead()) {
this.record(b);
}
super.lastBytesRead(b);
}
@Override
public void attemptedBytesRead(int bytes) {
//读取的数据最少要大于10,不然业务数据添加进去需要进行扩容
super.attemptedBytesRead(bytes > 10 ? bytes - 10 : 0);
}
完整版本:
import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
import io.netty.util.internal.ObjectUtil;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName
* @Description
* @Author dyf
* @Date 2023/6/8
* @Version 1.0
*/
public class CustomAdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 2048;
static final int DEFAULT_MAXIMUM = 65536;
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
private static final int[] SIZE_TABLE;
/**
* @deprecated
*/
@Deprecated
public static final CustomAdaptiveRecvByteBufAllocator DEFAULT;
private final int minIndex;
private final int maxIndex;
private final int initial;
private static int getSizeTableIndex(int size) {
int low = 0;
int high = SIZE_TABLE.length - 1;
while (high >= low) {
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else {
if (size >= a) {
if (size == a) {
return mid;
}
return mid + 1;
}
high = mid - 1;
}
}
return low;
}
public CustomAdaptiveRecvByteBufAllocator() {
this(64, 2048, 65536);
}
public CustomAdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
ObjectUtil.checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
} else if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
} else {
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
}
public Handle newHandle() {
return new CustomAdaptiveRecvByteBufAllocator.HandleImpl(this.minIndex, this.maxIndex, this.initial);
}
public CustomAdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
static {
List<Integer> sizeTable = new ArrayList();
int i;
for (i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (i = 0; i < SIZE_TABLE.length; ++i) {
SIZE_TABLE[i] = (Integer) sizeTable.get(i);
}
DEFAULT = new CustomAdaptiveRecvByteBufAllocator();
}
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
this.index = CustomAdaptiveRecvByteBufAllocator.getSizeTableIndex(initial);
this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
}
public void lastBytesRead(int bytes) {
int b = bytes > 0 ? bytes + 6 : bytes;
if (bytes == this.attemptedBytesRead()) {
this.record(b);
}
super.lastBytesRead(b);
}
@Override
public void attemptedBytesRead(int bytes) {
super.attemptedBytesRead(bytes > 6 ? bytes - 6 : 0);
}
public int guess() {
return this.nextReceiveBufferSize;
}
private void record(int actualReadBytes) {
if (actualReadBytes <= CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[Math.max(0, this.index - 1)]) {
if (this.decreaseNow) {
this.index = Math.max(this.index - 1, this.minIndex);
this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
this.decreaseNow = false;
} else {
this.decreaseNow = true;
}
} else if (actualReadBytes >= this.nextReceiveBufferSize) {
this.index = Math.min(this.index + 4, this.maxIndex);
this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
this.decreaseNow = false;
}
}
public void readComplete() {
this.record(this.totalBytesRead());
}
}
}
转载自:https://juejin.cn/post/7242623149752598586