likes
comments
collection
share

面试官:concurrentHashMap读取数据需要加锁么?万字详解ConcurrentHashMap整理concur

作者站长头像
站长
· 阅读数 28

介绍

平常在需要使用一些简单的本地缓存时会用到ConcurrentHashMap,但一直没深入了解过它的原理,之前面试时也有一定几率碰到,回答的总是一知半解的,最近深入的了解一下ConcurrentHashMap的原理,把它完整的梳理成一篇文章。 文章会从查询、存放、扩容、计数四个方面来深入解析ConcurrentHashMap,我先抛出几个问题,大家可以先思考一下,带着问题来阅读这样可以更好帮助大家理解concurrentHashMap,如果都了解那么大神可以出门右拐了;如果不了解可以详细看一下本篇文章,看完本章内容详细你能完整的回答全部问题,如果只想知道答案的,可以直接下滑到底部

  1. concurrentHashMap的查询过程是什么样的?
  2. concurrentHashMap的散列算法和hashMap的有什么区别?
  3. concurrentHashMap查询时会加锁么?
  4. concurrentHashMap的K/V允许为空么?为什么?
  5. concurrentHashMap什么时候初始化?
  6. concurrentHashMap哪些时机会进行扩容?
  7. concurrentHashMap如何实现并发扩容的?
  8. concurrentHashMap的size方法是准确的么?
  9. concurrentHashMap中计数的桶是如何扩容的?
  10. concurrentHashMap同一个桶中,链表和红黑树可以共存么?

使用

平时在使用concurrentHashMap时,最常用的就是获取值,存放值以及查询存放了多少个值。代码如下:

// 新建ConcurrentHashMap
ConcurrentHashMap<String,String> map = new ConcurrentHashMap();
// 获取值
map.get("key");
// 存入值
map.put("key","value");
map.putIfAbsent("key","value");
// 获取map中的数量
map.size();

源码

属性

这些是concurrentHashMap内部的属性,为了更好的理解源码,我先贴出来,大家可以先熟悉一下。


// 最大长度
private static final int MAXIMUM_CAPACITY = 1 << 30;

// 默认容量
private static final int DEFAULT_CAPACITY = 16;

// 扩容操作时对低位进行操作时应用
private static int RESIZE_STAMP_BITS = 16;

// 扩容时,每个线程每次迁移的最小步长
private static final int MIN_TRANSFER_STRIDE = 16;

// 老数组的下标
private transient volatile int transferIndex;


// 扩容时最大值
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

// 扩容操作时对高位进行操作时应用
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// 默认转换容量
static final int TREEIFY_THRESHOLD = 8;

// 最小转树容量
static final int MIN_TREEIFY_CAPACITY = 64;

// 当前位置已经被扩容时迁移走了
static final int MOVED     = -1; // hash for forwarding nodes

// 当前位置是一个树
static final int TREEBIN   = -2; // hash for roots of trees

// 当前位置被占用,还未赋值
static final int RESERVED  = -3; // hash for transient reservations

// int的最大值,Integer.MAX_VALUE
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

// concurrentHashMap维护的数组
transient volatile Node<K,V>[] table;

// 扩容时的数组
private transient volatile Node<K,V>[] nextTable;

// 计数属性
private transient volatile long baseCount;

// cpu内核数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// 标识数组初始化和扩容的标识信息
// 等于 -1 代表正在初始化
// 小于 -1 代表正在扩容
// 等于 0 代表没有初始化
// 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值
private transient volatile int sizeCtl;

查询值

get 方法

对于查询流程:

  1. 调用spread方法对key进行散列运算,获取key的散列值;
  2. 先判断是否为空,并且该key对应下标的位置是否为空,如果都为空,则该值不存在;
  3. 再判断要查询的key是否在数组上,如果在则直接返回数组上的值;
  4. 如果不在则判断对应的节点的hash是否小于0;下面会有很多地方用到该数值,请大家务必记住,后面也会反复提示,小于零的含义分别是: // 当前位置已经被扩容时迁移走了 static final int MOVED = -1; // 当前位置是一棵树 static final int TREEBIN = -2; // 当前位置被占用,还未赋值 static final int RESERVED = -3;
  5. 因为不同的情况封装为不同的node类型,下面会详解各种类型的find方法
  6. 如果上述条件都不满足,则该key的值在链表中,从链表中查询数据。

具体代码以及注释如下:

