likes
comments
collection
share

ConcurrentHashMap源码解析

作者站长头像
站长
· 阅读数 19

本文主要内容

  • ConcurrentHashMap介绍
  • ConcurrentHashMap初始化
  • ConcurrentHashMap存储流程
  • ConcurrentHashMap取出流程
  • 总结

1、ConcurrentHashMap介绍

关于Java集合类,已经介绍过很多了,今天介绍完ConcurrentHashMap后,就暂时先告一段落。

ConcurrentHashMap,顾名思义,它也是map接口的实现者,从Concurrent一词来看,并发的,它是线程安全的。Hashtable也是线程安全的map接口实现类,这二者有什么不同吗?

Hashtable源码解析 中阐述过Hashtable线程安全的原理,它使用 synchronized 关键字实现线程安全,比如 get 方法和 put 方法:

public synchronized V get(Object key) {}

public synchronized V put(K key, V value) {}

注意到,synchronized 关键字加在非静态方法上,说明同步锁对象即是 Hashtable 对象本身,只有一个锁。

在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

如果说HashTable 只有一个锁,那么 ConcurrentHashMap 则开创性的使用了锁分段技术。

假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术

ConcurrentHashMap源码解析

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment继承可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

2、ConcurrentHashMap初始化

先来查看下ConcurrentHashMap中的重要成员变量。

//每个Segment都是一个ReentrantLock锁,同时它内部保存着一个HashEntry数组
final Segment<K,V>[] segments;
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * Sets next field with volatile write semantics.  (See above
     * about use of putOrderedObject.)
     * 设置next,注意unsafe的使用,ConcurrentHashMap中很多这种操作
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            //计算 nextOffset ,以使用设置next
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

HashEntry比较简单,只是内部使用了Unsafe来设置next成员变量,不清楚Unsafe使用的,可以查看本人之前的文章,说说Java的Unsafe类

接下来我们看看ConcurrentHashMap的构造方法。

//DEFAULT_INITIAL_CAPACITY和DEFAULT_CONCURRENCY_LEVEL一样,都是常量,16,而DEFAULT_LOAD_FACTOR为0.75
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//为简单分析,我们假定这些参数全是默认值,16,0.75,16
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    //当concurrencyLevel值为16时,最终sshift 将为4,而ssize 值等于16
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    //根据上边计算结果,segmentShift 等于28,而segmentMask 为15
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //因为c值为1,所以cap值就是它的默认值,MIN_SEGMENT_TABLE_CAPACITY,为2
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    //创建名为s0的Segment,其中包含一个长度为2的HashEntry数组
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    //创建长度为16的Segment数组,并且将数组的第一个元素置为s0
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

为分析更简单,我们设ConcurrentHashMap构造函数的各参数为默认值,最后将创建一个长度为16的Segment数组,并且将它赋值给ConcurrentHashMap的成员变量segments 。同时创建一个名为s0的Segment,内部有一个长度为2的HashEntry数组,并且将s0作为segments 第一个元素。

这里边有些参数可以记下来,等会分析的时候有用。

3、ConcurrentHashMap存储流程

直接查看它的put方法:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    //计算key的哈希
    int hash = hash(key.hashCode());
    //根据hash值计算此键值对应该插入到segments数组中的哪个Segment当中
    //在构造方法中,计算segmentShift等于28,segmentMask等于15,
    //所以,j 就等于hash值的高4位
    int j = (hash >>> segmentShift) & segmentMask;
    //使用Unsafe获取segments中索引为 j 的元素。
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        //如果segments数组中,索引 j 上的元素为null,则先初始化此元素
        s = ensureSegment(j);
    //最终调用Segment的put方法
    return s.put(key, hash, value, false);
}

在ConcurrentHashMap初始化章节中,我们分析到,构造函数只是初始化了segments数组的第一个元素,所以其它元素可能为null,如果需要往某个元素上存储数据,必然先要将其初始化,我们查看 ensureSegment 方法

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    //计算索引为k的元素的内存偏移值u,可根据u,利用Unsafe操作此元素
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    //如果索引为k的元素为null
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //将ss[0]赋值给proto ,
        //注意它的命名,将ss[0]当成原型,本以为只能在js中看到原型这个概念,没想到啊。。。
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        //根据proto的HashEntry数组长度,重新构造一个一模一样长度的数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        //重新检查索引为k上的元素是否为空,如果为空,则赋值
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            //依然是重新检查,为了应用多线程的环境,注意到它使用了getObjectVolatile方法
            //如果其它线程初始化此元素成功了,因为是volatile,所以此线程也是可见的
            //这在一定程度上保证了此元素不会被重复赋值
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                //在while循环中,尝试使用CAS这种原子操作实现数组赋值操作,这一定是线程安全的
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

ensureSegment 使用 ss[0] 作为原型对象,重新构造一个新的Segment ,并且最后使用CAS原子操作为数组赋值,确定线程安全。值得一提的是,在检测数组元素是否为null时,使用了getObjectVolatile方法,Volatile将保证线程修改数组后对其它线程的可见性,一定程度为能够确保线程安全。

