likes
comments
collection
share

netty4-读取ByteBuf数据时如何预留操作空间避免中途扩容

作者站长头像
站长
· 阅读数 46

PS:禁止拷贝形式转载,转载请以URL形式

1 简介

设备A
Udp-Client
Tcp-Server
Tcp-Client
Udp-Server

背景: 业务需要实现如上图Tcp Server 读取Tcp client 的数据然后通过 Udp client 发送出去 。因为Udp 无序、分片、最大限制65535-28=65507字节以及核心业务要求,所以需要对Tcp client 发送的数据进行二次操作后在进行Udp 发包。

过程: 使用netty4 实现了一版程序,发现实现的程序在解码、自定义handle二次操作ByteBuf 时会产生多次无谓的复制以及扩容。

结果: 最终发现Netty AbstractNioByteChannel.NioByteUnsafe.read() 读取时通过 RecvByteBufAllocator.allocate()分配读取ByteBuf大小,并且在 NioSocketChannel.doReadBytes()写入时会尽量把ByteBuf 写满没有预留多余的空间导致 每一次对读取的ByteBuf添加额外的业务数据都可能会涉及到扩容从而降低性能,本编文章记录如何解决该问题并优化

2 参考

netty是如何读取数据

《Netty》从零开始学netty源码(五十六)之RecvByteBufAllocator

Netty - 内存大小预测器 RecvByteBufAllocator 源码分析 (包含客户端Channel读消息处理)

3 环境

java:1.8

netty:4.1.90.Final

4 实现

PS:当前我项目实际使用的是AdaptiveRecvByteBufAllocator,所以下列涉及RecvByteBufAllocator实际都为AdaptiveRecvByteBufAllocator,其他字节分配器原理大差不差大家触类旁通。

4.1 关键源码

netty4-读取ByteBuf数据时如何预留操作空间避免中途扩容

netty4-读取ByteBuf数据时如何预留操作空间避免中途扩容

netty4-读取ByteBuf数据时如何预留操作空间避免中途扩容

4.2 解决方案

根据关键源码可以分析处,关键点AdaptiveRecvByteBufAllocator的处理。假设我们要额外添加10字节大小的业务数据,需要考虑解决如下

  1. 我们在记录AdaptiveRecvByteBufAllocator.lastBytesRead(int bytes)时,对bytes 提前加10,把后面最终业务操作提前记录。
  2. 我们在写入AdaptiveRecvByteBufAllocator.attemptedBytesRead(int bytes)时,对bytes提前减10,不要把数据写入进业务空间里面。

但是AdaptiveRecvByteBufAllocator.lastBytesRead()使用了super.lastBytesRead()导致没有办法使用简单的wraper包装进行处理,最终使用拷贝完整的AdaptiveRecvByteBufAllocator代码重新命名为CustomAdaptiveRecvByteBufAllocator对上述两个方法进行处理。

4.2 实现代码

精简版本:和源代码AdaptiveRecvByteBufAllocator相比只用改下述部分

public void lastBytesRead(int bytes) {
    //如果大于0,证明实际读取到了数据需要加10
    int b = bytes > 0 ? bytes + 10 : bytes;
    if (bytes == this.attemptedBytesRead()) {
        this.record(b);
    }

    super.lastBytesRead(b);
}

@Override
public void attemptedBytesRead(int bytes) {
    //读取的数据最少要大于10,不然业务数据添加进去需要进行扩容
    super.attemptedBytesRead(bytes > 10 ? bytes - 10 : 0);
}

完整版本:


import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
import io.netty.util.internal.ObjectUtil;

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName
 * @Description
 * @Author dyf
 * @Date 2023/6/8
 * @Version 1.0
 */
public class CustomAdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 2048;
    static final int DEFAULT_MAXIMUM = 65536;
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;
    private static final int[] SIZE_TABLE;
    /**
     * @deprecated
     */
    @Deprecated
    public static final CustomAdaptiveRecvByteBufAllocator DEFAULT;
    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    private static int getSizeTableIndex(int size) {
        int low = 0;
        int high = SIZE_TABLE.length - 1;

        while (high >= low) {
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else {
                if (size >= a) {
                    if (size == a) {
                        return mid;
                    }

                    return mid + 1;
                }

                high = mid - 1;
            }
        }

        return low;
    }

    public CustomAdaptiveRecvByteBufAllocator() {
        this(64, 2048, 65536);
    }

    public CustomAdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        ObjectUtil.checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        } else if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        } else {
            int minIndex = getSizeTableIndex(minimum);
            if (SIZE_TABLE[minIndex] < minimum) {
                this.minIndex = minIndex + 1;
            } else {
                this.minIndex = minIndex;
            }

            int maxIndex = getSizeTableIndex(maximum);
            if (SIZE_TABLE[maxIndex] > maximum) {
                this.maxIndex = maxIndex - 1;
            } else {
                this.maxIndex = maxIndex;
            }

            this.initial = initial;
        }
    }

    public Handle newHandle() {
        return new CustomAdaptiveRecvByteBufAllocator.HandleImpl(this.minIndex, this.maxIndex, this.initial);
    }

    public CustomAdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }

    static {
        List<Integer> sizeTable = new ArrayList();

        int i;
        for (i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];

        for (i = 0; i < SIZE_TABLE.length; ++i) {
            SIZE_TABLE[i] = (Integer) sizeTable.get(i);
        }

        DEFAULT = new CustomAdaptiveRecvByteBufAllocator();
    }

    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            this.index = CustomAdaptiveRecvByteBufAllocator.getSizeTableIndex(initial);
            this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
        }

        public void lastBytesRead(int bytes) {
            int b = bytes > 0 ? bytes + 6 : bytes;
            if (bytes == this.attemptedBytesRead()) {
                this.record(b);
            }

            super.lastBytesRead(b);
        }

        @Override
        public void attemptedBytesRead(int bytes) {
            super.attemptedBytesRead(bytes > 6 ? bytes - 6 : 0);
        }

        public int guess() {
            return this.nextReceiveBufferSize;
        }

        private void record(int actualReadBytes) {
            if (actualReadBytes <= CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[Math.max(0, this.index - 1)]) {
                if (this.decreaseNow) {
                    this.index = Math.max(this.index - 1, this.minIndex);
                    this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
                    this.decreaseNow = false;
                } else {
                    this.decreaseNow = true;
                }
            } else if (actualReadBytes >= this.nextReceiveBufferSize) {
                this.index = Math.min(this.index + 4, this.maxIndex);
                this.nextReceiveBufferSize = CustomAdaptiveRecvByteBufAllocator.SIZE_TABLE[this.index];
                this.decreaseNow = false;
            }

        }

        public void readComplete() {
            this.record(this.totalBytesRead());
        }
    }
}
转载自:https://juejin.cn/post/7242623149752598586
评论
请登录