likes
comments
collection
share

ConcurrentHashMap源码核心方法解析

作者站长头像
站长
· 阅读数 19

源码真的可以让一个人学到不少,不要怕!!!坚持看下去,必有收获!!!

源码核心方法

ConcurrentHashMap结合CASsynchronized -> 平衡性能与线程安全性

putVal()

final V putVal(K key, V value, boolean onlyIfAbsent) {  
    if (key == null || value == null) throw new NullPointerException();  
    int hash = spread(key.hashCode());  
    int binCount = 0;  
    for (Node<K,V>[] tab = table;;) { // 遍历整个Node数组
        Node<K,V> f; int n, i, fh; K fk; V fv;
  • 初始化,默认大小DEFAULT_CAPACITY = 16
        if (tab == null || (n = tab.length) == 0) // 如果还没有初始化则进行初始化
            tab = initTable();
  • 添加链表第一个元素,此时只有添加一种操作,使用CAS可以在一定程度上优化性能
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) // CAS -> 添加链表第一个元素
            break; // no lock when adding to empty bin  
        }
  • 辅助扩容,想了解的小伙伴可以自己去看一下源码
        else if ((fh = f.hash) == MOVED)  
        tab = helpTransfer(tab, f); // -> transfer()
  • putIfAbsent()方法的实现,key存在 -> 返回当前value
        else if (onlyIfAbsent // check first node without acquiring lock  
                && fh == hash  
                && ((fk = f.key) == key || (fk != null && key.equals(fk)))  
                && (fv = f.val) != null)  
        return fv;
  • 根据hash计算得出的数组index已经存在元素,向链表/红黑树中插入元素
    • key相同,覆盖原来的value -> oldVal
    • key不同,添加链表后继节点 -> pre.next

    链表

        else {  
            V oldVal = null;  
            synchronized (f) {  
                if (tabAt(tab, i) == f) {  
                    if (fh >= 0) {  
                        binCount = 1;  
                        for (Node<K,V> e = f;; ++binCount) {  
                            K ek;  
                            if (e.hash == hash &&  
                                ((ek = e.key) == key ||  
                                 (ek != null && key.equals(ek)))) {  
                                oldVal = e.val;  
                                if (!onlyIfAbsent) // 默认情况下覆盖,调用putIfAbsent则返回已存在的value值
                                    e.val = value;  
                                break;  
                            }  
                            Node<K,V> pred = e;  
                            if ((e = e.next) == null) {  
                                pred.next = new Node<K,V>(hash, key, value);  
                                break;  
                            }  
                        }  
                    }
    

    红黑树

                    else if (f instanceof TreeBin) {  
                        Node<K,V> p;  
                        binCount = 2;  
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
                                                       value)) != null) {  
                            oldVal = p.val;  
                            if (!onlyIfAbsent) // 默认情况下覆盖,调用putIfAbsent则返回已存在的value值
                                p.val = value;  
                        }  
                    }
    
  • 扩容期间插入元素 -> 抛出异常
                    else if (f instanceof ReservationNode)  
                        throw new IllegalStateException("Recursive update");  
                } // if(tabAt(tab,i) == f){
            } // synchronized(f){
  • 记录链表(binCount=1)/红黑树(binCount=2)的节点数
    • 当链表的节点数"≥8",进行扩容 -> treeifyBin()
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)  
                    treeifyBin(tab, i); // 扩容 -> 2*n
                if (oldVal != null)  
                    return oldVal; // 属于putIfAbsent -> 返回已存在的`oldVal`
                break;
            }  
        } // else{
    } // for (Node<K,V>[] tab = table;;) {
    
  • 记录整个ConcurrentHashMap表的大小
    addCount(1L, binCount); // baseCount + CounterCell[i].value累加
    return null;  
}

treeifyBin()

接上面,链表长度"≥8" -> 扩容tryPresize() -> 数组扩容至"≥64" -> “ 链表 -> 红黑树 ”

private final void treeifyBin(Node<K,V>[] tab, int index) {  
    Node<K,V> b; int n;  
    if (tab != null) {  
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)  
            tryPresize(n << 1); // 位运算扩容 -> 2*n
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {  
            synchronized (b) {  
                if (tabAt(tab, index) == b) { 
                    TreeNode<K,V> hd = null, tl = null;  
                    for (Node<K,V> e = b; e != null; e = e.next) { // 链表节点 -> 树节点 
                        TreeNode<K,V> p =  
                            new TreeNode<K,V>(e.hash, e.key, e.val,  
                                                        null, null);  
                        if ((p.prev = tl) == null)  
                            hd = p; // 树的根节点
                        else  
                            tl.next = p;  
                        tl = p;
                    }
                // 转化后的树通过根节点存入数组,TreeBin会通过旋转、着色等:"链表"树 -> 红黑树
                setTabAt(tab, index, new TreeBin<K,V>(hd));
                }  
            } // synchronized (b)
        } // else if    
    }  
}

