ConcurrentHashMap源码核心方法解析
源码真的可以让一个人学到不少,不要怕!!!坚持看下去,必有收获!!!
源码核心方法
ConcurrentHashMap
结合CAS
和synchronized
-> 平衡性能与线程安全性
putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 遍历整个Node数组
Node<K,V> f; int n, i, fh; K fk; V fv;
- 初始化,默认大小
DEFAULT_CAPACITY = 16
if (tab == null || (n = tab.length) == 0) // 如果还没有初始化则进行初始化
tab = initTable();
- 添加链表第一个元素,此时只有添加一种操作,使用
CAS
可以在一定程度上优化性能
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) // CAS -> 添加链表第一个元素
break; // no lock when adding to empty bin
}
- 辅助扩容,想了解的小伙伴可以自己去看一下源码
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // -> transfer()
putIfAbsent() 的快速路径:不加锁检查首节点,key 已存在 -> 直接返回当前 value
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
- 根据 hash 计算得出的数组 index 上已经存在元素,向链表/红黑树中插入元素:key 相同 -> 覆盖原来的 value(返回 oldVal);key 不同 -> 追加为链表后继节点(pred.next)
链表
else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) // 默认情况下覆盖,调用putIfAbsent则返回已存在的value值 e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value); break; } } }
红黑树
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) // 默认情况下覆盖,调用putIfAbsent则返回已存在的value值 p.val = value; } }
- 遇到 ReservationNode(computeIfAbsent/compute 的占位节点)-> 抛出异常,防止递归更新
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
} // if(tabAt(tab,i) == f){
} // synchronized(f){
- 记录链表(binCount 为链表节点数,从 1 起)/红黑树(binCount=2)的节点数;当链表节点数 ≥8 时 -> treeifyBin()(树化,数组长度不足 64 时先扩容)
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 扩容 -> 2*n if (oldVal != null) return oldVal; // 属于putIfAbsent -> 返回已存在的`oldVal` break; } } // else{ } // for (Node<K,V>[] tab = table;;) {
- 调用 addCount() 记录整个 ConcurrentHashMap 表的大小
addCount(1L, binCount); // baseCount + CounterCell[i].value累加
return null;
}
treeifyBin()
接上面,链表节点数 ≥8 时被调用:若数组长度 <64 -> tryPresize() 扩容(2*n);否则 “ 链表 -> 红黑树 ”
/**
 * Replaces all linked nodes in the bin at the given index with a
 * TreeBin (red-black tree) -- unless the table is still small, in
 * which case it resizes the table instead.
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    if (tab != null) {
        // Table shorter than MIN_TREEIFY_CAPACITY (64): prefer doubling
        // the table over treeifying -- a long bin there is more likely
        // caused by a small table than by true hash collisions.
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1); // grow to 2*n via shift
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { // lock the bin head during conversion
                if (tabAt(tab, index) == b) { // re-check under the lock
                    TreeNode<K,V> hd = null, tl = null;
                    // Convert each list Node into a TreeNode, keeping
                    // them chained as a doubly-linked list (hd = head).
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p; // first node becomes the chain head
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // TreeBin's constructor builds the actual red-black
                    // tree (rotations, recoloring) from the node chain,
                    // then the bin slot is replaced with the TreeBin.
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            } // synchronized (b)
        } // else if
    }
}
tryPresize()
扩容:旧数组 -> 新数组
/**
 * Tries to presize the table to hold at least the given number of
 * elements; also performs first-time initialization when the table
 * has not been created yet.
 */
