面试官:concurrentHashMap读取数据需要加锁么?万字详解ConcurrentHashMap整理concur
介绍
平常在需要使用一些简单的本地缓存时会用到ConcurrentHashMap,但一直没深入了解过它的原理,之前面试时也有一定几率碰到,回答的总是一知半解的,最近深入的了解一下ConcurrentHashMap的原理,把它完整的梳理成一篇文章。 文章会从查询、存放、扩容、计数四个方面来深入解析ConcurrentHashMap,我先抛出几个问题,大家可以先思考一下,带着问题来阅读这样可以更好帮助大家理解concurrentHashMap,如果都了解那么大神可以出门右拐了;如果不了解可以详细看一下本篇文章,看完本章内容详细你能完整的回答全部问题,如果只想知道答案的,可以直接下滑到底部
- concurrentHashMap的查询过程是什么样的?
- concurrentHashMap的散列算法和hashMap的有什么区别?
- concurrentHashMap查询时会加锁么?
- concurrentHashMap的K/V允许为空么?为什么?
- concurrentHashMap什么时候初始化?
- concurrentHashMap哪些时机会进行扩容?
- concurrentHashMap如何实现并发扩容的?
- concurrentHashMap的
size
方法是准确的么? - concurrentHashMap中计数的桶是如何扩容的?
- concurrentHashMap同一个桶中,链表和红黑树可以共存么?
使用
平时在使用concurrentHashMap时,最常用的就是获取值,存放值以及查询存放了多少个值。代码如下:
// 新建ConcurrentHashMap
ConcurrentHashMap<String,String> map = new ConcurrentHashMap();
// 获取值
map.get("key");
// 存入值
map.put("key","value");
map.putIfAbsent("key","value");
// 获取map中的数量
map.size();
源码
属性
这些是concurrentHashMap内部的属性,为了更好的理解源码,我先贴出来,大家可以先熟悉一下。
// 最大长度
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
// 扩容操作时对低位进行操作时应用
private static int RESIZE_STAMP_BITS = 16;
// 扩容时,每个线程每次迁移的最小步长
private static final int MIN_TRANSFER_STRIDE = 16;
// 老数组的下标
private transient volatile int transferIndex;
// 扩容时最大值
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容操作时对高位进行操作时应用
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 默认转换容量
static final int TREEIFY_THRESHOLD = 8;
// 最小转树容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 当前位置已经被扩容时迁移走了
static final int MOVED = -1; // hash for forwarding nodes
// 当前位置是一个树
static final int TREEBIN = -2; // hash for roots of trees
// 当前位置被占用,还未赋值
static final int RESERVED = -3; // hash for transient reservations
// int的最大值,Integer.MAX_VALUE
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// concurrentHashMap维护的数组
transient volatile Node<K,V>[] table;
// 扩容时的数组
private transient volatile Node<K,V>[] nextTable;
// 计数属性
private transient volatile long baseCount;
// cpu内核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 标识数组初始化和扩容的标识信息
// 等于 -1 代表正在初始化
// 小于 -1 代表正在扩容
// 等于 0 代表没有初始化
// 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值
private transient volatile int sizeCtl;
查询值
get 方法
对于查询流程:
- 调用spread方法对key进行散列运算,获取key的散列值;
- 先判断是否为空,并且该key对应下标的位置是否为空,如果都为空,则该值不存在;
- 再判断要查询的key是否在数组上,如果在则直接返回数组上的值;
- 如果不在则判断对应的节点的hash是否小于0;下面会有很多地方用到该数值,请大家务必记住,后面也会反复提示,小于零的含义分别是: // 当前位置已经被扩容时迁移走了 static final int MOVED = -1; // 当前位置是一棵树 static final int TREEBIN = -2; // 当前位置被占用,还未赋值 static final int RESERVED = -3;
- 因为不同的情况封装为不同的node类型,下面会详解各种类型的find方法;
- 如果上述条件都不满足,则该key的值在链表中,从链表中查询数据。
具体代码以及注释如下:
public V get(Object key) {
// tab:数组
// e : 要查询的节点
// n : 数组长度
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获取hash值
int h = spread(key.hashCode());
// 数组不为null,数组上有数据,拿到数组指定位置上的数据
if ((tab = table) != null &&
(n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 该key的hash 与 数组该位置上数据的hash相当
if ((eh = e.hash) == h) {
// 判断两个key的值相等,直接返回数组中该数据的val
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果eh小于0,这里有特殊情况
// MOVED、TREEBIN、RESERVED 上述属性有注释
else if (eh < 0)
// 三种情况的find方法
return (p = e.find(h, key)) != null ? p.val : null;
// 是链表,遍历链表,找到符合的数据并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
// 如果都没有则不存在。
return null;
}
看完这里就简单的回答第一个问题了,但此时回答还不够详细,例如散列算法、是否加锁等内容还没解释。
spread 散列方法
散列算法与hashMap类似,但多了一步和Integer.MAX_VALUE进行与运算,保证得到一个正整数的结果。因为负数有特殊的含义:
- static final int MOVED = -1; // 当前位置已经被扩容时迁移走了
- static final int TREEBIN = -2; // 当前位置是一棵树
- static final int RESERVED = -3; // 当前位置被占用,还未赋值
这三个值在源码中会非常高频的出现,文中会反复强调。
static final int spread(int h) {
// (h ^ (h >>> 16)) 高位右移动16位后进行异或运算,为了让高位和低位都能参与运算。
// 这里和hashMap是中的散列值计算是完全相同的。而因为负数有其他含义,这里需要保证为正数
// 前面的计算结果 与 HASH_BITS 是为了保证返回的结果是正数。
return (h ^ (h >>> 16)) & HASH_BITS;
}
看完这里可以回答第二个问题,concurrentHashMap的散列算法比HashMap多了一步需要保证为正数,因为负数有其他的含义。
迁移(ForwardingNode) find 方法
当该节点的hash为MOVED=-1
说明该节点已经被迁移走了,要查询数据时需要去新数组上查询数据。
- 新数组、新数组中该下标位置的节点都为空,说明不存在该key,直接返回null
- 死循环查找,类似
get
方法,如果直接查询到了就返回结果 - 如果找到节点的hash还是小于0的,说明新数组中也出现了这三种情况(迁移走、树、还未赋值)
- 既不在数组上,又不是负数,那么只能是链表了,从链表中查询。
下面是具体代码,我在代码上每一行都写上注释了。
Node<K,V> find(int h, Object k) {
// tab : 新数组
outer: for (Node<K,V>[] tab = nextTable;;) {
// n:新数组长度
// e:新数组中的节点
Node<K,V> e; int n;
// 判断数组和k都不为空
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
// 死循环
for (;;) {
// eh: 节点hash
// ek: 节点的key
int eh; K ek;
// 如果能找到直接返回
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// 当hash小于0,说明还有特殊情况,下面和get的方法类似
if (eh < 0) {
// 继续套娃
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
// 说明是链表,e.next 赋值给 e
if ((e = e.next) == null)
return null;
}
}
}
占用的(ReservationNode) find 方法
ReservationNode是被占用但还未赋值的实现,因为没被赋值呢,直接返回空即可。
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}
// 直接返回null
Node<K,V> find(int h, Object k) {
return null;
}
}
(TreeBin) find方法
这里是TreeBin的实现,大家都知道当数组大于64并且链表长度超过8会被转换成红黑树。在源码中不仅仅会被转成红黑树还维护着一个双向链表,后续会详细讲解。查询过程:
- 拿到双向链表的头节点;
- 判断当前桶的状态,是否有写锁或等待锁;
- 如果有则判断该节点的key是否就是要查询的key,是直接返回,不是的话往下遍历一个节点;
- 使用cas尝试设置读锁,设置成功则从红黑树中查询当前key
- 判断自己为最后一个释放读锁的线程并且有其他线程正在请求写锁,唤醒请求写锁的线程。
//双向链表
TreeNode<K,V> root;
// 红黑树
volatile TreeNode<K,V> first;
volatile Thread waiter;
// 锁状态
volatile int lockState;
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
final Node<K,V> find(int h, Object k) {
if (k != null) {
// e : 双向链表
for (Node<K,V> e = first; e != null; ) {
// s : 锁的状态
// ek : 节点的key
int s; K ek;
// 判断当前节点是否有写锁或等待写锁
if (((s = lockState) & (WAITER|WRITER)) != 0) {
// 如果有的话,直接从双向链表中查询数据
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
// 没有写锁或者等待写锁,则直接用case的方式设置一个读锁
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
// 在红黑树中检索
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
// 判断自己为最后一个释放读锁的线程,并且有写线程正在请求锁资源
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
// 唤醒写线程
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
看完这里就可以回答第三个问题,get的时候也可能会加一把读锁。
存储值
对于存放值比较常用的三个方法put、putIfAbsent、putAll
,通过下面的代码可以看到他们三个都会调用putVal
put方法
// 直接把key的值变更为value
public V put(K key, V value) {
// 这里直接调用的是 putValue,后面跟了一个boolean类型的参数,
// 它的含义是:true:使用原有值,false:覆盖原有值
return putVal(key, value, false);
}
// 如果key以前有对应的值,则不赋值;如果没有,则赋值。
public V putIfAbsent(K key, V value) {
return putVal(key, value, true);
}
// 塞入一个map
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
putVal
下面是真实的添加值,由于该方法比较长,大家可以分块看,我在代码里对每一块进行了分割,它主要做了以下几步动作:
- 通过
spread
方法拿到散列值,该方法上面已经解释过了; - 懒加载,调用initTable方法初始化数组;
- 因为上层是一个for循环,初始化成功后再次循环就进入到第二块,采用cas方法设置node值,如果成功则跳出循环,失败的继续往下走;
- 判断当前是否为MOVE状态,这里前面有反复提到,此时的状态为该桶已经被迁移走了,调用helpTransfer帮助扩容。扩容后面会详细解释;
- 接下来还会根据
onlyIfAbsent
判断当前值是否存在,因为onlyIfAbsent为true是不对旧值进行覆盖的,直接返回即可。这行代码是新版本优化的代码; - 上述经历了,cas尝试存放节点、判断是否被迁移以及是否已经存在该key的节点都失败,此时说明该桶有hash冲突,采用
synchronized
进行加锁操作;此时又有一个DCL检测;判断当前桶的状态是否大于0,大于0说明是正常的节点,遍历当前链表把节点放到链表尾部; - 前面判断了hash值大于0,当hash值等于
TREEBIN(-3)
说明为红黑树,根据红黑树的逻辑进行添加; - 数据添加完成,判断链表是否大于
TREEIFY_THRESHOLD(8)
,调用treeifyBin尝试转为红黑树; - 如果是覆盖原有key的值,则直接返回
- 进行addCount计数
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果key value为空则直接抛出空指针。
if (key == null || value == null) throw new NullPointerException();
// 获取key的hash,散列算法。这里看下 ### spread 散列方法
int hash = spread(key.hashCode());
// 记录链表长度
int binCount = 0;
--------------------------------------- 第一块 --------------------------------
// 获取成员变量数组
for (Node<K,V>[] tab = table;;) {
// f:i位置的数据
// n:数组长度
// i:索引位置
// fh: key的hash
Node<K,V> f; int n, i, fh; K fk; V fv;
// tab==null 代表数据还未初始化入。(第一次put的第一次循环会走该条件进行初始化)
if (tab == null || (n = tab.length) == 0)
// 初始化数组。 这里看下 <a herf='#zz'> ### initTable 初始化数组 </a>
tab = initTable();
--------------------------------------- 第二块 --------------------------------
// i : 数组长度 & hash 求当前key的下标位置。
// f : tabAt(数组,索引位置),获取数组对应位置的数据。
// 当前位置数据为空
// 初始化结束后,在正常情况下第二次循环就会走该分支逻辑
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 把key value封装成Node对象,并以CAS方式放到该数组的i位置中
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
// 成功就跳出循环
break;
}
--------------------------------------- 第三块 --------------------------------
// 说明当前位置数据已经被迁移到新数组
else if ((fh = f.hash) == MOVED)
// 帮助扩容。
tab = helpTransfer(tab, f);
// 在jdk8中并没有这行判断,该行代码属于后面版本的优化,它的作用是如果不需要覆盖原有值
// 并且该值在concurrentHashMap中,我直接返回即可,可以省去加锁put等操作。
else if (onlyIfAbsent
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
--------------------------------------- 第四块 --------------------------------
// 该分支代表着,该下标位置有数据,需要进行加锁添加数据
else {
// 声明数据,返回结果
V oldVal = null;
// 锁住当前桶位置的数据,优化了1.8之前的锁粒度。
synchronized (f) {
//DCL 再次判断数据无变化
if (tabAt(tab, i) == f) {
// 当fh大于等于0,说明是链表添加。前面属性有解释TREEBIN=-2为红黑树
if (fh >= 0) {
// 记录链表长度
binCount = 1;
// e:指向数组位置数据
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 拿到当前key的hash,和数组位置数据的hash进行比较
// 如果两个key相等 返回ture
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 赋值老数据
oldVal = e.val;
// 判断是否需要覆盖
if (!onlyIfAbsent)
// 可以覆盖,直接赋新的值。
e.val = value;
break;
}
// 两个key不等,说明是新数据,暂存当前下标数据
Node<K,V> pred = e;
// e 指向下一个节点,如果e == null,说明下面没节点了
if ((e = e.next) == null) {
// 将当前key、value封装为Node,挂在最后一个节点后面
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 红黑树添加
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 这里是红黑树添加逻辑。
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// 在新版本的jdk中还 f 增加了预留类型,这里不做介绍。
}
}
--------------------------------------- 第五块 --------------------------------
// 如果链表计数不为0
if (binCount != 0) {
// 链表计数大于等于8
if (binCount >= TREEIFY_THRESHOLD)
// 转换为红黑树
treeifyBin(tab, i);
if (oldVal != null)
// 返回该key曾经对应的值。
return oldVal;
break;
}
}
}
--------------------------------------- 第五块 --------------------------------
// 进行计数
addCount(1L, binCount);
return null;
}
initTable 初始化数组
这里需要用到属性sizeCtrl,它的数值有以下含义: // 标识数组初始化和扩容的标识信息 // 等于 -1 代表正在初始化 // 小于 -1 代表正在扩容低16位代表当前数组正在扩容的线程个数(如果1个线程扩容,值为-2,如果2个线程扩容,值为-3) // 等于 0 代表没有初始化 // 大于 0 如果数组没有初始化,则代表初始化的长度,如果数组已经初始化了,代表下次扩容的阈值 private transient volatile int sizeCtl;
该方法步骤如下:
- 判断是否当前table为空,
- 判断它的状态,如果小于0说明有人在扩容,让出cpu,让其他线程进行扩容;
- 使用cas尝试更改为-1代表正在初始化;
- 更改成功,采用DCL(双重检查)检查一次是否已经被初始化了,这里的代码要注意,整个JUC包下的源码非常多的地方都采用DCL。开始初始化,这里记住两个数:初始容量为16,扩容阈值(3/4)
- 更改失败,说明已经有其他线程初始化了,直接返回。
代码注释如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 判断数组未初始化
while ((tab = table) == null || tab.length == 0) {
// sizeCtl在属性里有介绍,当它等于-1时代表正在初始化,小于-1时正在扩容,并发情况下,被其他线程初始化完了
if ((sc = sizeCtl) < 0)
// 让出cpu给,让其他线程初始化
Thread.yield(); // lost initialization race; just spin‘
// 使用cas把sizeCtl更改为-1,代表当前线程正在初始化concurrentHashMap
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
// DCL 双重检查
if ((tab = table) == null || tab.length == 0) {
// 判断sizeCtl是否大于0,如果是则取sc,否则取默认值16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 成员变量和局部变量赋值
table = tab = nt;
// 计算下次扩容的阈值,例如默认初始化容量16,下次扩容阈值就是12
sc = n - (n >>> 2);
}
} finally {
// 把扩容阈值赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
treeifyBin 转换树
它是转为树的前置方法:
- 首先判断数组的长度是否小于MIN_TREEIFY_CAPACITY(64),如果是的话,优先扩容,因为数据存放在数组中查询起来性能会更好;
- 该位置不能空,并且它的hash>=0,说明是正常节点,小于的话说明已经被转完树、扩容了;
- 拿到该节点的锁,并且DCL检查,首先组装一个双向链表,再调用
setTabAt
转换红黑树。
判断是否需要转红黑树还是扩容;tab为数组,index为索引下标。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
// 数组不为空
if (tab != null) {
// 如果数组长度小于64时,不转红黑树,优先扩容
// 更希望存放在数组中,这样性能更好一些。
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 扩容
tryPresize(n << 1);
// 如果是负数说明已经转为红黑树或正在扩容
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 以下代码是将单向链表转换为TreeNode(双向链接),再通过TreeBin转换为红黑树
// TreeBin中保留着双向链表和红黑树
synchronized (b) {
// 当前下标数据赋值给b,并且它的hash大于等于零(正常状态),
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历节点 b 中的所有节点
for (Node<K,V> e = b; e != null; e = e.next) {
// 组装为TreeNode(双向链接)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 转换为红黑树
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
扩容
tryPresize尝试扩容
扩容前操作,它有两个入口,putAll
、长度大于8
都会调用该方法;由于该方法也过长,我对代码进行了分割,大家一块一块看。
- MAXIMUM_CAPACITY 数组长度的最大值,如果比它的一半还大,则需要扩容的数组长度为MAXIMUM_CAPACITY,否则就设置为size最接近的2的N次幂的那个值。
- 判断是否没初始化,对数组进行初始化,因为putAll会直接调用这个方法,所以有可能没初始化。
- 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
- 开始扩容,并设置当前扩容线程数量,扩容时sc的低位代表扩容线程数,第一个线程扩容时扩容线程数量+2,后续帮助扩容的线程,扩容线程数都是+1。这里贴的是jdk8的代码,中间有一段代码是bug,后面新版本已经删掉。
// 插入数据的长度
private final void tryPresize(int size) {
// c : 扩容后的数组长度
// 如果size大于数组长度/2 , 直接设置为最大值
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
// 长度设置为2的n次幂
tableSizeFor(size + (size >>> 1) + 1);
// sc : sizeCtl ,-1:初始化, 小于-1:扩容,0:未初始化,大于0:区分是否初始化,代表阈值或下一次扩容数组长度
int sc;
while ((sc = sizeCtl) >= 0) {
// tab:数组,n:数组长度
Node<K,V>[] tab = table; int n;
// DCL 数组未初始化
if (tab == null || (n = tab.length) == 0) {
// 因为未初始化,所以代表初始化数组。由于还未初始化,所以选size和构造方法值中最大的一个。
n = (sc > c) ? sc : c;
// 使用cas设置为-1,代表要进行初始化。
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
// DCL 再次检查
if (table == tab) {
@SuppressWarnings("unchecked")
// 初始化一个数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table
table = nt;
// 阈值设置为3/4
sc = n - (n >>> 2);
}
} finally {
// 赋阈值
sizeCtl = sc;
}
}
}
// 如果要扩容的数值比sc小或者比最大值还大 什么都不做。
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 开始进行扩容
else if (tab == table) {
// 计算扩容标识(基于旧的数组长度计算的扩容标识,因为允许并发扩容)
// 并发扩容时,各个线程都需要从一个长度扩容成另一个长度。
int rs = resizeStamp(n);
// 这里是一个bug,前面判断的是sc>=0,但这里是小于0,是永远进不来的。
// 我看了一下jdk17的源码,这个if已经被删掉。
// 正在扩容 / 初始化(在这个方法中就是正在扩容)
if (sc < 0) {
//
Node<K,V>[] nt;
// 如果扩容标识位不同,说明不是同一次扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
// 判断是否达到最后的检查阶段,(rs << RESIZE_STAMP_SHIFT) + 1
sc == rs + 1 ||
// 这个是判断到达最大值,这里应该是rs再左移16 + MAX_RESIZERS
sc == rs + MAX_RESIZERS ||
// 如果新数组是null,说明扩容完了
(nt = nextTable) == null ||
// 扩容是从后往前扩容,到0说明已经扩容完了
transferIndex <= 0)
break;
// 尝试扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 还没有执行扩容,cas将sizeCtl更改为sc,扩容标识戳左移16为,让它变为负数
// 为什么是低位+2,代表1个线程扩容。 低位为5,就代表4个线程正在并发扩容
// 扩容分为2部:创建新数组,迁移数据。
// 当最后一个线程迁移完毕数据后,对低位-1.最终结果低位还是1,需要对整个老数组再次检查,数据是否迁移干净
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 开始扩容
transfer(tab, null);
}
}
}
// 计算n在二进制表示时,前面有多少个0
static final int resizeStamp(int n) {
// Integer.numberOfLeadingZeros(n) 扩容标识
// 1 << (RESIZE_STAMP_BITS - 1 这一步是为了保证后面运算时,可以得到一个负数。
// 这样sc < -1就代表了正在扩容
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
transfer 扩容
这里就是真正的扩容方法了,这个方法超级长,还是会把该方法进行分块,帮助大家更好分析代码。
- 计算线程当前要迁移桶的数量,这里会根据cpu的数量进行计算,充分利用cpu资源;
- 如果新数组还没初始化,则进行初始化;
- 循环迁移,通过cas尝试领取任务;
- 判断要迁移的节点是否为空或者已经被迁移走了,如果是则下一次循环;
- 锁住当前桶,DCL检测,开始迁移。
- 扩容完成finishing = advance = true;并从头检查一遍。注意:这里的判断 线程数-2==0 时才会走到这一步,说明最后一个线程来检查。
该方法代码特别长,注释标记的比较多,如果有不清楚的地方可以留言讨论。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 创建新数组流程
// n:老数组长度, stride:每个线程每次扩容的步长
int n = tab.length, stride;
// 如果每个线程迁移的长度基于CPU计算,大于16,就采用计算的值,如果小于16,就用16
// 每个线程每次最小迁移16长度数据
// 这个操作就是为了充分发挥CPU性能,因为迁移数据是CPU密集型操作,尽量让并发扩容线程数量不要太大,从而造成CPU的性能都消耗在了切换上,造成扩容效率降低
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
--------------------------------------- 第一块 --------------------------------
// 如果新数组还未初始化
if (nextTab == null) { // initiating
try {
// 初始化数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// oom 或者 数组越界
sizeCtl = Integer.MAX_VALUE;
return;
}
// 成员变量赋值
nextTable = nextTab;
// 设置为老数组的长度
transferIndex = n;
}
--------------------------------------- 第二块 --------------------------------
// nextTale,nextTab:新数组
// nextn:新数组长度
// transferIndex:线程领取任务时的核心属性
int nextn = nextTab.length;
// 声明fwd节点,在老数组迁移数据完成后,将fwd赋值上去
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 领任务的核心标识
boolean advance = true;
// 扩容结束标识
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 领取任务的while循环
while (advance) {
int nextIndex, nextBound;
// 完成了,或者
if (--i >= bound || finishing)
advance = false;
// 老数组下标<=0 说明没有任务可以领取了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// * 这里是通过cas方式给当前线程设置迁移标识,并且领取相应数据。
// 每次领取是从后往前领取stride个下标位置的数据。
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 计算下次领任务的下标
bound = nextBound;
i = nextIndex - 1;
// 跳出循环
advance = false;
}
}
--------------------------------------- 第三块 --------------------------------
// i < 0 迁移完最后一段 || 最后检查阶段 || 理论上不会出现
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 判断结束
if (finishing) {
// 检查阶段
nextTable = null;
// 将迁移完数据的新数组,指向指向的老数组
table = nextTab;
// 将sizeCtl复制为下次扩容的阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 当前线程没有任务领取,退出扩容操作,sc-1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断是否为最后一个扩容的线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 说明是最后一个扩容。结束和领取都设置为true,再次循环检查一遍。
finishing = advance = true;
i = n; // recheck before commit
}
}
--------------------------------------- 第四块 --------------------------------
// 如果在下标i的值为空,则直接设置为一个fwd,代表当前位置迁移完成
//在下次循环时该节点 == MOVED , advance = true;
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果发现还有MOVED
else if ((fh = f.hash) == MOVED)
// 可以再次领取
advance = true; // already processed
else {
--------------------------------------- 第五块 --------------------------------
// 锁住当前位置
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 说明当前节点正常
if (fh >= 0) {
// 这里比较难理解,n为老数组的长度,假设n是32,也就是100000。新数组长度是64,
// 要与64-1进行异或运算 111111
// 此时只有当hash值在第6为是0的时候 也就是h&n = 0,那这个数字肯定在低位。
// 后面的红黑树迁移也类似
int runBit = fh & n;
Node<K,V> lastRun = f;
// 提前循环一次,将节点赋值到对应的高低位,如果链表后面的高低位没变化,则直接复制
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 再次循环时,只循环到lastRun位置
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 设置低位节点
setTabAt(nextTab, i, ln);
// 设置高位节点
setTabAt(nextTab, i + n, hn);
// 原数组节点设置fwd
setTabAt(tab, i, fwd);
// 再次领取
advance = true;
}
// 红黑树迁移
else if (f instanceof TreeBin) {
// 进行类型转换
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 设置低位头、尾节点
TreeNode<K,V> lo = null, loTail = null;
// 设置高位头、尾节点
TreeNode<K,V> hi = null, hiTail = null;
// 初始化低位节点数量和高位节点数量为 0
int lc = 0, hc = 0;
// 循环遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
// 获取节点的hash
int h = e.hash;
// 创建新的TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 根据与运算判断是否为低位
// 这里比较难理解,n为老数组的长度,假设n是32,也就是100000。新数组长度是64,
// 要与64-1进行异或运算 111111
// 此时只有当hash值在第6为是0的时候 也就是h&n = 0,那这个数字肯定在低位。
if ((h & n) == 0) {
// 如果低位节点目前是null,则把该节点设置为低位节点
if ((p.prev = loTail) == null)
lo = p;
else
// 否则设置为低位的下一个节点
loTail.next = p;
// 更新低位节点
loTail = p;
// 低位节点数量➕1
++lc;
}
// 是高位
else {
// 同理低位逻辑
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 根据低位数量决定是链表还是红黑树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 设置低位节点
setTabAt(nextTab, i, ln);
// 设置高位节点
setTabAt(nextTab, i + n, hn);
// 把原数组设置为fwd
setTabAt(tab, i, fwd);
// 再次领取
advance = true;
}
}
}
}
}
}
helpTransfer 帮助扩容
该方法在介绍的源码中,只有在put时,发现该桶已经被移走了才会触发,在扩容时sc低位代表正在扩容的数量,协助扩容数量会+1,第一次扩容的线程数量会+2。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 老数组不为null,当前节点是fwd,新数组不为null
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 获取自己的扩容标识戳
int rs = resizeStamp(tab.length);
// 判断之前赋值的内容是否有变化,并且sizeCtl是否小于0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//这里贴的是jdk8的代码,有bug,新版本jdk对这里有修复。
// 这里含义就是如果不是同一批扩容 、已经扩容完成、扩容的数量超过最大值都直接跳过
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
sc == rs + 1 ||
sc == rs + MAX_RESIZERS ||
transferIndex <= 0)
break;
// CAS,将sizeCtl + 1,代表来协助扩容了
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
计数
size 方法
面试时特别爱问concurrenHashMap是如何统计数值的,对于size方法,它是直接调用的sumCount
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
sumCount 获取总数
sumCount方法的实现非常简单,它就是取baseCount
与数组counterCells中存的值的和。
final long sumCount() {
// 获取counterCells数组
CounterCell[] as = counterCells; CounterCell a;
// 把baseCount赋值给sum
long sum = baseCount;
if (as != null) {
// 如果counterCells数组不为空,把它的值都加给sum
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
addCount 计数
对于concurrentHashMap取数来讲是很容易看到的,我们接下来看一下它是如何计数的。 这个方法有两个作用:
- 计数器,如果添加元素成功,对计数器 + 1
- 检验当前ConcurrentHashMap是否需要扩容
这里先说计数器:
- 判断counterCells是否不为空 || 通过cas直接给baseCount计数失败;
- 如果进入到该if分支,则会调用fullAddCount对counterCells初始化,后续计数随机拿一个counterCell通过cas进行计数;
是否需要扩容: 这里主要是对check进行判断,当check小于0时是移出节点,不需要进行扩容判断,只有大于等于0时才需要扩容。
代码注释如下:
private final void addCount(long x, int check) {
// 定义计数器单元格数组、基础计数值的旧值和新值
CounterCell[] as; long b, s;
// 如果计数器单元格数组不为空,或者通过 CAS 操作更新基础计数值失败。
// 没有冲突时size值存放在baseCount,有冲突才会初始化counterCells,并放在counterCells中
if ((as = counterCells)!= null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 定义计数器单元格、单元格的值、掩码
CounterCell a; long v; int m;
// 标记是否无竞争
boolean uncontended = true;
// 如果计数器单元格数组为空,或者数组长度小于 0,或者根据随机探针获取的单元格为空,或者通过 CAS 操作更新单元格值失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// CounterCell[]已经初始化了,并且指定索引位置上有CounterCell
// 直接CAS修改指定的CounterCell上的value即可。
// CAS成功,直接告辞!
// CAS失败,代表有冲突,uncontended = false,执行fullAddCount方法
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 执行完整的计数增加操作,
// 1、这里会对CounterCell 进行初始化
// 2、当增加size数量有失败、有其他线程正在添加、还未超过最大cpu都会对CounterCell进行扩容,每次扩容为上次的2倍。
fullAddCount(x, uncontended);
return;
}
// 如果检查条件小于等于 1,直接返回
if (check <= 1)
return;
// 计算总和计数
s = sumCount();
}
// 如果检查条件大于等于 0
if (check >= 0) {
// 定义表格、新表格、表格长度、扩容控制值等
Node<K,V>[] tab, nt; int n, sc;
// 当总和计数大于等于扩容控制值,且表格不为空,表格长度小于最大容量时
while (s >= (long)(sc = sizeCtl) && (tab = table)!= null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 计算扩容标记
int rs = resizeStamp(n);
// 如果扩容控制值小于 0
if (sc < 0) {
// 各种条件判断是否中断扩容
if ((sc >>> RESIZE_STAMP_SHIFT)!= rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 通过 CAS 操作增加扩容工作线程数量
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果扩容控制值大于等于 0
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
// 重新计算总和计数
s = sumCount();
}
}
}
fullAddCount 方法
这个方法是对CounterCell整体的初始化;
- 第一块代码代表着对数组中对象初始化;
- 第二块代码代表着对数组进行进行扩容每次为上一次的2倍;这里的扩容条件是cas失败,没线程扩容,数组长度小于cpu核数;
- 第三块代码代表着对数组进行初始化,第一次初始化的值为2;
private final void fullAddCount(long x, boolean wasUncontended) {
// 获取随机探针值
int h;
// 如果随机探针值为 0
if ((h = ThreadLocalRandom.getProbe()) == 0) {
// 初始化随机数生成器
ThreadLocalRandom.localInit();
// 重新获取随机探针值
h = ThreadLocalRandom.getProbe();
// 标记为无竞争
wasUncontended = true;
}
// 标记是否冲突(上次槽位非空)
boolean collide = false;
// 无限循环,直到满足条件退出
for (;;) {
// 定义计数器单元格数组、计数器单元格、数组长度、单元格值等
CounterCell[] cs; CounterCell c; int n; long v;
// 如果计数器单元格数组不为空且长度大于 0
if ((cs = counterCells)!= null && (n = cs.length) > 0) {
// 如果根据随机探针计算的槽位为空
if ((c = cs[(n - 1) & h]) == null) {
// 如果没有其他线程正在操作单元格
if (cellsBusy == 0) {
// 尝试创建新的计数器单元格
CounterCell r = new CounterCell(x);
// 如果没有其他线程正在操作单元格,通过cas设置为忙绿
if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
// DCL检查
CounterCell[] rs; int m, j;
if ((rs = counterCells)!= null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 重置忙碌标志
cellsBusy = 0;
}
// 如果创建成功,退出循环
if (created)
break;
// 否则继续
continue;
}
// 标记无冲突
collide = false;
}
--------------------------------------- 第一块 --------------------------------
// 如果上次操作有竞争
else if (!wasUncontended)
wasUncontended = true;
// 如果通过 CAS 操作成功更新单元格值
else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
break;
// 如果计数器单元格数组发生变化或达到最大 CPU 数量
else if (counterCells!= cs || n >= NCPU)
collide = false;
// 如果之前没有冲突,现在标记为冲突
else if (!collide)
collide = true;
// 如果没有其他线程正在操作单元格并且成功设置忙碌标志
else if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
try {
// 如果计数器单元格数组未变化,进行数组扩容
if (counterCells == cs)
counterCells = Arrays.copyOf(cs, n << 1);
} finally {
// 重置忙碌标志
cellsBusy = 0;
}
collide = false;
continue;
}
// 推进随机探针
h = ThreadLocalRandom.advanceProbe(h);
}
}
--------------------------------------- 第二块 --------------------------------
// 如果计数器单元格数组为空,且没有其他线程正在操作,并且成功设置忙碌标志
else if (cellsBusy == 0 && counterCells == cs &&
U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// 初始化计数器单元格数组
if (counterCells == cs) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
// 重置忙碌标志
cellsBusy = 0;
}
// 如果初始化成功,退出循环
if (init)
break;
}
--------------------------------------- 第三块 --------------------------------
// 前面都没有成功,再通过 CAS 操作成功更新基础计数值一次如果成功了,直接退出
else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
问题
现在回头看文章开头的十个问题。
1、concurrentHashMap的查询过程是什么样的?
1、获取key散列值计算下标位置,判断该位置的key是否与要查询的key相等,如果相等直接返回; 2、如果该位置的节点hash值小于0,根据不同含义封装的Node进行策略查找。小于0有特殊含义;被迁移走、未赋值、一颗树; 3、上述两种情况都不符合,说明是在链表中,逐个查询。
2、 concurrentHashMap的散列算法和hashMap的有什么区别?
concurrentHashMap的散列算法需要保证计算计算的结果为正数,而hashMap不需要。
3、concurrentHashMap查询过程会加锁么?
在查询的过程中有可能加锁,当要查询的位置是一棵树时,如果该树正在被其他线程的写锁、等待写锁占用,则直接从双向链表中查询一个节点判断是否符合,如果不符合则尝试获取读锁,读锁获取成功则拿到值进行返回。
4、concurrentHashMap的K/V允许为空么?为什么?
不允许为空,如果为空会抛出
NullPointerException
异常。同时如果存入一个null的话会产生歧义,concurrentHashMap是一个线程安全的工具类,出现歧义的话没法使用了。
5、concurrentHashMap什么时候初始化数组?
在真正插入数据时进行初始化。如:
putVal
、putAll
方法。
6、 concurrentHashMap哪些时机会进行扩容?
1、达到阈值时进行扩容,阈值为当前数组长度的3/4; 2、链表转红黑树时会检查数组长度是否小于64,如果小于也会进行扩容。
7、concurrentHashMap如何实现并发扩容的?
根据扩容时机会有一个线程发起扩容,扩容时sizeCtl的低16位会记录扩容的线程数量,第一个线程会记录成2个线程,后续的协助线程都记录成1个线程。开始扩容时会根据cpu以及数组长度计算出每个线程每次需要搬迁节点的
步长
,每个线程都会从尾部开始分配要迁移的桶,每次分配前面计算的步长
个桶,在搬迁的过程锁住每个桶,桶下的数据只会迁移到新数组中的当前index,或者index+n(n:代表原数组长度)的位置;所有线程都完成迁移后,最后一个线程会从尾部到头检查一遍。
8、concurrentHashMap的size
方法是准确的么?
是准确的。
9、concurrentHashMap中计数的桶是如何扩容的?
内部维护了一个计数字段、一个计数数组;如果没并发时,通过cas维护计数字段即可,并发高时就需要初始化计数数组,并且随着并发度会逐渐对该数组进行扩容,扩容的长度不超过cpu核数,每次扩容是原来的2倍,初始化长度为2。
10、concurrentHashMap同一个桶中,链表和红黑树可以共存么?
可以的,TreeBin其实既维护了链表又维护了红黑树;当链表转为红黑树时是先转为双向链表再转为红黑树的。
总结
至此就解析完了concurrentHashMap绝大部分代码,由于代码量太大、代码写的也比较晦涩并夹杂着一些bug,已经尽力解析成让大家能懂的注释。 最近看了很多juc相关代码,它的整体套路爱用死循环、一个参数代表多种含义,大家自己看juc源码时可以从以下几点入手:
- 对一个实体中的成员属性研究清楚代表什么含义,看起源码来会事半功倍;
- 在一个死循环中有多个if去处理不同的事情,大家可以优先去看每个if是做什么的,这样可以让大家更快的理解这个方法都能干什么事。
- 实在看不懂的代码,可以多个jdk版本同时看一下,毕竟是人写的代码就会有bug,在新版本中有对bug进行修复。
有解析不对、不清楚的地方大家可以留言讨论。
看到这里的朋友辛苦帮忙点个赞吧~
转载自:https://juejin.cn/post/7403586214861275175