likes
comments
collection
share

ConcurrentHashMap在Java8中的优化与改进

作者站长头像
站长
· 阅读数 19

ConcurrentHashMap是一个高效的并发哈希表,是Java并发编程中常用的数据结构之一。它通过使用锁分段技术,将整个哈希表分成多个小的段(Segment),每个段都维护一个小的哈希表,以减小锁的粒度,提高并发性能。在Java 8之前,ConcurrentHashMap采用的是分段锁(Segment),而在Java 8中,它引入了一种新的并发哈希表实现方式,即使用CAS和synchronized来实现并发控制。

在本文中,我们将深入解析ConcurrentHashMap的源码,探讨它是如何实现高效的并发操作的。

数据结构

ConcurrentHashMap的内部数据结构由若干个Segment组成,每个Segment都是一个独立的哈希表,用于存储实际的键值对数据。每个Segment都包含一个HashEntry数组和一个SegmentLock对象,HashEntry数组用于存储键值对,SegmentLock用于实现Segment级别的锁。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    transient volatile HashEntry<K,V>[] table;
    transient int count;
    transient int modCount;
    transient int threshold;
    final float loadFactor;
    ...
}

其中,HashEntry是一个静态内部类,用于表示哈希表中的一个键值对,包含一个键(key)、一个值(value)和一个next指针(用于解决哈希冲突)

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
    ...
}

并发控制

ConcurrentHashMap采用的是锁分段技术,它将整个哈希表分成多个小的段,每个段都维护一个小的哈希表,以减小锁的粒度,提高并发性能。每个Segment都包含一个独立的锁,这个锁只会锁住当前Segment,而不会锁住整个哈希表。

ConcurrentHashMap的操作是非阻塞的,它采用了CAS(Compare-And-Swap)和synchronized等机制来实现并发控制。具体来说,它使用了put、get、remove等操作,这些操作都是非阻塞的,它们并不会锁住整个哈希表,而只会锁住当前操作所在的Segment。

在put操作中,ConcurrentHashMap会先计算key的哈希值,然后找到对应的Segment,再对这个Segment加锁。如果Segment中不存在对应的key,那么就直接在Segment中添加一个新的HashEntry。如果Segment中已经存在对应的key,那么就更新这个key对应的value值。在更新value值时,ConcurrentHashMap会使用CAS来实现非阻塞的并发控制。如果CAS失败,说明有其他线程已经修改了这个key对应的value值,那么就需要重试。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = hash(key.hashCode());
    int segmentIndex = getSegmentIndex(hash);
    return segments[segmentIndex].put(key, hash, value, onlyIfAbsent);
}

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    if (count++ >= threshold) {
        // 如果Segment中的元素数量超过了阈值,就需要进行扩容
        // 扩容时会对所有的Segment进行加锁,但只有在Segment中添加元素时才会锁住当前Segment
        resize();
        segmentIndex = getSegmentIndex(hash);
    }
    return segmentFor(hash).put(key, hash, value, onlyIfAbsent);
}

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先尝试通过CAS来插入新节点
    HashEntry<K,V> node = tryLockFreeInsert(key, hash, value);
    if (node != null) {
        return null;
    }
    // 如果CAS失败,说明当前Segment中已经存在对应的key,需要通过加锁来更新value值
    return lockAndPut(key, hash, value, onlyIfAbsent);
}

HashEntry<K,V> tryLockFreeInsert(K key, int hash, V value) {
    // 先计算key在Segment中的索引值
    int index = (hash >>> segmentShift) & segmentMask;
    // 获取Segment中的HashEntry数组
    HashEntry<K,V>[] tab = table;
    // 获取当前索引位置的HashEntry节点
    HashEntry<K,V> first = tab[index];
    // 遍历链表,查找是否已经存在对应的key
    for (HashEntry<K,V> e = first; e != null; e = e.next) {
        K k = e.key;
        if (e.hash == hash && k != null && key.equals(k)) {
            // 如果已经存在对应的key,返回该节点
            return e;
        }
    }
    // 如果不存在对应的key,就使用CAS创建一个新的节点
    HashEntry<K,V> node = new HashEntry<K,V>(hash, key, value, first);
    while (!casTabAt(tab, index, first, node)) {
        // 如果CAS失败,说明有其他线程已经插入了新节点,需要重新获取当前索引位置的节点并重新遍历链表
        first = tab[index];
        for (HashEntry<K,V> e = first; e != null; e = e.next) {
            K k = e.key;
            if (e.hash == hash && k != null && key.equals(k)) {
                // 如果已经存在对应的key,返回该节点
                return e;
            }
        }
    }
    // 如果CAS成功,说明新节点已经成功插入,返回null
    return null;
}

V lockAndPut(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        // 获取当前索引位置的节点
        HashEntry<K, V> first = getFirst(hash); 
        HashEntry<K,V> e = first; // 遍历链表,查找是否已经存在对应的key 
        while (e != null) { 
            K k = e.key; 
            if (e.hash == hash && k != null && key.equals(k)) {
                V oldValue = e.value; 
                if (!onlyIfAbsent || oldValue == null) { 
                // 如果只有onlyIfAbsent为false或者oldValue为null,就将当前节点的value值更新为新值 
                e.value = value; 
                return oldValue; 
            } else { 
            // 如果onlyIfAbsent为true且oldValue不为null,说明已经存在对应的key,返回oldValue 
            return oldValue; 
            } 
          } 
          e = e.next; 
      } 
      // 如果不存在对应的key,就创建一个新节点,并将其添加到链表的头部 
      modCount++; 
      HashEntry<K,V> newEntry = new HashEntry<K,V>(hash, key, value, first); 
      setFirst(hash, newEntry); 
      count++; 
      return null; 
   } finally { 
   // 解锁 
   unlock(); 
   } 
}

ConcurrentHashMap也提供了一些方法,比如keySet()和values()等方法,这些方法都是在获取Segment的情况下实现的。

public Set<K> keySet() {
    Set<K> ks = keySet;
    if (ks != null) {
        return ks;
    }
    ks = new KeySet();
    keySet = ks;
    return ks;
}

final class KeySet extends AbstractSet<K> {
    public Iterator<K> iterator() {
        return new KeyIterator();
    }
    public int size() {
        return ConcurrentHashMap.this.size();
    }
    public boolean isEmpty() {
        return ConcurrentHashMap.this.isEmpty();
    }
    public boolean contains(Object o) {
        return ConcurrentHashMap.this.containsKey(o);
    }
    public boolean remove(Object o) {
        return ConcurrentHashMap.this.remove(o) != null;
    }
    public void clear() {
        ConcurrentHashMap.this.clear();
    }
}

总体来说,ConcurrentHashMap的源码实现相对比较复杂,但是它提供了高效的并发控制机制,能够保证在高并发的情况下,对于同一个key的操作不会发生冲突,并且具有很高的性能表现。

转载自:https://juejin.cn/post/7222814285645889597
评论
请登录