tryPresize()

扩容:旧数组 -> 新数组

private final void tryPresize(int size) {  
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :  
        tableSizeFor(size + (size >>> 1) + 1);  // 计算新容量,向上取最近的二进制,如15 -> 16
    int sc;  
    while ((sc = sizeCtl) >= 0) { // `sizeCtl`是根据容量和加载因子计算的控制变量,当sizeCtl>0 -> 扩容
        Node<K,V>[] tab = table; int n;
        // 在并发环境下,如果哈希表尚未初始化,并且首次调用的是 `tryPresize` 方法,它会负责对哈希表进行初始化。
        if (tab == null || (n = tab.length) == 0) {  
            n = (sc > c) ? sc : c; 
            // "sc > c" -> 存在线程已经开始初始化新数组/扩容,并且已经设置了一个更大的大小
            // "sc < c" -> 当前线程将根据基于请求的大小计算得到的二进制数初始化新数组/扩容
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {  
                    @SuppressWarnings("unchecked")  
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];  
                    table = nt;  
                    sc = n - (n >>> 2);  
                    }  
                } finally {  
                    sizeCtl = sc;
                }  
            }  
        }  
        
        // 已经是最大容量了,不能扩容 -> 直接返回
        else if (c <= sc || n >= MAXIMUM_CAPACITY)  
            break;  
        // 元素迁移:旧数组 -> 新数组
        else if (tab == table) { // volatile table -> 自旋CAS -> 没有改变 -> 进行元素迁移
            int rs = resizeStamp(n); // 扩容戳 -> 用于扩容过程中多个线程的同步
            if (U.compareAndSetInt(this, SIZECTL, sc,
                                    (rs << RESIZE_STAMP_SHIFT) + 2))  
                transfer(tab, null);  
        }  
    }  
}

transfer()

表中数据分块 -> 多线程并发迁移

  • 初始化扩容数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  
    int n = tab.length, stride;
    // 计算每个线程处理的数据区间大小,最小16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  
        stride = MIN_TRANSFER_STRIDE; // subdivide range
        
    // 初始化扩容数组 2*n
    if (nextTab == null) { // initiating
        try {  
            @SuppressWarnings("unchecked")  
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  
            nextTab = nt;  
        } catch (Throwable ex) { // try to cope with OOME  
            sizeCtl = Integer.MAX_VALUE;  
            return;  
        }  
        nextTable = nextTab;  
        transferIndex = n;  
    }
    int nextn = nextTab.length;
  • 记录值 -> 用于判断状态
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 构建在新表中的空节点
    boolean advance = true;  
    boolean finishing = false; // to ensure sweep before committing nextTab  
  • 分配处理区域
    for (int i = 0, bound = 0;;) {  
        Node<K,V> f; int fh;
        // 分配处理区域,假设 transferIndex = n = 64
        while (advance) {  
            int nextIndex, nextBound;  
            if (--i >= bound || finishing) 
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) { // nextIndex = transferIndex = 64
                i = -1;  
                advance = false;  
            }  
            else if (U.compareAndSetInt  
                     (this, TRANSFERINDEX, nextIndex,  
                      nextBound = (nextIndex > stride ?  
                                   nextIndex - stride : 0))) { // nextBound = 64-16 = 48 
                bound = nextBound; // bound = 48
                i = nextIndex - 1; // i = 64-1 = 63 -> 处理区域"48~63"
                advance = false;  
            }  
        }
  • 数据迁移完成
        // 数据迁移完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;  
            if (finishing) {  
                nextTable = null;  
                table = nextTab;  
                sizeCtl = (n << 1) - (n >>> 1);  
                return;  
            }  
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
                    return;  
                finishing = advance = true;  
                i = n; // recheck before commit  
            }  
        }
        
        // 如果旧数组中当前索引定位表中节点为空,CAS尝试更新为转发节点fwd,记录更新 -> 并发安全
        else if ((f = tabAt(tab, i)) == null)  
            advance = casTabAt(tab, i, null, fwd);
        // 如果旧数组中当前索引定位表中存在节点。迁移完成后,旧表中以`MOVED`占位,记录更新
        else if ((fh = f.hash) == MOVED)  
            advance = true; // already processed
  • 数据迁移过程

