likes
comments
collection
share

Java线程安全的集合类ConcurrentHashMap详解

作者站长头像
站长
· 阅读数 11

前言

本文为笔者学习和思考后结合自己的理解所做的总结,参考了很多大佬的讲解,但还是不排除存在理解有误的地方,如有错误请指正

我会分别总结Java1.7和1.8两个版本的ConcurrentHashMap的实现原理,主要是对初始化、put、get、扩容几个方法进行总结

为什么有了HashMap还需要ConcurrentHashMap

HashMap作为Java非常经典的集合数据结构,增删改查都可以做到O(1)的时间复杂度,非常值得学习。但是HashMap在并发情况下会出现死循环的问题。

HashMap并发死循环原理

HashMap扩容时会进行Rehash过程,而Rehash的过程是顺序访问旧数组中的链表,但采用头插法添加到到新数组对应位置的链表中,这个过程会将原来的链表顺序颠倒,同时也是在并发情况下产生死循环的原因

//顺序遍历旧数组中的链表,取出链表的当前元素e,记住旧链表的下一个元素next
Entry<K,V> next = e.next;  // 1  
//重新在计算新数组中的下标
int i = indexFor(e.hash, newCapacity);
//使用头插法将元素插入新数组
e.next = newTable[i];
newTable[i] = e;
//再取旧数组中的下一个元素
e = next;

假如线程1执行到代码 1 处时间片用完了,当前的的链表情况如下:

e指向 key=3的节点1 next 指向 key=7的节点2

Java线程安全的集合类ConcurrentHashMap详解

而此时线程二获得时间片开始执行

第一次:e指向节点1,next指向节点2,采用头插法把e插入新数组中

第一次:e指向节点2,next指向节点3(key=5),采用头插法把e插入新数组中

此时在新链表中链表的顺序发生了颠倒 节点2在前,节点1在后了

第三次执行后完成了扩容,现在的链表情况就变成了这样:

Java线程安全的集合类ConcurrentHashMap详解

之后线程1重新获得时间片执行,但是他并不知道已经扩容结束了,他现在的 e指向 key=3的节点1 next 指向 key=7的节点2

开始继续按照扩容的代码执行,再次将e使用头插法添加到新数组中,这个时候就出现环了节点e2.next=节点e1,而线程1又将e1使用头插法加入到链表头部,e1.next=e2

后续线程1不断取next进行头插法加入新数组,其实一直都是e1,e2两个在互换位置

所以官方提供了ConcurrentHashMap保证并发情况下HashMap的线程安全

ConcurrentHashMap源码

JDK1.7版本

存储结构如下

Java线程安全的集合类ConcurrentHashMap详解

1.7版本的ConcurrentHashMap是引入了一个段(Segment)数组,每个段内是一个HashMap结构,使用分段锁保证线程安全,每次锁一个段(Segment),不同段上的数据互不影响,所以理论上最大并发度就是Segment数组的长度,Segment数组的长度在初始化后就不可再改变,因此最大并发度也在初试化时就确定

初始化

初始化函数除了一些参数校验之外主要做了三件事:

  1. 确定最大并发度ssize:根据concurrencyLevel向上取最接近的2的n次方做为最大并发度,也就是Segment数组的长度
  2. 根据总的容量确定每个Segment的容量大小cap:(initialCapacity/段长度)向上取最接近的2的n次方,最小是2
  3. 根据确定的ssize创建Segment数组,并且根据cap和loadFactor初始化Segment[0]作为后续其他Segment节点初始化的模版