我们继续查看Segment内部的put方法。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        //因为Segment继承自ReentrantLock,tryLock尝试获取锁
        //如果没拿到锁,也不会导致阻塞,则执行scanAndLockForPut方法
        //scanAndLockForPut将尝试获取到锁,并且返回应该插入的node,在获取一定次数后仍失败后,将阻塞线程
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            //根据hash计算键值对应该存储在HashEntry数组的哪个位置上
            int index = (tab.length - 1) & hash;
            //获取相应索引位上的第1个元素,因为每个索引位置上理论上都是一个链表
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    //第一次循环时e就是first,如果e不为空并且key相等则更新value值
                    //否则将e赋值为它的next
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    //当e为null时,node是scanAndLockForPut函数返回的要播放的entry
                    //如果node不为null,将node放置在链表头,设置它的next为first
                    if (node != null)
                        node.setNext(first);
                    else
                        //node为null时也是放置在链表头,它的next也为first,通过构造方法实现
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    //如果c大于数组长度警戒值,则数组扩容,反之设置node值即可。
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            //最后,释放锁
            unlock();
        }
        return oldValue;
    }

刚刚开始看此方法时,会很纳闷,哪里线程安全了,加锁在啥地方?都没看到。其实Segment继承自ReentrantLock,它本身就是一个锁对象。在刚开始的时候,它会尝试性地去获取锁,调用 tryLock 方法,此方法也可以获取到锁,但如果没获取到也不会导致线程阻塞而已。如果没有获取到,则调用 scanAndLockForPut 方法,此方法返回一个要插入的node,并且阻塞线程。之后的流程则比较简单了,如果其它线程释放锁了,当此线程抢到锁以后,则可以继续执行,后续的逻辑较为简单了,通过死循环,找到要插入node的位置,插入即可,同时size增大,如果数组长度大于警戒值,则数组扩容。

现在我们来看看 scanAndLockForPut 方法:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        //获取hash对应索引位置上的第一个元素值
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        //retries 某种情况下代表着尝试获取锁的次数,初值为-1
        int retries = -1; // negative while locating node
        //通过循环,不停地获取锁,注意 tryLock 不会导致线程阻塞
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            //因为是一个循环,如果first为null或者e不为null,执行了else的情况,e总有一个时候为null
            //当e为null时,并且retries 为负数时,构造出要返回的node并且将retries 置为0
            if (retries < 0) {
                if (e == null) {
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            //当retries不为负责时,它才真正代表着请求锁的次数,每请求锁一次就自增1
            //如果请求次数大于常量的话,调用lock方法,将真正阻塞线程
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            //如果first值已经被其它线程所改变时,重新获取first值并将retries 设为-1
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }

scanAndLockForPut 方法,就是不停地请求锁,同时构造出一个要插入HashEntry数组的node出来,如果请求锁失败达到一定次数,就调用lock方法,这种请求锁的方式会令线程阻塞,等待其它线程释放锁。

存储流程讲到这里就差不多了,还有一些方法,比如rehash,有兴趣的同学们可以自己看看,都差不多,并且已经也讲过HashMap方法的rehash方法了,虽然HashMap非线程安全,但原理类似。

4、ConcurrentHashMap取出流程

直接看ConcurrentHashMap的get方法:

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    //计算key的hash值
    int h = hash(key.hashCode());
    //根据hash值计算它是由哪个Segment所持有,再从此Segment的HashEntry数组中寻找
    //本文中有太多这样的Unsafe相关操作了,之前已经讲过Unsafe了,此处不再说明
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        //如果Segment不为null,并且内部的HashEntry数组也不为null
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
             //在循环中,查看key是否相等,如果相等则返回
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get方法比较简单,但肯定有同学会提出疑问,get方法没有保证线程安全,没有加锁。get方法获取元素的值通过getObjectVolatile方法,getObjectVolatile方法与volatile关键字类似,volatile关键字有两个作用:

  • 保证此变量对所有线程的可见性,这里的可见性是指当一条线程修改了这个变量的值,新值对于其它线程立即可见
  • 禁止指令重排序优化,意思就是代码执行顺序和代码本身顺序一致

volatile关键字在只有单一线程能改变元素的值的情况下非常适用,如果是这种情况,它就是线程安全的,因为单一线程改变元素的值后,其它线程立即可见,读取的时候肯定也是最新的,所以 get 方法仍然是线程安全的。不过,volatile关键字,在多个线程都能改变元素值的情况下,就无法保证线程安全了,更多volatile相关的内容,可参考Java内存模型与线程一文。

回忆前一章阐述的,put方法,必须要获得锁才能更改元素内容,所以,在同一时刻,必然只有一个线程能改变元素内容。

5、总结

ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。

总之,ConcurrentHashMap真是一种很特殊的容器,它提出了一种令人眼前一亮的线程安全的方案,读完它的代码,长见识了

转载自:https://juejin.cn/post/7225111825066147895
评论
请登录