链表

        else {
            // 数据迁移
            synchronized (f) {
                if (tabAt(tab, i) == f) {  
                    Node<K,V> ln, hn;  
                    if (fh >= 0) {  
                        int runBit = fh & n;  
                        Node<K,V> lastRun = f;
                        // 此处应当是为了性能优化,比如 2 -> 10 -> 6 -> 14,纪录hn = lastRun = 6
                        // "6 -> 14"这一部分无需重新构建,节省部分对象创建的CPU开销、内存占用
                        for (Node<K,V> p = f.next; p != null; p = p.next) {  
                            int b = p.hash & n;  
                            if (b != runBit) {  
                            runBit = b;  
                            lastRun = p;  
                            }  
                        }
                        if (runBit == 0) {  
                            ln = lastRun;  
                            hn = null;  
                        }  
                        else {  
                            hn = lastRun;  
                            ln = null;  
                        }

                        for (Node<K,V> p = f; p != lastRun; p = p.next) {  
                            int ph = p.hash; K pk = p.key; V pv = p.val;  
                            if ((ph & n) == 0)  // 低位:在新数组原位置上
                                ln = new Node<K,V>(ph, pk, pv, ln);  
                            else                // 高位:在新数组新位置上
                                hn = new Node<K,V>(ph, pk, pv, hn);  
                        }  
                        setTabAt(nextTab, i, ln);  
                        setTabAt(nextTab, i + n, hn);  
                        setTabAt(tab, i, fwd);  
                        advance = true;  
                    }
  • 上面这块不容易懂,因此制作下表,不得不感慨二进制与运算(&)的精妙之处😮

ConcurrentHashMap源码核心方法解析

红黑树

  • 这里倒是没什么特别的地方,就是通过遍历然后将树迁移 -> "链表"树 -TreeBin-> 红黑树
                else if (f instanceof TreeBin) {  
                    TreeBin<K,V> t = (TreeBin<K,V>)f;  
                    TreeNode<K,V> lo = null, loTail = null;  
                    TreeNode<K,V> hi = null, hiTail = null;  
                    int lc = 0, hc = 0;  
                for (Node<K,V> e = t.first; e != null; e = e.next) {  
                    int h = e.hash;  
                    TreeNode<K,V> p = new TreeNode<K,V>  
                        (h, e.key, e.val, null, null);
                    // 低位:在新数组原位置上
                    if ((h & n) == 0) {  
                        if ((p.prev = loTail) == null)  
                            lo = p;  
                        else  
                            loTail.next = p;  
                        loTail = p;  
                        ++lc;  
                    }
                    // 高位:在新数组新位置上
                    else {  
                        if ((p.prev = hiTail) == null)  
                            hi = p;  
                        else  
                            hiTail.next = p;  
                        hiTail = p;  
                        ++hc;  
                    }  
                }  
                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :  
                    (hc != 0) ? new TreeBin<K,V>(lo) : t;  
                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :  
                    (lc != 0) ? new TreeBin<K,V>(hi) : t;  
                setTabAt(nextTab, i, ln);  
                setTabAt(nextTab, i + n, hn);  
                setTabAt(tab, i, fwd);  
                advance = true;  
                }
  • 经典的转化过程抛出异常
                else if (f instanceof ReservationNode)  
                    throw new IllegalStateException("Recursive update");  
                } // if (tabAt(tab, i) == f) {   
            } // synchronized (f) {
        } // else { synchronized (f) 
    } // for (int i = 0, bound = 0;;) {
}

addCount()

private final void addCount(long x, int check) {  
    CounterCell[] cs; long b, s;
    // 尝试更新baseCount,不成功(可能有其它线程占用更新)则使用counterCells
        if ((cs = counterCells) != null ||  
            !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {  
            CounterCell c; long v; int m;  
            boolean uncontended = true;
            // 根据当前线程的hash定位cs中的某个位置,尝试更新cs[x]
            if (cs == null || (m = cs.length - 1) < 0 ||  
                (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||  
                !(uncontended =  
                U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { 
                // 失败则调用`fullAddCount()`
                fullAddCount(x, uncontended); // 多个计数单元CounterCell[i] + `CAS` 
                // 如果竞争激烈则 -> 1.对 CounterCell 扩容  2.回退到 baseCount 更新
                return;  
            }  
            if (check <= 1)  
                return;  
            s = sumCount(); // baseCount + counterCell[i].value累加
        }
        // 辅助扩容,维护性能和加载因子,通过 条件判断 和 原子操作 确保并发环境下扩容的安全性
        if (check >= 0) {  
            Node<K,V>[] tab, nt; int n, sc;  
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&  
                    (n = tab.length) < MAXIMUM_CAPACITY) {  
                int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;  
                if (sc < 0) {  
                    if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||  
                        (nt = nextTable) == null || transferIndex <= 0)  
                        break;  
                    if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))  
                        transfer(tab, nt);  
        }  
        else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))  
            transfer(tab, null);  
        s = sumCount();  
        }  
    }  
}

整理到这里实属不易,仅仅是阅读和分析这一部分源码就花了2天左右的时间,对于文章中没有涉及的方法感兴趣的小伙伴可以自行去了解,之后视情况也可能会补充一些其它方法,无论如何您是坚持看到了最后,感谢您的认可!

转载自:https://juejin.cn/post/7397286692090691593
评论
请登录