public V get(Object key) {
    // tab:数组
    // e : 要查询的节点
    // n : 数组长度
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 获取hash值
    int h = spread(key.hashCode());
    
    // 数组不为null,数组上有数据,拿到数组指定位置上的数据
    if ((tab = table) != null && 
        (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 该key的hash 与 数组该位置上数据的hash相当
        if ((eh = e.hash) == h) {
            // 判断两个key的值相等,直接返回数组中该数据的val
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 如果eh小于0,这里有特殊情况
        // MOVED、TREEBIN、RESERVED 上述属性有注释
        else if (eh < 0)
            // 三种情况的find方法
            return (p = e.find(h, key)) != null ? p.val : null;
        // 是链表,遍历链表,找到符合的数据并返回
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    // 如果都没有则不存在。
    return null;
}

看完这里就简单的回答第一个问题了,但此时回答还不够详细,例如散列算法、是否加锁等内容还没解释。

spread 散列方法

散列算法与hashMap类似,但多了一步和Integer.MAX_VALUE进行与运算,保证得到一个正整数的结果。因为负数有特殊的含义:

  • static final int MOVED = -1; // 当前位置已经被扩容时迁移走了
  • static final int TREEBIN = -2; // 当前位置是一棵树
  • static final int RESERVED = -3; // 当前位置被占用,还未赋值

这三个值在源码中会非常高频的出现,文中会反复强调。

static final int spread(int h) {
    // (h ^ (h >>> 16)) 高位右移动16位后进行异或运算,为了让高位和低位都能参与运算。
    // 这里和hashMap是中的散列值计算是完全相同的。而因为负数有其他含义,这里需要保证为正数
    // 前面的计算结果 与 HASH_BITS 是为了保证返回的结果是正数。
    return (h ^ (h >>> 16)) & HASH_BITS;
}

看完这里可以回答第二个问题,concurrentHashMap的散列算法比HashMap多了一步需要保证为正数,因为负数有其他的含义。

迁移(ForwardingNode) find 方法

当该节点的hash为MOVED=-1说明该节点已经被迁移走了,要查询数据时需要去新数组上查询数据。

  1. 新数组、新数组中该下标位置的节点都为空,说明不存在该key,直接返回null
  2. 死循环查找,类似get方法,如果直接查询到了就返回结果
  3. 如果找到节点的hash还是小于0的,说明新数组中也出现了这三种情况(迁移走、树、还未赋值)
  4. 既不在数组上,又不是负数,那么只能是链表了,从链表中查询。

下面是具体代码,我在代码上每一行都写上注释了。

Node<K,V> find(int h, Object k) {
    // tab : 新数组
    outer: for (Node<K,V>[] tab = nextTable;;) {
        // n:新数组长度
        // e:新数组中的节点
        Node<K,V> e; int n;
        // 判断数组和k都不为空
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        // 死循环
        for (;;) {
            // eh: 节点hash
            // ek: 节点的key
            int eh; K ek;
            // 如果能找到直接返回
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // 当hash小于0,说明还有特殊情况,下面和get的方法类似
            if (eh < 0) {
                // 继续套娃
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }
            // 说明是链表,e.next 赋值给 e
            if ((e = e.next) == null)
                return null;
        }
    }
}

占用的(ReservationNode) find 方法

ReservationNode是被占用但还未赋值的实现,因为没被赋值呢,直接返回空即可。

static final class ReservationNode<K,V> extends Node<K,V> {
    ReservationNode() {
        super(RESERVED, null, null, null);
    }
    // 直接返回null
    Node<K,V> find(int h, Object k) {
        return null;
    }
}

(TreeBin) find方法

这里是TreeBin的实现,大家都知道当数组大于64并且链表长度超过8会被转换成红黑树。在源码中不仅仅会被转成红黑树还维护着一个双向链表,后续会详细讲解。查询过程:

  1. 拿到双向链表的头节点;
  2. 判断当前桶的状态,是否有写锁或等待锁;
  3. 如果有则判断该节点的key是否就是要查询的key,是直接返回,不是的话往下遍历一个节点;
  4. 使用cas尝试设置读锁,设置成功则从红黑树中查询当前key
  5. 判断自己为最后一个释放读锁的线程并且有其他线程正在请求写锁,唤醒请求写锁的线程。
//双向链表
TreeNode<K,V> root;
// 红黑树
volatile TreeNode<K,V> first;
volatile Thread waiter;
// 锁状态
volatile int lockState;
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

final Node<K,V> find(int h, Object k) {
    if (k != null) {
        // e : 双向链表
        for (Node<K,V> e = first; e != null; ) {
            // s : 锁的状态
            // ek : 节点的key
            int s; K ek;
            
            // 判断当前节点是否有写锁或等待写锁
            if (((s = lockState) & (WAITER|WRITER)) != 0) {
                // 如果有的话,直接从双向链表中查询数据
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next;
            }
            // 没有写锁或者等待写锁,则直接用case的方式设置一个读锁
            else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                         s + READER)) {
                TreeNode<K,V> r, p;
                try {
                    // 在红黑树中检索
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {
                    // 判断自己为最后一个释放读锁的线程,并且有写线程正在请求锁资源
                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null)
                        // 唤醒写线程
                        LockSupport.unpark(w);
                }
                return p;
            }
        }
    }
    return null;
}

看完这里就可以回答第三个问题,get的时候也可能会加一把读锁。

存储值

对于存放值比较常用的三个方法put、putIfAbsent、putAll,通过下面的代码可以看到他们三个都会调用putVal

put方法

// 直接把key的值变更为value
public V put(K key, V value) {
    // 这里直接调用的是 putValue,后面跟了一个boolean类型的参数,
    // 它的含义是:true:使用原有值,false:覆盖原有值
    return putVal(key, value, false);
}

// 如果key以前有对应的值,则不赋值;如果没有,则赋值。
public V putIfAbsent(K key, V value) {
    return putVal(key, value, true);
}

// 塞入一个map
public void putAll(Map<? extends K, ? extends V> m) {
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}

putVal

下面是真实的添加值,由于该方法比较长,大家可以分块看,我在代码里对每一块进行了分割,它主要做了以下几步动作:

  1. 通过spread方法拿到散列值,该方法上面已经解释过了;
  2. 懒加载,调用initTable方法初始化数组;
  3. 因为上层是一个for循环,初始化成功后再次循环就进入到第二块,采用cas方法设置node值,如果成功则跳出循环,失败的继续往下走;
  4. 判断当前是否为MOVE状态,这里前面有反复提到,此时的状态为该桶已经被迁移走了,调用helpTransfer帮助扩容。扩容后面会详细解释;
  5. 接下来还会根据onlyIfAbsent判断当前值是否存在,因为onlyIfAbsent为true是不对旧值进行覆盖的,直接返回即可。这行代码是新版本优化的代码;
  6. 上述经历了,cas尝试存放节点、判断是否被迁移以及是否已经存在该key的节点都失败,此时说明该桶有hash冲突,采用synchronized进行加锁操作;此时又有一个DCL检测;判断当前桶的状态是否大于0,大于0说明是正常的节点,遍历当前链表把节点放到链表尾部;
  7. 前面判断了hash值大于0,当hash值等于TREEBIN(-3)说明为红黑树,根据红黑树的逻辑进行添加;
  8. 数据添加完成,判断链表是否大于TREEIFY_THRESHOLD(8),调用treeifyBin尝试转为红黑树;
  9. 如果是覆盖原有key的值,则直接返回
  10. 进行addCount计数
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 如果key value为空则直接抛出空指针。
    if (key == null || value == null) throw new NullPointerException();
    
    // 获取key的hash,散列算法。这里看下 ### spread 散列方法 
    int hash = spread(key.hashCode());
    // 记录链表长度
    int binCount = 0;
--------------------------------------- 第一块 --------------------------------
    // 获取成员变量数组
    for (Node<K,V>[] tab = table;;) {
        // f:i位置的数据
        // n:数组长度
        // i:索引位置
        // fh: key的hash
        Node<K,V> f; int n, i, fh; K fk; V fv;
        
        // tab==null 代表数据还未初始化入。(第一次put的第一次循环会走该条件进行初始化)
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组。 这里看下 <a herf='#zz'> ### initTable 初始化数组 </a>
            tab = initTable();
            
--------------------------------------- 第二块 --------------------------------

        // i : 数组长度 & hash 求当前key的下标位置。
        // f : tabAt(数组,索引位置),获取数组对应位置的数据。
        // 当前位置数据为空
        // 初始化结束后,在正常情况下第二次循环就会走该分支逻辑
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 把key value封装成Node对象,并以CAS方式放到该数组的i位置中
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                // 成功就跳出循环
                break;                  
        }
        
--------------------------------------- 第三块 --------------------------------

        // 说明当前位置数据已经被迁移到新数组
        else if ((fh = f.hash) == MOVED)
            // 帮助扩容。
            tab = helpTransfer(tab, f);
            
        // 在jdk8中并没有这行判断,该行代码属于后面版本的优化,它的作用是如果不需要覆盖原有值
        // 并且该值在concurrentHashMap中,我直接返回即可,可以省去加锁put等操作。
        else if (onlyIfAbsent 
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
            
--------------------------------------- 第四块 --------------------------------

        // 该分支代表着,该下标位置有数据,需要进行加锁添加数据
        else {
            // 声明数据,返回结果
            V oldVal = null;
            // 锁住当前桶位置的数据,优化了1.8之前的锁粒度。
            synchronized (f) {
                //DCL 再次判断数据无变化
                if (tabAt(tab, i) == f) {
                    // 当fh大于等于0,说明是链表添加。前面属性有解释TREEBIN=-2为红黑树
                    if (fh >= 0) {
                        // 记录链表长度
                        binCount = 1;
                        // e:指向数组位置数据
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 拿到当前key的hash,和数组位置数据的hash进行比较
                            // 如果两个key相等 返回ture
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 赋值老数据
                                oldVal = e.val;
                                // 判断是否需要覆盖
                                if (!onlyIfAbsent)
                                    // 可以覆盖,直接赋新的值。
                                    e.val = value;
                                break;
                            }
                            
                            // 两个key不等,说明是新数据,暂存当前下标数据
                            Node<K,V> pred = e;
                            // e 指向下一个节点,如果e == null,说明下面没节点了
                            if ((e = e.next) == null) {
                                // 将当前key、value封装为Node,挂在最后一个节点后面
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // 红黑树添加
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 这里是红黑树添加逻辑。
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;

                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                   // 在新版本的jdk中还 f 增加了预留类型,这里不做介绍。
                }
            }
--------------------------------------- 第五块 --------------------------------

            // 如果链表计数不为0
            if (binCount != 0) {
                // 链表计数大于等于8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 转换为红黑树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    // 返回该key曾经对应的值。
                    return oldVal;
                break;
            }
        }
    }
--------------------------------------- 第五块 --------------------------------
    // 进行计数
    addCount(1L, binCount);
    return null;
}

initTable 初始化数组

这里需要用到属性sizeCtrl,它的数值有以下含义: // 标识数组初始化和扩容的标识信息 // 等于 -1 代表正在初始化 // 小于 -1 代表正在扩容低16位代表当前数组正在扩容的线程个数(如果1个线程扩容,值为-2,如果2个线程扩容,值为-3) // 等于 0 代表没有初始化 // 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值 private transient volatile int sizeCtl;

该方法步骤如下:

  1. 判断是否当前table为空,
  2. 判断它的状态,如果小于0说明有人在扩容,让出cpu,让其他线程进行扩容;
  3. 使用cas尝试更改为-1代表正在初始化;
  4. 更改成功,采用DCL(双重检查)检查一次是否已经被初始化了,这里的代码要注意,整个JUC包下的源码非常多的地方都采用DCL。开始初始化,这里记住两个数:初始容量为16,扩容阈值(3/4)
  5. 更改失败,说明已经有其他线程初始化了,直接返回。

代码注释如下:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 判断数组未初始化
    while ((tab = table) == null || tab.length == 0) {

        // sizeCtl在属性里有介绍,当它等于-1时代表正在初始化,小于-1时正在扩容,并发情况下,被其他线程初始化完了
        if ((sc = sizeCtl) < 0)
            // 让出cpu给,让其他线程初始化
            Thread.yield(); // lost initialization race; just spin‘
          
        // 使用cas把sizeCtl更改为-1,代表当前线程正在初始化concurrentHashMap
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                // DCL 双重检查
                if ((tab = table) == null || tab.length == 0) {
                    // 判断sizeCtl是否大于0,如果是则取sc,否则取默认值16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    
                    // 创建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    
                    // 成员变量和局部变量赋值
                    table = tab = nt;
                    
                    // 计算下次扩容的阈值,例如默认初始化容量16,下次扩容阈值就是12
                    sc = n - (n >>> 2);
                }
            } finally {
                // 把扩容阈值赋值给sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

treeifyBin 转换树

它是转为树的前置方法:

  1. 首先判断数组的长度是否小于MIN_TREEIFY_CAPACITY(64),如果是的话,优先扩容,因为数据存放在数组中查询起来性能会更好;
  2. 该位置不能空,并且它的hash>=0,说明是正常节点,小于的话说明已经被转完树、扩容了;
  3. 拿到该节点的锁,并且DCL检查,首先组装一个双向链表,再调用setTabAt转换红黑树。

判断是否需要转红黑树还是扩容;tab为数组,index为索引下标。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    // 数组不为空
    if (tab != null) {
        // 如果数组长度小于64时,不转红黑树,优先扩容
        // 更希望存放在数组中,这样性能更好一些。
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 扩容
            tryPresize(n << 1);
            
      
        // 如果是负数说明已经转为红黑树或正在扩容
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
        
            // 以下代码是将单向链表转换为TreeNode(双向链接),再通过TreeBin转换为红黑树
            // TreeBin中保留着双向链表和红黑树
            synchronized (b) {
                // 当前下标数据赋值给b,并且它的hash大于等于零(正常状态),
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 遍历节点 b 中的所有节点
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        // 组装为TreeNode(双向链接)
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 转换为红黑树
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

扩容

tryPresize尝试扩容

扩容前操作,它有两个入口,putAll长度大于8都会调用该方法;由于该方法也过长,我对代码进行了分割,大家一块一块看。

  1. MAXIMUM_CAPACITY 数组长度的最大值,如果比它的一半还大,则需要扩容的数组长度为MAXIMUM_CAPACITY,否则就设置为size最接近的2的N次幂的那个值。
  2. 判断是否没初始化,对数组进行初始化,因为putAll会直接调用这个方法,所以有可能没初始化。
  3. 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
  4. 开始扩容,并设置当前扩容线程数量,扩容时sc的低位代表扩容线程数,第一个线程扩容时扩容线程数量+2,后续帮助扩容的线程,扩容线程数都是+1。这里贴的是jdk8的代码,中间有一段代码是bug,后面新版本已经删掉。
// 插入数据的长度
private final void tryPresize(int size) {
    // c : 扩容后的数组长度
    // 如果size大于数组长度/2 , 直接设置为最大值
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        // 长度设置为2的n次幂
        tableSizeFor(size + (size >>> 1) + 1);
        
    // sc : sizeCtl ,-1:初始化, 小于-1:扩容,0:未初始化,大于0:区分是否初始化,代表阈值或下一次扩容数组长度
    int sc;
    while ((sc = sizeCtl) >= 0) {
        // tab:数组,n:数组长度
        Node<K,V>[] tab = table; int n;
        // DCL 数组未初始化
        if (tab == null || (n = tab.length) == 0) {
            // 因为未初始化,所以代表初始化数组。由于还未初始化,所以选size和构造方法值中最大的一个。
            n = (sc > c) ? sc : c;
            
            // 使用cas设置为-1,代表要进行初始化。
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    // DCL 再次检查
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        // 初始化一个数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        // 赋值给table
                        table = nt;
                        // 阈值设置为3/4
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 赋阈值
                    sizeCtl = sc;
                }
            }
        }
        

        // 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;

        // 开始进行扩容
        else if (tab == table) {
            // 计算扩容标识(基于旧的数组长度计算的扩容标识,因为允许并发扩容)
            // 并发扩容时,各个线程都需要从一个长度扩容成另一个长度。
            int rs = resizeStamp(n);
            
            // 这里是一个bug,前面判断的是sc>=0,但这里是小于0,是永远进不来的。
            // 我看了一下jdk17的源码,这个if已经被删掉。
            // 正在扩容 / 初始化(在这个方法中就是正在扩容)
            if (sc < 0) {
                // 
                Node<K,V>[] nt;
                // 如果扩容标识位不同,说明不是同一次扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                    // 判断是否达到最后的检查阶段,(rs << RESIZE_STAMP_SHIFT) + 1
                    sc == rs + 1 ||
                    // 这个是判断到达最大值,这里应该是rs再左移16 + MAX_RESIZERS
                    sc == rs + MAX_RESIZERS || 
                    // 如果新数组是null,说明扩容完了
                    (nt = nextTable) == null ||
                    // 扩容是从后往前扩容,到0说明已经扩容完了
                    transferIndex <= 0)  
                    break;
                // 尝试扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }

            // 还没有执行扩容,cas将sizeCtl更改为sc,扩容标识戳左移16为,让它变为负数
            // 为什么是低位+2,代表1个线程扩容。 低位为5,就代表4个线程正在并发扩容
            // 扩容分为2部:创建新数组,迁移数据。
            // 当最后一个线程迁移完毕数据后,对低位-1.最终结果低位还是1,需要对整个老数组再次检查,数据是否迁移干净
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 开始扩容
                transfer(tab, null);
        }
    }
}

// 计算n在二进制表示时,前面有多少个0
static final int resizeStamp(int n) {
    // Integer.numberOfLeadingZeros(n) 扩容标识
    // 1 << (RESIZE_STAMP_BITS - 1 这一步是为了保证后面运算时,可以得到一个负数。
    // 这样sc < -1就代表了正在扩容
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

transfer 扩容

这里就是真正的扩容方法了,这个方法超级长,还是会把该方法进行分块,帮助大家更好分析代码。

  1. 计算线程当前要迁移桶的数量,这里会根据cpu的数量进行计算,充分利用cpu资源;
  2. 如果新数组还没初始化,则进行初始化;
  3. 循环迁移,通过cas尝试领取任务;
  4. 判断要迁移的节点是否为空或者已经被迁移走了,如果是则下一次循环;
  5. 锁住当前桶,DCL检测,开始迁移。
  6. 扩容完成finishing = advance = true;并从头检查一遍。注意:这里的判断 线程数-2==0 时才会走到这一步,说明最后一个线程来检查。

该方法代码特别长,注释标记的比较多,如果有不清楚的地方可以留言讨论。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // 创建新数组流程
    // n:老数组长度,   stride:每个线程每次扩容的步长
    int n = tab.length, stride;
    
    // 如果每个线程迁移的长度基于CPU计算,大于16,就采用计算的值,如果小于16,就用16
    // 每个线程每次最小迁移16长度数据
    // 这个操作就是为了充分发挥CPU性能,因为迁移数据是CPU密集型操作,尽量让并发扩容线程数量不要太大,从而造成CPU的性能都消耗在了切换上,造成扩容效率降低
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
        
--------------------------------------- 第一块 --------------------------------

    // 如果新数组还未初始化
    if (nextTab == null) {            // initiating
        try {
            // 初始化数组
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            // oom 或者 数组越界
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 成员变量赋值
        nextTable = nextTab;
        // 设置为老数组的长度
        transferIndex = n;
    }
--------------------------------------- 第二块 --------------------------------

    // nextTale,nextTab:新数组
    // nextn:新数组长度  
    // transferIndex:线程领取任务时的核心属性 
    int nextn = nextTab.length;
    
    // 声明fwd节点,在老数组迁移数据完成后,将fwd赋值上去
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 领任务的核心标识
    boolean advance = true;
    // 扩容结束标识
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 领取任务的while循环
        while (advance) {
            int nextIndex, nextBound;
            // 完成了,或者
            if (--i >= bound || finishing)
                advance = false;
            // 老数组下标<=0 说明没有任务可以领取了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // * 这里是通过cas方式给当前线程设置迁移标识,并且领取相应数据。
            // 每次领取是从后往前领取stride个下标位置的数据。
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 计算下次领任务的下标
                bound = nextBound;
                i = nextIndex - 1;
                // 跳出循环
                advance = false;
            }
        }
--------------------------------------- 第三块 --------------------------------

        // i < 0 迁移完最后一段 ||  最后检查阶段 ||  理论上不会出现
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 判断结束
            if (finishing) {
                // 检查阶段
                nextTable = null;
                // 将迁移完数据的新数组,指向指向的老数组
                table = nextTab;
                // 将sizeCtl复制为下次扩容的阈值
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 当前线程没有任务领取,退出扩容操作,sc-1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 判断是否为最后一个扩容的线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 说明是最后一个扩容。结束和领取都设置为true,再次循环检查一遍。
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
--------------------------------------- 第四块 --------------------------------

        // 如果在下标i的值为空,则直接设置为一个fwd,代表当前位置迁移完成
        //在下次循环时该节点 == MOVED , advance = true;
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
            
        //  如果发现还有MOVED
        else if ((fh = f.hash) == MOVED)
            // 可以再次领取
            advance = true; // already processed
        else {
        
--------------------------------------- 第五块 --------------------------------

            // 锁住当前位置
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 说明当前节点正常
                    if (fh >= 0) {

// 这里比较难理解,n为老数组的长度,假设n是32,也就是100000。新数组长度是64,
// 要与64-1进行异或运算 111111
// 此时只有当hash值在第6为是0的时候 也就是h&n = 0,那这个数字肯定在低位。
// 后面的红黑树迁移也类似
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
        // 提前循环一次,将节点赋值到对应的高低位,如果链表后面的高低位没变化,则直接复制
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 再次循环时,只循环到lastRun位置
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 设置低位节点
                        setTabAt(nextTab, i, ln);
                        // 设置高位节点
                        setTabAt(nextTab, i + n, hn);
                        // 原数组节点设置fwd
                        setTabAt(tab, i, fwd);
                        // 再次领取
                        advance = true;
                    }
                    // 红黑树迁移
                    else if (f instanceof TreeBin) {
                        // 进行类型转换
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        // 设置低位头、尾节点
                        TreeNode<K,V> lo = null, loTail = null;
                        // 设置高位头、尾节点
                        TreeNode<K,V> hi = null, hiTail = null;
                        // 初始化低位节点数量和高位节点数量为 0
                        int lc = 0, hc = 0;
                        // 循环遍历
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            // 获取节点的hash
                            int h = e.hash;
                            // 创建新的TreeNode
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            // 根据与运算判断是否为低位
// 这里比较难理解,n为老数组的长度,假设n是32,也就是100000。新数组长度是64,
// 要与64-1进行异或运算 111111
// 此时只有当hash值在第6为是0的时候 也就是h&n = 0,那这个数字肯定在低位。
                            if ((h & n) == 0) {
                                // 如果低位节点目前是null,则把该节点设置为低位节点
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    // 否则设置为低位的下一个节点
                                    loTail.next = p;
                                // 更新低位节点
                                loTail = p;
                                // 低位节点数量➕1
                                ++lc;
                            }
                            
                            // 是高位
                            else {
                                // 同理低位逻辑
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 根据低位数量决定是链表还是红黑树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 设置低位节点
                        setTabAt(nextTab, i, ln);
                        // 设置高位节点
                        setTabAt(nextTab, i + n, hn);
                        // 把原数组设置为fwd
                        setTabAt(tab, i, fwd);
                        // 再次领取
                        advance = true;
                    }
                }
            }
        }
    }
}

helpTransfer 帮助扩容

该方法在介绍的源码中,只有在put时,发现该桶已经被移走了才会触发,在扩容时sc低位代表正在扩容的数量,协助扩容数量会+1,第一次扩容的线程数量会+2。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 老数组不为null,当前节点是fwd,新数组不为null
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 获取自己的扩容标识戳
        int rs = resizeStamp(tab.length);
        // 判断之前赋值的内容是否有变化,并且sizeCtl是否小于0
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //这里贴的是jdk8的代码,有bug,新版本jdk对这里有修复。
            // 这里含义就是如果不是同一批扩容 、已经扩容完成、扩容的数量超过最大值都直接跳过
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || 
                transferIndex <= 0)
                break;
            // CAS,将sizeCtl + 1,代表来协助扩容了
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