private final void tryPresize(int size) {
    // Round the requested size up to the next power of two
    // (e.g. 15 -> 16), capping at MAXIMUM_CAPACITY.
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // sizeCtl < 0 means another thread is initializing or resizing;
    // only proceed while it is non-negative (it then holds the next
    // resize threshold, or the initial capacity before init).
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // If the table has not been initialized yet, this call is
        // responsible for initializing it (mirrors initTable()).
        if (tab == null || (n = tab.length) == 0) {
            // Use the larger of the recorded value (sc) and the
            // capacity computed from the requested size (c).
            n = (sc > c) ? sc : c;
            // CAS sizeCtl to -1 to claim the right to initialize.
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) { // re-check: still uninitialized
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // Next resize threshold: n - n/4 == 0.75 * n.
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // Target capacity is already covered by the current threshold,
        // or the table cannot grow further -> nothing to do.
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // Migrate elements: old table -> new table.
        else if (tab == table) { // table unchanged since it was read
            // Resize stamp: identifies this resize generation so that
            // concurrently helping threads coordinate on the same run.
            int rs = resizeStamp(n);
            // The first resizer CASes sizeCtl to
            // (rs << RESIZE_STAMP_SHIFT) + 2 (a negative value);
            // helpers later increment it by one each.
            if (U.compareAndSetInt(this, SIZECTL, sc,
                                   (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
transfer()
表中数据分块 -> 多线程并发迁移
- 初始化扩容数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程处理的数据区间大小,最小16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化扩容数组 2*n
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
- 记录值 -> 用于判断状态
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 构建在新表中的空节点
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
- 分配处理区域
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 分配处理区域,假设 transferIndex = n = 64
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) { // nextIndex = transferIndex = 64
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) { // nextBound = 64-16 = 48
bound = nextBound; // bound = 48
i = nextIndex - 1; // i = 64-1 = 63 -> 处理区域"48~63"
advance = false;
}
}
- 数据迁移完成
// 数据迁移完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果旧数组中当前索引定位表中节点为空,CAS尝试更新为转发节点fwd,记录更新 -> 并发安全
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果旧数组中当前索引定位表中存在节点。迁移完成后,旧表中以`MOVED`占位,记录更新
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
- 数据迁移过程
链表
else {
// 数据迁移
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
// 此处应当是为了性能优化,比如 2 -> 10 -> 6 -> 14,纪录hn = lastRun = 6
// "6 -> 14"这一部分无需重新构建,节省部分对象创建的CPU开销、内存占用
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0) // 低位:在新数组原位置上
ln = new Node<K,V>(ph, pk, pv, ln);
else // 高位:在新数组新位置上
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
- 上面这块不容易懂,因此制作下表,不得不感慨二进制与运算(&)的精妙之处😮
红黑树
- 这里倒是没什么特别的地方,就是遍历树节点并按高低位拆成两条“链表”树,再交给 TreeBin -> 红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 低位:在新数组原位置上
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
// 高位:在新数组新位置上
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
- 遇到 ReservationNode(compute 系列方法的占位节点)-> 抛出 IllegalStateException,防止递归更新
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
} // if (tabAt(tab, i) == f) {
} // synchronized (f) {
} // else { synchronized (f)
} // for (int i = 0, bound = 0;;) {
}
addCount()
/**
 * Adds x to the element count and, when check >= 0 and the count
 * reaches the threshold in sizeCtl, initiates or helps a resize.
 */
private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
    // Fast path: CAS baseCount directly. If counter cells already
    // exist, or the CAS loses a race to another thread, fall through
    // to the striped CounterCell path.
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        // Pick a cell by the thread's probe hash and try to CAS it.
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            // Still contended: fullAddCount handles cell creation,
            // CounterCell[] expansion, and fallback to baseCount.
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount(); // baseCount + sum of all CounterCell values
    }
    // Resize check: start or help a transfer for as long as the
    // count keeps exceeding the threshold stored in sizeCtl.
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) { // a resize is already in progress
                // Resize finished or helper slots saturated, or no
                // work left to claim -> stop; otherwise register as
                // one more helper (sizeCtl + 1) and join the transfer.
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // No resize in progress: become the first resizer.
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
整理到这里实属不易,仅仅是阅读和分析这一部分源码就花了2天左右的时间,对于文章中没有涉及的方法感兴趣的小伙伴可以自行去了解,之后视情况也可能会补充一些其它方法,无论如何您是坚持看到了最后,感谢您的认可!
转载自:https://juejin.cn/post/7397286692090691593