likes
comments
collection
share

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

作者站长头像
站长
· 阅读数 78

前言

本篇博文是《从0到1学习 Netty》中源码系列的第一篇博文,主要内容是通过源码逐步讲解 Netty 中 ByteBuf 的动态扩容机制,并结合应用案例加以验证,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍

在我们写入新数据时,如果 ByteBuf 的内部空间不足以容纳新数据,它会自动进行扩容。一般 ByteBuf 会使用 ensureWritable0 方法进行扩容,ensureWritable0 的大致流程如下所示:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

其中,可写部分可扩容部分的相关内容在博文 ByteBuf 的基本使用 中进行了详细介绍,这里就不再赘述。

ensureWritable0 的源码如下所示:

final void ensureWritable0(int minWritableBytes) {
    // 判断部分
    ensureAccessible();
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    final int writerIndex = writerIndex();
    if (checkBounds) {
        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
    }

    // 计算部分
    int minNewCapacity = writerIndex + minWritableBytes;
    int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
    int fastCapacity = writerIndex + maxFastWritableBytes();

    if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
        newCapacity = fastCapacity;
    }

    capacity(newCapacity);
}

接下来将对源码进行讲解与分析,主要分为判断模块与计算模块;

判断模块

判断模块主要有三个判断,分别是判断 ByteBuf 对象是否可以访问,是否需要扩容以及是否写入溢出,如果一切符合要求,将会进入计算模块,也就是扩容阶段;


第一个判断:判断 ByteBuf 对象是否可以访问

源码如下所示:

/**
 * Should be called by every method that tries to access the buffers content to check
 * if the buffer was released before.
 */
protected final void ensureAccessible() {
    if (checkAccessible && !isAccessible()) {
        throw new IllegalReferenceCountException(0);
    }
}

/**
 * Used internally by {@link AbstractByteBuf#ensureAccessible()} to try to guard
 * against using the buffer after it was released (best-effort).
 */
boolean isAccessible() {
    return refCnt() != 0;
}

在上述源码中,通过 ensureAccessible(); 方法来检查 ByteBuf 对象是否被销毁,如果 checkAccessible 标志位为 true,表示需要检查缓冲区是否可访问。如果缓冲区已经被释放(即引用计数为 0,通过 refCnt() 方法获取当前缓冲区的引用计数),则会抛出 IllegalReferenceCountException 异常,表示缓冲区已经无法访问。

这是一个用于检测已经释放的缓冲区的最佳努力实现,它可以提高性能并允许更好的内联优化,每个尝试访问缓冲区内容的方法都应调用该方法,以检查缓冲区之前是否已释放,防止在释放缓冲区后使用缓冲区。


第二个判断:判断 ByteBuf 对象是否需要扩容

源码如下所示:

if (minWritableBytes <= writableBytes()) {  
    return;  
}

@Override  
public int writableBytes() {  
    return capacity() - writerIndex;  
}

在上述源码中,writableBytes() 方法返回缓冲区中还剩余多少可写入的字节数量,即缓冲区的当前容量减去已经写入的字节数,capacity() 返回缓冲区的当前容量,而 writerIndex 返回下一次写入的索引位置;

通过当前可写部分的长度 writableBytes() 与等待写入的字节数量 minWritableBytes 进行比较来判断 ByteBuf 对象是否需要扩容,如果 minWritableBytes <= writableBytes(),那么 ByteBuf 就不需要进行扩容,直接返回调用该函数的上层函数或者退出当前函数的执行,否则,程序会继续向下执行。


第三个判断:判断 ByteBuf 对象是否写入溢出

源码如下所示:

if (checkBounds) {
    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }
}

在上述源码中,checkBounds 用于判断是否需要进行边界检查,然后再计算出 ByteBuf 可写入的最大字节数量,即 maxCapacity - writerIndex

将计算出来的最大字节数量与等待写入的字节数量 minWritableBytes 比较,若 minWritableBytes 大于可写入的最大字节数量,则说明缓冲区剩余空间不足以容纳要写入的数据,于是抛出一个 IndexOutOfBoundsException 异常,表示写入操作越界,否则,将会进入扩容阶段;

关于扩容阶段的内容,将在下面的计算模块进行详细讲解。

计算模块

计算模块主要是计算出 minNewCapacitynewCapacityfastCapacity 这三个值,然后从中选出比较合理的值作为 ByteBuf 当前容量进行扩容;


计算 minNewCapacity

源码如下所示:

int minNewCapacity = writerIndex + minWritableBytes;

在上述源码中,最小需要的新容量 minNewCapacity 就是当前写位置 writerIndex 加上等待写入的字节数量 minWritableBytes


计算 newCapacity

源码如下所示:

int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);

@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    checkPositiveOrZero(minNewCapacity, "minNewCapacity");
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

在上述源码中,主要作用就是将当前容量规范化为2的幂次方,第一行代码使用分配器 alloc() 计算出一个新的容量值 newCapacity,不过要注意的是,这个值有可能会大于 maxCapacity

接下来看到这里的关键函数 calculateNewCapacity(int minNewCapacity, int maxCapacity),该函数接收两个参数:minNewCapacitymaxCapacity,分别代表最小需要的新容量和最大容量,然后函数中会进行一些逻辑操作,返回一个新的合理的容量大小。

首先,检查 minNewCapacity 是否为正,并且是否小于等于 maxCapacity。如果不满足条件,则会抛出 IllegalArgumentException 异常。