计数

size 方法

面试时特别爱问concurrenHashMap是如何统计数值的,对于size方法,它是直接调用的sumCount

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

sumCount 获取总数

sumCount方法的实现非常简单,它就是取baseCount与数组counterCells中存的值的和。

final long sumCount() {
    // 获取counterCells数组
    CounterCell[] as = counterCells; CounterCell a;
    // 把baseCount赋值给sum
    long sum = baseCount;
    if (as != null) {
        // 如果counterCells数组不为空,把它的值都加给sum
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

addCount 计数

对于concurrentHashMap取数来讲是很容易看到的,我们接下来看一下它是如何计数的。 这个方法有两个作用:

  • 计数器,如果添加元素成功,对计数器 + 1
  • 检验当前ConcurrentHashMap是否需要扩容

这里先说计数器:

  1. 判断counterCells是否不为空 || 通过cas直接给baseCount计数失败;
  2. 如果进入到该if分支,则会调用fullAddCount对counterCells初始化,后续计数随机拿一个counterCell通过cas进行计数;

是否需要扩容: 这里主要是对check进行判断,当check小于0时是移出节点,不需要进行扩容判断,只有大于等于0时才需要扩容。

代码注释如下:

private final void addCount(long x, int check) {
    // 定义计数器单元格数组、基础计数值的旧值和新值
    CounterCell[] as; long b, s;
    // 如果计数器单元格数组不为空,或者通过 CAS 操作更新基础计数值失败。
    // 没有冲突时size值存放在baseCount,有冲突才会初始化counterCells,并放在counterCells中
    if ((as = counterCells)!= null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 定义计数器单元格、单元格的值、掩码
        CounterCell a; long v; int m;
        // 标记是否无竞争
        boolean uncontended = true;
        // 如果计数器单元格数组为空,或者数组长度小于 0,或者根据随机探针获取的单元格为空,或者通过 CAS 操作更新单元格值失败
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                // CounterCell[]已经初始化了,并且指定索引位置上有CounterCell
                // 直接CAS修改指定的CounterCell上的value即可。
                // CAS成功,直接告辞!
                // CAS失败,代表有冲突,uncontended = false,执行fullAddCount方法
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 执行完整的计数增加操作,
            // 1、这里会对CounterCell 进行初始化
            // 2、当增加size数量有失败、有其他线程正在添加、还未超过最大cpu都会对CounterCell进行扩容,每次扩容为上次的2倍。
            fullAddCount(x, uncontended);
            return;
        }
        // 如果检查条件小于等于 1,直接返回
        if (check <= 1)
            return;
        // 计算总和计数
        s = sumCount();
    }
    // 如果检查条件大于等于 0
    if (check >= 0) {
        // 定义表格、新表格、表格长度、扩容控制值等
        Node<K,V>[] tab, nt; int n, sc;
        // 当总和计数大于等于扩容控制值,且表格不为空,表格长度小于最大容量时
        while (s >= (long)(sc = sizeCtl) && (tab = table)!= null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // 计算扩容标记
            int rs = resizeStamp(n);
            // 如果扩容控制值小于 0
            if (sc < 0) {
                // 各种条件判断是否中断扩容
                if ((sc >>> RESIZE_STAMP_SHIFT)!= rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                // 通过 CAS 操作增加扩容工作线程数量
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 如果扩容控制值大于等于 0
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            // 重新计算总和计数
            s = sumCount();
        }
    }
}

fullAddCount 方法

这个方法是对CounterCell整体的初始化;

  1. 第一块代码代表着对数组中对象初始化;
  2. 第二块代码代表着对数组进行进行扩容每次为上一次的2倍;这里的扩容条件是cas失败,没线程扩容,数组长度小于cpu核数;
  3. 第三块代码代表着对数组进行初始化,第一次初始化的值为2;
private final void fullAddCount(long x, boolean wasUncontended) {
    // 获取随机探针值
    int h;
    // 如果随机探针值为 0
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        // 初始化随机数生成器
        ThreadLocalRandom.localInit();
        // 重新获取随机探针值
        h = ThreadLocalRandom.getProbe();
        // 标记为无竞争
        wasUncontended = true;
    }
    // 标记是否冲突(上次槽位非空)
    boolean collide = false;
    // 无限循环,直到满足条件退出
    for (;;) {
    
        // 定义计数器单元格数组、计数器单元格、数组长度、单元格值等
        CounterCell[] cs; CounterCell c; int n; long v;
        // 如果计数器单元格数组不为空且长度大于 0
        if ((cs = counterCells)!= null && (n = cs.length) > 0) {
            // 如果根据随机探针计算的槽位为空
            if ((c = cs[(n - 1) & h]) == null) {
                // 如果没有其他线程正在操作单元格
                if (cellsBusy == 0) {
                    // 尝试创建新的计数器单元格
                    CounterCell r = new CounterCell(x);
                    // 如果没有其他线程正在操作单元格,通过cas设置为忙绿
                    if (cellsBusy == 0 &&
                            U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {
                            // DCL检查
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells)!= null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            // 重置忙碌标志
                            cellsBusy = 0;
                        }
                        // 如果创建成功,退出循环
                        if (created)
                            break;
                        // 否则继续
                        continue;
                    }
                    // 标记无冲突
                    collide = false;
                }
--------------------------------------- 第一块 --------------------------------

                // 如果上次操作有竞争
                else if (!wasUncontended)
                    wasUncontended = true;
                // 如果通过 CAS 操作成功更新单元格值
                else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                    break;
                // 如果计数器单元格数组发生变化或达到最大 CPU 数量
                else if (counterCells!= cs || n >= NCPU)
                    collide = false;
                // 如果之前没有冲突,现在标记为冲突
                else if (!collide)
                    collide = true;
                // 如果没有其他线程正在操作单元格并且成功设置忙碌标志
                else if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        // 如果计数器单元格数组未变化,进行数组扩容
                        if (counterCells == cs)
                            counterCells = Arrays.copyOf(cs, n << 1);
                    } finally {
                        // 重置忙碌标志
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;
                }
                // 推进随机探针
                h = ThreadLocalRandom.advanceProbe(h);
            }
        }
--------------------------------------- 第二块 --------------------------------

        // 如果计数器单元格数组为空,且没有其他线程正在操作,并且成功设置忙碌标志
        else if (cellsBusy == 0 && counterCells == cs &&
                U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {
                // 初始化计数器单元格数组
                if (counterCells == cs) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                // 重置忙碌标志
                cellsBusy = 0;
            }
            // 如果初始化成功,退出循环
            if (init)
                break;
        }
        
--------------------------------------- 第三块 --------------------------------

        // 前面都没有成功,再通过 CAS 操作成功更新基础计数值一次如果成功了,直接退出
        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;
    }
}