/**
 * 默认初始化容量
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;
​
/**
  * 默认负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
​
/**
 * 默认并发级别
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
​
//无参构造 调用的是有参构造函数,传入三个默认值
public ConcurrentHashMap() {
  this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
​
//有参构造
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    // 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    
  // 校验并发级别大小,大于 1<<16,重置为 65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 2的多少次方
    int sshift = 0;
    int ssize = 1;
    // 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 记录段偏移量
    this.segmentShift = 32 - sshift;
    // 记录段掩码
    this.segmentMask = ssize - 1;
  
    // 设置容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 创建 Segment 数组,设置 segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
put

put分三部分

  1. 段数组层面的put:通过key的hash值前几位j=(hash >>> segmentShift) 找到段数组对应的段
  2. 段初始化:段为null时调用ensureSegment(j) 对段进行初始化
  3. 段层面的put:一个段相当于一个HashMap,这里是在段内进行put

段数组层面的put

通过key的hash值计算段地址j,找到段数组上对应的段,调用段的put

  • segmentShift:每个段的容量cap需要占的位数,比如每个段的大小为1024,则segmentShift=10
  • segmentMask: 段掩码,拿来取段内地址,比如段大小为1024,则段内地址为[0-1023],用低9位表示,segmentMask就是为了取hash值的低9位 所以segmentMask=2进制(111111111)
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
    // 其实也就是把高4位与segmentMask(1111)做与运算
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果查找到的 Segment 为空,调用ensureSegment初始化
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

段初始化ensureSegment(j)

段初始化其实就三个操作

  1. 获取模版Segment[0]的参数信息(容量,负载因子,阈值)
  2. 创建Segment
  3. CAS操作将新创建的段添加到Segments数组中

但本着尽可能早的发现Segement以及被其他线程初始化而终止当前的初始化,从而减少资源浪费的思想,在每进行一个阶段操作前都进行一次判断空,先后进行了两次判空操作才使用CAS初始化

  1. 进入后尽早判断Segment[k]是否null (第一次判断段是否为null)
  2. 获取Segment[0]段模版的容量,负载因子,阈值做初始化准备
  3. 第二次判断段是否为null
  4. 使用上面获取到的创建Segment
  5. CAS操作将新创建的段添加到Segments数组中
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 第一次判断 u 位置的 Segment 是否为null
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 获取0号 segment 里的 HashEntry<K,V> 初始化长度
        int cap = proto.table.length;
        // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
        float lf = proto.loadFactor;
        // 计算扩容阀值
        int threshold = (int)(cap * lf);
        // 创建一个 cap 容量的 HashEntry 数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
      // 第二次判断 u 位置的 Segment 是否为null
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 自旋检查 u 位置的 Segment 是否为null
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // 使用CAS 赋值,只会成功一次
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

段内put

在Segment数组中获取hash值对应的段Segment,并且确认段不为null后会进行段内put

因为段内本身就是一个HashMap结构所以段内put和HashMap的put几乎是一样的,唯一的区别就是在最开始需要先获取锁tryLock()

  1. 获取锁,获取成功往下执行,获取失败则进入scanAndLockForPut(key, hash, value) 后面会介绍
  2. 根据hash值计算key在段内HashEntry数组的位置,该位置上是一个链表(也可能为null)
  3. 遍历链表看是否存在key值相同的元素,如果有则判断是否允许覆盖,然后再对值进行覆盖
  4. 若不存在key值相同的,则使用头插法添加一个新节点,同时判断是否添加新节点后是否达到阈值,达到阈值则扩容 rehash(node)后面会介绍
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 计算要put的数据位置
        int index = (tab.length - 1) & hash;
        // CAS 获取 index 坐标的值
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                // 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // first 有值没说明 index 位置已经有值了,有冲突,链表头插法。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 容量大于扩容阀值,小于最大容量,进行扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

再来看看scanAndLockForPut(key, hash, value)

tryLock()获取不到锁的线程则进入这个函数,他有两个作用

  1. 不断自旋获取锁while (!tryLock()) 这是最主要的作用
  2. 在自旋的过程他为了不是纯粹的空自旋,还添加了一点别的作用就是去遍历链表查看是否存在key值相同的元素,如果不存在就在自旋期间把 HashEntry节点先创建了,算是为后面获得锁之后的插入操作节约了一点时间

在确定链表查看是否存在key值相同的元素后的自旋就是空自旋了,进入空自旋有两个操作

  1. 他会对空自旋次数进行统计,达到自旋阈值MAX_SCAN_RETRIES就进入阻塞了
  2. 同时每两次空自旋就会判断一次当前的链表头节点是否变化,因为添加元素时使用头插法,通过判断头节点是否变化来判断是否有新的节点加入进来(也有可能是头节点被删除了) ,如果有新节点加入进来他就得重新判断是否存在key相同的元素
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // 自旋获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋达到指定次数后,阻塞等到只到获取到锁
            lock();
            break;
        }
        //判断一次当前的链表头节点是否变化
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
          //重新变成刚进入时的状态,重新判断是否存在key相同的元素
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
扩容

因为此处的扩容是在段内扩容,并且是在持有锁的情况下扩容,所以这里的扩容和HashMap的扩容机制是几乎是一样的

  1. 创建一个新的数组,数组长度为旧数组的两倍
  2. 遍历旧数组每个位置的链表,将链表上的每个元素与新的掩码(newCapacity - 1)进行&计算在新数组的位置,并使用头插法把元素插入新数组中

区别在于这里的扩容会在第2步前先计算lastRun的位置,lastRun节点就是最后一串hash值在新数组中位置没变的节点中的头节点

Java线程安全的集合类ConcurrentHashMap详解

找到lastRun节点后直接将lastRun节点先加入到新数组中,使得最后这一部分的节点得到复用,不需要重新创建

所以实际上的顺序为

  1. 创建一个新的数组,数组长度为旧数组的两倍
  2. 找到lastRun节点后直接将lastRun节点先加入到新数组中
  3. 遍历旧数组每个位置的链表,将链表上的截止到lastRun节点之前到每个元素与新的掩码(newCapacity - 1)进行&计算在新数组的位置,并使用头插法把元素插入新数组中
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    // 老容量
    int oldCapacity = oldTable.length;
    // 新容量,扩大两倍
    int newCapacity = oldCapacity << 1;
    // 新的扩容阀值 
    threshold = (int)(newCapacity * loadFactor);
    // 创建新的数组
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩码,默认2扩容后是4,-1是3,二进制就是11。
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        // 遍历老数组
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                // 如果当前位置还不是链表,只是一个元素,直接赋值
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // 如果是链表了
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // 新的位置只可能是不便或者是老的位置+老的容量。
                // 遍历结束后,lastRun 后面的元素位置都是相同的
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    // 遍历剩余元素,头插法到指定 k 位置。
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 头插法插入新的节点
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
get

get操作不需要加锁,直接读就行了

  1. 根据hash值计算段地址,Segment数组找到对应的段
  2. 根据hash值计算在段内的数组上的位置
  3. 遍历数组对应位置的链表,查看是否存在key相同的值,若存在则返回value,不存在则返回null
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 计算得到 key 的存放位置
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            // 如果是链表,遍历查找到相同 key 的 value。
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
锁的使用
  1. 采用了Segment数组+HashEntry数组+链表的结构,通过Segment来降低锁的粒度,提高并发度
  2. 段初始化时ensureSegment使用乐观锁(CAS操作)为将初始化的段赋值到Segment数组中
  3. put操作时先对Segment段加锁,加锁成功才能进入段内操作

总的思想将一个大的HashMap拆分成n个小的HashMap(segment),每个segment在任意时刻只允许一个线程操作

JDK1.8版本

存储结构

JDK1.7中采用Segement数组+HashEntry数组+链表的结构进行存储,采用分段锁的方式解决并发问题,所以并发度取决于Segement数组的大小,并且这个大小是在初始化的过程就决定的,后期不能修改

JDK1.8版本中摒弃了这种设计,采用了和HashMap类似的Node数组+链表+红黑树的方式实现,并发的大小取决于Node数组的大小,锁的粒度更小,每次只锁一条链表的数据,并且Node数组是可以扩容的,所以并发度也可以随着扩容而增大。

Java线程安全的集合类ConcurrentHashMap详解

初始化

初始化的函数非常简单,因为他并没有做实际的初始化工作,真正的初始化工作是在添加元素的时候做的

对于无参数的构造方法,他甚至什么不做

对于有参数的方法

  1. 找到初始化容量1.5倍+1向上最接近的2的n次方cap
  2. 将cap暂存在sizeCtl中
// 这构造函数里,什么都不干
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  //sizeCtl这个变量在整个ConcurrentHashMap中非常重要,后面会具体讲
  this.sizeCtl = cap;
}

sizeCtl

sizeCtl在整个ConcurrentHashMap中非常重要,他有四种状态

  1. 表示未实际初始化前暂存初始化的容量大小:sizeCtl=cap
  2. 表示处于实际初始化过程中:sizeCtl=-1
  3. 表示整个ConcurrentHashMap的阈值:sizeCtl = sc (sc是0.75*n)
  4. 表示并发扩容的线程个数 sizeCtl=-N -N表示有N个线程正在协助扩容

这里没看懂没关系,跟着思路走,后面到具体使用到的地方就理解了

put

实际上的put方法只有5个步骤

  1. 根据hash值找到元素应该存放的Node数组的下标

  2. 对Node数组对应下标的链表头节点上锁 synchronized (f)

  3. 上锁成功之后第一步就是判断我们锁的节点是否还是链表头节点,如果不是就放弃操作,重新去竞争表头节点的锁。为什么需要这一步判断呢? 因为我们上锁的这个时间内可能我们上锁的表头节点已经不是表头节点了,比如remove了表头节点,这样我们上的锁就无效了,因为别的线程通过synchronized锁的表头节点和我们不是一个对象,所以也能成功获得锁

  4. 成功锁住链表后就开始真正的put操作了,判断是链表节点还是红黑树节点(if (fh >= 0) // fh是头节点的 hash值,大于 0说明是链表

    • 如果是链表则遍历链表检查是否有key相同的元素,若存在相同的元素则覆盖,若没有则在表尾插入元素
    • 如果是红黑树节点则检查是否调用红黑树的putVal方法插入元素
  5. 成功插入后调用addCount将元素个数+1,addCount()里面会判断存储的数据大小是否达到阈值,如果达到则进入扩容操作

但是put方法在上述的操作之前还进行3个状态判断

  1. 判断Hash表是否初始化 if (tab == null || (n = tab.length) == 0)

    在之前讲到init方法并未实际做初始化,而是在put操作的时候进行实际的初始化,就是通过这个判断分支调用初始化方法initTable(),后面会详解

  2. 判断Hash表对应下标的位置是否为空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)

    因为Hash表对应下标的位置为空则不存在表头节点,没有办法对表头节点进行上锁,所以通过CAS操作直接将我们要插入的节点变成表头节点

  3. 判断是否正在并发扩容 else if ((fh = f.hash) == MOVED)

    hash值为MOVED是并发扩容过程中的一个标志状态,在这个状态下线程需要去先一起帮忙完成扩容后才可以进行其他操作 tab = helpTransfer(tab, f) 后面会详解

当put方法每次进入上述三个分支的时候表示本次插入还未完成(第二个分支CAS失败也是插入失败),所以在外层加了一个for循环来不断尝试插入元素,直到成功插入才退出

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果数组"空",进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组,后面会详细介绍
            tab = initTable();
​
        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,
            //    用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
            //          如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
        else if ((fh = f.hash) == MOVED)
            // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
            tab = helpTransfer(tab, f);
​
        else { // 到这里就是说,f 是该位置的头节点,而且不为空
​
            V oldVal = null;
            // 获取数组该位置的头节点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 头节点的 hash 值大于 0,说明是链表
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
​
            if (binCount != 0) {
                // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
                    // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
                    //    具体源码我们就不看了,扩容部分后面说
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 
    addCount(1L, binCount);
    return null;
}

initTable

在put操作前进行了三次状态的判断,第一个就是判断Hash表是否初始化,如果未初始化则通过initTable函数完成初始化操作

  1. 判断当前Hash表是否还为null: (tab = table) == null || tab.length == 0

    因为在我们上次判断到进入initTable方法的过程中可能已经有其他线程完成了初始化工作

  2. 判断Hash表是否已经处于了初始化状态 (sc = sizeCtl) < 0

    前面提到在初始化过程中sizeCtl=-1,初始化过程只能由一个线程完成,不能多个线程协助,所以在发现有线程已经开始初始化过后就会放弃时间片进入等待

  3. 经过前两个判断后还能继续往下执行说明Hash表还未初始化,而且还没有线程竞争到初始化的任务,就先通过CAS操作去竞争初始化的工作(也就是尝试将sizeCtl改为-1),竞争成功则开始负责初始化工作

  4. 实际创建工作

    1. 创建Node数组长度为初试容量大小
    2. 将table指向新创建的Node数组
    3. 计算扩容阈值0.75 * n,最后保存到sizeCtl中
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 初始化的"功劳"被其他线程"抢去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默认初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 初始化数组,长度为 16 或初始化时提供的长度
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 将这个数组赋值给 table,table 是 volatile 的
                    table = tab = nt;
                    // 如果 n 为 16 的话,那么这里 sc = 12
                    // 其实就是 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 设置 sizeCtl 为 sc,我们就当是 12 吧
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

addCount

addCount方法用于更新hash表中元素的个数,检查是否达到扩容阈值s >= (long)(sc = sizeCtl) ,如果达到阈值则开启扩容. transfer(tab, null)

transfer是实际扩容的方法,注意transfer的第二个参数,第一个开启扩容的线程传的是null,后续协助扩容的线程传递的都是第一个线程创建的容量为旧数组两倍的新数组

但是在判断扩容阈值的时候还会有一种特殊情况,就是hash表已经处于扩容的过程中,这个时候sizeCtl=-N,N表示协助扩容的线程个数,这种情况下线程会进行协助扩容 transfer(tab, nt)

//新增元素时,也就是在调用 putVal 方法后,为了通用,增加了个 check 入参,用于指定是否可能会出现扩容的情况
//check >= 0 即为可能出现扩容的情况,例如 putVal方法中的调用
private final void addCount(long x, int check){
    ... ...
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //检查当前集合元素个数 s 是否达到扩容阈值 sizeCtl ,扩容时 sizeCtl 为负数,依旧成立,同时还得满足数组非空且数组长度不能大于允许的数组最大长度这两个条件才能继续
        //这个 while 循环除了判断是否达到阈值从而进行扩容操作之外还有一个作用就是当一条线程完成自己的迁移任务后,如果集合还在扩容,则会继续循环,继续加入扩容大军,申请后面的迁移任务
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // sc < 0 说明集合正在扩容当中
            if (sc < 0) {
                //判断扩容是否结束或者并发扩容线程数是否已达最大值,如果是的话直接结束while循环
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
                    break;
                //扩容还未结束,并且允许扩容线程加入,此时加入扩容大军中
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            //如果集合还未处于扩容状态中,则进入扩容方法,并首先初始化 nextTab 数组,也就是新数组
            //(rs << RESIZE_STAMP_SHIFT) + 2 为首个扩容线程所设置的特定值,后面扩容时会根据线程是否为这个值来确定是否为最后一个线程
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

helpTransfer

在put方法中进行状态判断时发现当前hash表正在扩容(链表头节点的hash值为MOVED. else if ((fh = f.hash) == MOVED) ),则进入负责扩容函数helpTransfer,具体操作如下

  1. 判断当前是否还处于扩容状态 f instanceof ForwardingNode
  2. 判断扩容是否结束或者并发扩容线程数是否已达最大值,如果是的话直接结束while循环
  3. 协助进入扩容
//扩容状态下其他线程对集合进行插入、修改、删除、合并、compute等操作时遇到 ForwardingNode 节点会调用该帮助扩容方法 (ForwardingNode 后面介绍)
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        //判断扩容是否结束或者并发扩容线程数是否已达最大值,如果是的话直接结束while循环
        while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
扩容

进行实际扩容操作的函数是 transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) ,而进入这个扩容函数的途径有3种

  1. addCount中发现达到扩容阈值
  2. Hash表处于扩容状态时其他线程对hash进行增删改等操作是遇到ForwardingNode 节点
  3. 将链表转化为红黑树时发现数组长度小于64

扩容函数中具体的操作如下:

  1. 计算步长stride,也就是每个线程单次需要处理Node数组中Node节点个数

    如果只有一个CPU则步长就整个数组长度

    若多个CPU则每个线程处理 (n >>> 3) / NCPU个Node节点,但是最小不能小于16个节点

  2. 创建新数组,长度为旧数组的2倍(只有第一个开启扩容的线程(nextTab == null)才会执行)

  3. 第一个for循环为当前线程分配节点长度为步长的待处理节点区间

  4. 遍历分配到的待处理节点,将表头节点上锁

  5. 判断链表时红黑树节点还是链表节点

    链表节点:

    1. 找到lastRun节点直接复用

    2. 遍历链表节点直到lastRun节点结束,将每个节点的hash值与于旧数组长度相&,将结果为1和0的拆分到两个不同的链表中,最后插入到新数组对应位置

      为什么将每个节点的hash值与于旧数组长度相&?

      举个例子就理解了:

      原来数组长度为16,扩容后数组长度为32

      原来数组下标为1处的链表节点的hash值最后5位可能是00001,也可能是10001,他们与旧数组的掩码1111进行&操作后结果都是1所以都放在了下标为1的位置

      新数组的掩码为11111,所以要求他们在新数组的位置可以使用hash值与新数组的掩码进行&操作,但是由于之前他们之前在旧数组中位置1已经确定了他们的低4位大小为1,所以只需要确定第5位进行,将第5位为1的就放到新数组下标17的位置,第5位为0的就还是放到新数组下标1的位置

    红黑树节点

    1. 找到lastRun节点直接复用
    2. 使用for循环以链表方式遍历整棵红黑树,依然将节点拆分为高位和低位两个链表
    3. 判断高位和低位两个链表的长度是否小于UNTREEIFY_THRESHOLD,如果小于就在新数组中转化为链表,如果不小于则还是以红黑树结构保存
  6. 在处理完自己分配到的节点区间后会判断自己是不是最后一个处理扩容的线程

    • 如果是则会再将整个旧数组遍历一遍,避免发生遗漏,完成后将table指向新数组,阈值sizeCtl改成新数组的075倍
    • 如果不是则将直接退出(如果扩容未结束还会通过addCount等函数的for循环再次进入扩容函数协助扩容)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //计算每条线程处理的桶个数,每条线程处理的桶数量一样,如果CPU为单核,则使用一条线程处理所有桶
    //每条线程至少处理16个桶,如果计算出来的结果少于16,则一条线程处理16个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // 初始化新数组(原数组长度的2倍)
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //将 transferIndex 指向最右边的桶,也就是数组索引下标最大的位置
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //新建一个占位对象,该占位对象的 hash 值为 -1 该占位对象存在时表示集合正在扩容状态,key、value、next 属性均为 null ,nextTable 属性指向扩容后的数组
    //该占位对象主要有两个用途:
    //   1、占位作用,用于标识数组该位置的桶已经迁移完毕,处于扩容中的状态。
    //   2、作为一个转发的作用,扩容期间如果遇到查询操作,遇到转发节点,会把该查询操作转发到新的数组上去,不会阻塞查询操作。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
    boolean advance = true;
    //该标识用于控制扩容何时结束,该标识还有一个用途是最后一个扩容线程会负责重新检查一遍数组查看是否有遗漏的桶
    boolean finishing = false; // to ensure sweep before committing nextTab
    //这个循环用于处理一个 stride 长度的任务,i 后面会被赋值为该 stride 内最大的下标,而 bound 后面会被赋值为该 stride 内最小的下标
    //通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的迁移任务
    //结束这次的任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            //每处理完一个hash桶就将 bound 进行减 1 操作
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                //transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有了待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
                i = -1;
                advance = false;
            }
            //只有首次进入for循环才会进入这个判断里面去,设置 bound 和 i 的值,也就是领取到的迁移任务的数组区间
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            //每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 成立,说明该线程不是扩容大军里面的最后一条线程,直接return回到上层while循环
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 说明这条线程是最后一条扩容线程
                //之所以能用这个来判断是否是最后一条线程,因为第一条扩容线程进行了如下操作:
                //    U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
                //除了修改结束标识之外,还得设置 i = n; 以便重新检查一遍数组,防止有遗漏未成功迁移的桶
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            //遇到数组上空的位置直接放置一个占位对象,以便查询操作的转发和标识当前处于扩容状态
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            //数组上遇到hash值为MOVED,也就是 -1 的位置,说明该位置已经被其他线程迁移过了,将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    //该节点为链表结构
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        //遍历整条链表,找出 lastRun 节点
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //根据 lastRun 节点的高位标识(0 或 1),首先将 lastRun设置为 ln 或者 hn 链的末尾部分节点,后续的节点使用头插法拼接
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        //使用高位和低位两条链表进行迁移,使用头插法拼接链表
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
                        //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
                        //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
                        setTabAt(nextTab, i, ln);
                        //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
                        setTabAt(nextTab, i + n, hn);
                        //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
                        setTabAt(tab, i, fwd);
                        //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
                        advance = true;
                    }
                    //该节点为红黑树结构
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        //lo 为低位链表头结点,loTail 为低位链表尾结点,hi 和 hiTail 为高位链表头尾结点
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        //同样也是使用高位和低位两条链表进行迁移
                        //使用for循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            //这里面形成的是以 TreeNode 为节点的链表
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //形成中间链表后会先判断是否需要转换为红黑树:
                        //1、如果符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
                        //2、如果不符合条件则将 TreeNode 转换为普通的 Node 节点,再将该普通链表设置到新数组中去
                        //(hc != 0) ? new TreeBin<K,V>(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
                        //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
                        //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
                        setTabAt(nextTab, i, ln);
                        //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
                        setTabAt(nextTab, i + n, hn);
                        //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
                        setTabAt(tab, i, fwd);
                        //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
                        advance = true;
                    }
                }
            }
        }
    }
}
get

get方法因为不涉及到对hash表的修改,所以也不需要上锁

  1. 通过hash值找到Node数组对应位置的表头节点
  2. 查看当前状态是否在扩容,而且当前链表已经扩容完成,表头节点为ForwardingNode,如果是则进入新数组中找我们需要的元素
  3. 通过遍历链表寻找需要的节点,如果找到则返回value,没找到则返回null
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断头节点是否就是我们需要的节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 如果头节点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树
        else if (eh < 0)
            // 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
            return (p = e.find(h, key)) != null ? p.val : null;
​
        // 遍历链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
优点
  1. 采用Node数组+链表+红黑树,锁的粒度更小,同时并发度可以随着扩容而增加
  2. 通过锁Node节点保证同时只有一个线程操作链表,保证线程安全
  3. 通过多个线程并发协助扩容,大幅度提高了扩容的效率

特别感谢

Java全栈知识体系-ConcurrentHashMap详解

ConcurrentHashMap1.8 - 扩容详解

JavaGuide-ConcurrentHashMap 源码分析