然后,定义一个阈值 threshold,其值为 4MB:

static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page

final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

如果 minNewCapacity 等于 threshold,则直接返回 threshold。如果 minNewCapacity 大于 threshold,则将 minNewCapacity 除以 threshold 得到一个整数,再将该整数乘以 threshold 得到当前容量的第一个整数倍值,然后加上 threshold。此时,如果新计算得到的容量值已经超过了 maxCapacity-threshold,则返回 maxCapacity,否则实际容量值要再加上一个 threshold

举个例子,比如说当前的 minNewCapacity=7threshold=4,那么 newCapacity = minNewCapacity / threshold * threshold = 7 / 4 * 4 = 4,因此,最后得到的 newCapacity 就是 8,即 newCapacity += threshold

最后,若 minNewCapacity 小于等于 threshold,则将容量大小从 64 开始连续翻倍,直到达到 minNewCapacity 或者超过 threshold 后停止。如果翻倍后的容量大小超过了 maxCapacity,则返回 maxCapacity。如果没有超过,则返回翻倍后的容量大小。


计算 fastCapacity

源码如下所示:

int fastCapacity = writerIndex + maxFastWritableBytes();

@Override
public int maxFastWritableBytes() {
    return Math.min(maxLength, maxCapacity()) - writerIndex;
}

上述源码中,fastCapacity 是当前写位置 writerIndex 加上一个最大快速可写字节数 maxFastWritableBytes() 得到的结果。

其中,maxLength 值是根据你定义的 ByteBuf 的空间大小决定的,它会是16的倍数,比如 ByteBuf 的空间大小为9,那么 maxLength 值为16;ByteBuf 的空间大小为65,那么 maxLength 值为80。


选择合理的容量并进行扩容

源码如下所示:

if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {  
    newCapacity = fastCapacity;  
}  
  
capacity(newCapacity);

上述源码中,如果新容量值 newCapacity 大于 fastCapacity 且最小需要的新容量 minNewCapacity 小于或等于 fastCapacity,则选择较小的 fastCapacity 作为新的容量值,以避免不必要的重新分配。

实战验证

现在,我们自定义一个空间大小为36的 ByteBuf,然后向其中写入60个字节的数据,以触发 ByteBuf 的动态扩容机制,测试代码如下:

ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(36);  
log(buf);  

StringBuilder sb = new StringBuilder();  
for (int i = 0; i < 10; i++) {  
    sb.append("sidiot");  
}  

buf.writeBytes(sb.toString().getBytes());  
log(buf);

根据 int minNewCapacity = writerIndex + minWritableBytes; 计算出 minNewCapacity 的值为 0 + 60 = 60;

接着调用 calculateNewCapacity 函数,由于 minNewCapacity 的值为60小于64,所以 newCapacity 的值为64;

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

由于我们定义的 ByteBuf 的空间大小为36,因此 maxLength 的值为48,fastCapacity 的值也是48:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

最终,我们获得了 minNewCapacitynewCapacity 和 fastCapacity 这三个变量的数值,分别为60,64,48:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

由于 minNewCapacity > fastCapacity,因此 if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) 条件不成立,所以 newCapacity 依旧是64,即最后 ByteBuf 的空间扩容为64:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

运行结果:

read index:0 write index:0 capacity:36

read index:0 write index:60 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi|
|00000010| 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 |otsidiotsidiotsi|
|00000020| 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 |diotsidiotsidiot|
|00000030| 73 69 64 69 6f 74 73 69 64 69 6f 74             |sidiotsidiot    |
+--------+-------------------------------------------------+----------------+

那如果将 ByteBuf 的空间大小设置为69,向其中写入72个字节的数据,最终 ByteBuf 的空间大小会扩容至多少呢?

想必小伙伴都知道答案了,是的,80!

暂时没有算出来的小伙伴也不要气馁,我们接着往下分析;

minNewCapacity 毋庸置疑的是72,然后在计算 newCapacity 时,由于 128 > 72 > 64,因此 newCapacity 的大小为 128:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

注意,newCapacity <<= 1; 相当于 newCapacity *= 2,但是位运算速度会快一点;

由于我们将 ByteBuf 的空间大小设置为69,因此 fastCapacity 的大小就是 5 * 16 = 80

最终,我们获得了 minNewCapacitynewCapacity 和 fastCapacity 这三个变量的数值,72,128,80:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

而这三个变量值又符合逻辑判断 if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity),所以 newCapacity = fastCapacity = 80

因此,最终 ByteBuf 的空间扩容为80:

【Netty】「源码解析」(一)ByteBuf 的动态扩容机制

运行结果:

read index:0 write index:0 capacity:69

read index:0 write index:72 capacity:80
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi|
|00000010| 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 |otsidiotsidiotsi|
|00000020| 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 6f 74 |diotsidiotsidiot|
|00000030| 73 69 64 69 6f 74 73 69 64 69 6f 74 73 69 64 69 |sidiotsidiotsidi|
|00000040| 6f 74 73 69 64 69 6f 74                         |otsidiot        |
+--------+-------------------------------------------------+----------------+

后记

以上就是 ByteBuf 的动态扩容机制 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:「萌新入门」(七)ByteBuf 的性能优化

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

🔥 系列专栏:探索 Netty:源码解析与应用案例分享

转载自:https://juejin.cn/post/7242872705254293560
评论
请登录