问题

现在回头看文章开头的十个问题。

1、concurrentHashMap的查询过程是什么样的?

1、获取key散列值计算下标位置,判断该位置的key是否与要查询的key相等,如果相等直接返回; 2、如果该位置的节点hash值小于0,根据不同含义封装的Node进行策略查找。小于0有特殊含义;被迁移走、未赋值、一颗树; 3、上述两种情况都不符合,说明是在链表中,逐个查询。


2、 concurrentHashMap的散列算法和hashMap的有什么区别?

concurrentHashMap的散列算法需要保证计算计算的结果为正数,而hashMap不需要。


3、concurrentHashMap查询过程会加锁么?

在查询的过程中有可能加锁,当要查询的位置是一棵树时,如果该树正在被其他线程的写锁、等待写锁占用,则直接从双向链表中查询一个节点判断是否符合,如果不符合则尝试获取读锁,读锁获取成功则拿到值进行返回。


4、concurrentHashMap的K/V允许为空么?为什么?

不允许为空,如果为空会抛出NullPointerException异常。同时如果存入一个null的话会产生歧义,concurrentHashMap是一个线程安全的工具类,出现歧义的话没法使用了。


5、concurrentHashMap什么时候初始化数组?

在真正插入数据时进行初始化。如:putValputAll方法。


6、 concurrentHashMap哪些时机会进行扩容?

1、达到阈值时进行扩容,阈值为当前数组长度的3/4; 2、链表转红黑树时会检查数组长度是否小于64,如果小于也会进行扩容。


7、concurrentHashMap如何实现并发扩容的?

根据扩容时机会有一个线程发起扩容,扩容时sizeCtl的低16位会记录扩容的线程数量,第一个线程会记录成2个线程,后续的协助线程都记录成1个线程。开始扩容时会根据cpu以及数组长度计算出每个线程每次需要搬迁节点的步长,每个线程都会从尾部开始分配要迁移的桶,每次分配前面计算的步长个桶,在搬迁的过程锁住每个桶,桶下的数据只会迁移到新数组中的当前index,或者index+n(n:代表原数组长度)的位置;所有线程都完成迁移后,最后一个线程会从尾部到头检查一遍。


8、concurrentHashMap的size方法是准确的么?

是准确的。


9、concurrentHashMap中计数的桶是如何扩容的?

内部维护了一个计数字段、一个计数数组;如果没并发时,通过cas维护计数字段即可,并发高时就需要初始化计数数组,并且随着并发度会逐渐对该数组进行扩容,扩容的长度不超过cpu核数,每次扩容是原来的2倍,初始化长度为2。


10、concurrentHashMap同一个桶中,链表和红黑树可以共存么?

可以的,TreeBin其实既维护了链表又维护了红黑树;当链表转为红黑树时是先转为双向链表再转为红黑树的。

总结

至此就解析完了concurrentHashMap绝大部分代码,由于代码量太大、代码写的也比较晦涩并夹杂着一些bug,已经尽力解析成让大家能懂的注释。 最近看了很多juc相关代码,它的整体套路爱用死循环、一个参数代表多种含义,大家自己看juc源码时可以从以下几点入手:

  1. 对一个实体中的成员属性研究清楚代表什么含义,看起源码来会事半功倍;
  2. 在一个死循环中有多个if去处理不同的事情,大家可以优先去看每个if是做什么的,这样可以让大家更快的理解这个方法都能干什么事。
  3. 实在看不懂的代码,可以多个jdk版本同时看一下,毕竟是人写的代码就会有bug,在新版本中有对bug进行修复。

有解析不对、不清楚的地方大家可以留言讨论。


看到这里的朋友辛苦帮忙点个赞吧~

转载自:https://juejin.cn/post/7403586214861275175
评论
请登录