java线程-AtomicInteger原子类实现原理
CAS
CAS简介
我们在学习多线程期间,看到最多的例子就是累加器,代码如下:
public class Test {
long count = 0;
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count += 1;
}
}
}
上面的代码并非线程安全的,问题主要出现在count变量的不可见性和count+=1并非原子性操作,之前的解决方式都是使用Synchronized或者java sdk加锁的方式来保证原子性,使用volatile来保证可见性。
在java线程-Lock详解&AQS原理详解一文中,发现大量使用了自旋+CAS操作,来保证线程安全性,这是一种无锁的操作,性能会比加锁的方式高很多。
我们先了解一下什么是CAS:
CAS指的是先检查再更新这类复合操作,英文翻译有很多种:Compare And Set、Compare And Swap或者Check And Set。
CAS指令包含3个参数:
- 共享变量的内存地址A
- 用于比较的值B
- 共享变量的新值C
CAS的检查更新逻辑如下:
if A == B # 当 A = B的时候才能将A的值更新为C
A = c
原子CAS指令
了解了CAS的基本概念以后,我们来看一下如何通过自旋 + CAS操作来实现一个线程安全的累加器。
代码如下:
class SimulatedCAS{
volatile int count;
// 实现count+=1
addOne(){
do {
newValue = count+1;
}while(count !=
cas(count,newValue)
}
// 模拟实现CAS,仅用来帮助理解
int cas(
int expect, int newValue){
// 读目前count的值
int curValue = count;
// 比较目前count值是否==期望值
if(curValue == expect){
// 如果是,则更新count的值
count= newValue;
}
// 返回写入前的值
return curValue;
}
}
但如果只是单纯的CAS操作,还是会出现线程安全问题,有可能出现这种情况,多个线程均检测到curvalue == expect,然后先后将count置为newValue,导致出现线程安全问题。
如果给cas方法加上synchronized修饰,虽说能解决上面说的问题,但是无法通过无锁的方式来解决原子性的问题,但如果cas操作本身就是一个原子性操作,那就解决了原子性操作。java作为一个高级编程语言,内部是提供了原子性的CAS操作的方法,那就是Unsafe类里的CAS操作。
Unsafe
不过先来个番外,介绍Unsafe类的具体用法。
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
Unsafe使用方法
使用Unsafe的方式如下:
// 可以直接调用Unsafe.getUnsafe()静态方法获取Unsafe对象。
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
// 这是
throw new SecurityException("Unsafe");
return theUnsafe;
}
不过直接在我们app代码里调用Unsafe.getUnsafe(),会抛出SecurityException("Unsafe")异常,这是因为JVM会判断当前类是否由Bootstrap classLoader
加载,如果不是的话那么就会抛出一个SecurityException
异常。也就是说,只有启动类加载器加载的类才能够调用Unsafe类中的方法,来防止这些方法在不可信的代码中被调用。
那么,为什么要对Unsafe类进行这么谨慎的使用限制呢,说到底,还是因为它实现的功能过于底层,例如直接进行内存操作、绕过jvm的安全检查创建对象等等。
那如果就想要在我们的app代码里想要使用这个类,那该如何使用的呢?这里提供两种方法:
方法一:利用反射,获取到Unsafe#theUnsafe
变量
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
方法二:通过Java命令行命令-Xbootclasspath/a
把调用Unsafe相关方法的类A所在jar包路径追加到默认的bootstrap路径中,使得A被引导类加载器加载,从而通过Unsafe.getUnsafe
方法安全的获取Unsafe实例。
// java -Xbootclasspath/a: ${path} # 其中path为调用Unsafe相关方法的类所在jar包路径.
// java -Xbootclasspath:$JAVA_HOME/jre/lib/rt.jar:./A.jar foo.bar.MyApp
public class A{
private static final Unsafe unsafe = Unsafe.getUnsafe();
// ...
}
Unsafe主要功能
如上图所示,Unsafe提供的API大致可分为内存操作、CAS、Class相关、对象操作、线程调度、系统信息获取、内存屏障、数组操作等几类
不过这里我们主要介绍一下CAS操作,其余操作请参考美团技术团队写的文章
硬件指令
X86提供的CAS指令为cmpxchg指令。指令格式如下:
cmpxchg [目标操作数], [源操作数]
- 目标操作数位于寄存器或者内存中,用于存储变量的当前值C(currentValue)
- 源操作数位于寄存器中,用于存储变量的更新值N(NewValue)
- 隐藏的操作数位于AX寄存器中,在指令中没有明确指出,用于存储变量的期望值E(ExpectedValue)
在单核计算机上cmpxchg是原子操作,因为指令是CPU的最小单元,指令执行过程中不可中断。
但是在多核CPU上,cmpxchg就不是非原子操作,多个线程可以在多核CPU上并行执行cmpxchg指令,为了在多核CPU上还能保障cmpxchg的原子性,需要在cmpxchg指令前加上LOCK前缀。
LOCK cmpxchg [目标操作数], [源操作数]
Unsafe-CAS
但由于cmpxchg指令是硬件指令,java并不能直接调用,所以Unsafe类提供了三个的native方法,对CAS操作进行封装
/**
* CAS
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);
对应的C++代码如下:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))
UnsafeWrapper("Unsafe_CompareAndSwapObject");
oop x = JNIHandles::resolve(x_h);
oop e = JNIHandles::resolve(e_h);
oop p = JNIHandles::resolve(obj);
HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);
oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);
jboolean success = (res == e);
if (success)
update_barrier_set((void*)addr, x);
return success;
UNSAFE_END
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
UnsafeWrapper("Unsafe_CompareAndSwapLong");
Handle p (THREAD, JNIHandles::resolve(obj));
jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
if (VM_Version::supports_cx8())
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
else {
jboolean success = false;
ObjectLocker ol(p, THREAD);
if (*addr == e) { *addr = x; success = true; }
return success;
}
UNSAFE_END
通过对JVM源码的阅读,发现其实底层都是调用Atomic::cmpxchg
这种汇编指令完成的。x86里的汇编指令如下:参考mail.openjdk.org/pipermail/h…
inline jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
int mp = os::is_MP();
jbyte result;
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgb %1,(%3)"
: "=a" (result)
: "q" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return result;
}
CAS失败处理
如果多个线程竞争执行CAS,那么只会有一个线程会执行成功,其他执行失败的线程又该如何处理?主要的方案有两种:
方案一:synchronized和Lock,通过加锁的方式
public void increment_lock(){
synchronized(this){
value ++;
}
}
但是这种方式默认是通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗
方案二:自旋+原子性cas操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;
static {
try {
valueOffset = unsafe.objectFieldOffset(Accumulator.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
public void increment_cas(){
boolean success = false;
while(!success){
int oldValue = value;
int newValue = oldValue + 1;
success = unsafe.compareAndSwapInt(this, valueOffset, oldValue, newValue);
}
}
方案一和方案二的优缺点
加锁 | 自旋+CAS | |
---|---|---|
优点 | 处于阻塞状态的线程不会占用CPU时间片,不会浪费CPU资源 | 基于乐观锁实现,循环执行CAS,不需要阻塞线程,不会出现线程阻塞,不会出现内核态和用户态的上下文切换 |
缺点 | 通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗 | 如果线程一直在运行,会浪费CPU资源 |
ABA问题
什么是ABA问题?例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下
- 线程1,期望值为A,欲更新的值为B
- 线程2,期望值为A,欲更新的值为B
- 线程3,期望值为B,欲更新的值为A
- 线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了。
- 线程1取值与期望的A值比较,发现相等然后将值更新为B。
- 然后这个时候出现了线程3,线程3取值与期望的值B比较,发现相等则将值更新为A。
- 此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程也完成了操作,但是线程2并不知道值已经经过了
A->B->A
的变化过程。
大部分情况下我们还是不需要关心ABA问题,但是有些场景就不能忽视ABA问题。
ABA的解决方案一般是加上版本号或者时间戳。
原子类
CAS在java里运用最常见的就是AQS里的操作和原子类了,但对于普通程序猿来说,最常见的还是原子类的使用,大家都会使用AtomicInteger#incrementAndGet来实现自增。这个章节就是主要来讨论一下原子类的实现。
java原子类的实现都在java.util.concurrent.atomic
包下,基本是通过自旋+原子CAS操作实现的,不会出现安全问题。
根据处理的数据类型,可以大致分为5类:
重点说明一下基本类型原子类,引用类型原子类以及累加器
基本类型原子类
基本类型原子类的实现基本相同,我们以AtomicInteger
为例,深入学习一下基本类型原子类。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
// 其余API
getAndIncrement() //原子化i++
getAndDecrement() //原子化的i--
incrementAndGet() //原子化的++i
decrementAndGet() //原子化的--i
//当前值+=delta,返回+=前的值
getAndAdd(delta)
//当前值+=delta,返回+=后的值
addAndGet(delta)
//CAS操作,返回是否成功
compareAndSet(expect, update)
//以下四个方法
//新值可以通过传入func函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)
}
其中compareAndSet方法是CAS标准函数,如果value值等于expect,那么就将value更新为update值,并返回true。
getAndAdd和addAndGet方法类似,addAndGet只不过是在getAndAdd返回值的基础上再加上一个delta再返回。
getAndIncrement和incrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=1;
getAndDecrement和decrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=-1;
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
// unsafe.getAndAddInt
public final int getAndAddInt(Object o, long offset, int delta) {
int oldValue;
// 使用自旋+CAS,保证getAndAddInt总是可以将value的值增加delta
do {
oldValue = this.getIntVolatile(o, offset);
} while(!this.compareAndSwapInt(o, offset, oldValue, oldValue + delta));
return oldValue;
}
引用类型原子类
AtomicReference与AtomicInteger的实现方式类似,只不过引用类型原子类使用的CAS操作是sun.misc.Unsafe#compareAndSwapObject。
public class AtomicReference<V> implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile V value;
public AtomicReference(V initialValue) {
value = initialValue;
}
public AtomicReference() {
}
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
}
我们在学习CAS操作的时候,提到过ABA问题,在引用类型原子类中AtomicStampedReference和AtomicMarkableReference就是为了解决ABA问题而存在的。
AtomicStampedReference相比于AtomicReference多了一个int类型的stamp版本戳,它将stamp和引用封装成一个新的Pair对象,在Pair对象上执行CAS。即便引用对象存在ABA问题,但是Stamp单调递增,stamp不会存在ABA问题,所以两者组成的Pair对象,也就不会存在ABA问题。
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
}
AtomicMarkableReference和AtomicStampedReference的区别在于AtomicMarkableReference使用的是boolean类型的mark是否改变,来判断reference是否有被更改过。
数组类型原子类
AtomicIntegerArray中的原子操作跟AtomicInteger中的原子操作一一对应,只不过多了下标而已。
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}
对象属性更新器
如果某个类的属性没有提供合适的原子操作,那么我们可以使用对象属性更新器来对其进行原子操作,但是属性必须要被volatile修饰,否则会报异常。
参考代码:
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
final String fieldName,
final Class<?> caller) {
final Field field;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;
this.offset = U.objectFieldOffset(field);
}
累加器
针对累加这种特殊的业务场景,JUC提供了专门的LongAdder累加器,它比AtomicLong原子类性能更高,在高并发的情况下,多线程同时执行add()函数,AtomicLong会因为大量线程而不断自旋导致性能下降,但是LongAdder却能保持高性能。
其底层原理比较复杂,涉及到数据分片,哈希优化,去伪共享,非精确求和等各种优化手段。
java.util.concurrent.atomic.Striped64是所有累加器的父类,目前JUC里的所有累加器都会继承它。
abstract class Striped64 extends Number {
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
Striped64() {
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Striped64里主要有四个变量
-
cells
- cells数组保存多个累加变量
- Cell只包含一个成员变量value,以及一个操作value的cas()函数。
- cells数组支持动态扩容,cells数组的长度必须是2的幂次方,每次扩容都会增加为原来数组长度的2倍。
- 最开始初始化为null,当第一次出现线程竞争执行add()函数的时候,才会被创建
-
NCPU
- JVM最大可用CPU核数。
- 当cells数组长度大于等于NCPU的最小2的幂次方时,cells数组就不再扩容。这是因为同时执行累加操作的线程数不可能大于cpu的核数。
-
base
- base是一个比较特殊的累加变量。
- 用来有效避免执行复杂的分片累加逻辑。
-
cellBusy
- cellBusy用来实现锁,类似ReentrantLock中的state字段,cellBusy初始化为0,多个线程通过CAS竞争更新cellBusy,谁先将cellBusy设置为1,谁就持有了锁。
- 用来保证多个线程同时创建cells数组,创建cells数组中的cell对象,以及对cells数组进行动态扩容。
数据分片
在高并发的情况下,AtomicLong性能不高的主要原因是,多线程同时CAS更新一个变量,但是LongAdder会将一个累加变量分解成多个。多线程同时执行累加操作时,不同的线程对不同的累加变量进行操作,线程之间互不影响,这样就避免了一个线程等待另一个线程操作完成之后再操作。
我们以add()函数为例,了解一下LongAdder的实现原理
// LongAdder#add()
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 初始状态下,cells==null,第一个条件一定是false,此时会通过casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中,再执行分片累加的逻辑(将新增值累加到cells数组中)。在低并发的情况下,使用base可以有效避免执行复杂的分片累加逻辑。
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
实现流程如下:
其中比较重要的是getProbe()函数,这是一个哈希函数,如果getProbe() & m = k,那么,当前线程会通过CAS将新增值x累加到cells[k]的value变量上。
另外一个函数就是longAccumulate,这是java.util.concurrent.atomic.Striped64#longAccumulate里的函数,看上述流程图,会有三种情况走到这一步。
- cells数组为null,或者cells.length == 0;
- cells[getProbe() & m] == null;
- cas更新cells[getProbe() & m]对象的value值失败;
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
// 自旋
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
哈希优化
再了解数据分片的时候,我们发现会频繁调用到getProbe()函数,实际上这是个哈希函数,不过原子类对哈希函数进行了一些性能优化,对应的Cell对象的下标是通过getProbe() & n公式来计算得到。其中,n表示cells数组的长度,n为2的幂次方,因此getProbe() % n可以转化为getProbe() & (n - 1),以提高计算速度。除此之外getProbe的值会保存在对应的Thread对象的成员变量中,之后便可以一直重复使用,除非发生冲突,两个线程同时执行CAS更新同一个Cell对象,执行CAS失败的线程会重新生成新的哈希值,并同步更新到对应的Thread对象中。
/**
* 直接获取当前线程对应的Thread对象的PROBE成员变量值
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
/**
* 基于老的哈希值probe重新计算新的哈希值,并存储到当前线程对应的Thread对象的PROBE成员变量中
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
去伪共享
CPU读写缓存的最小单元是缓存行。不同CPU上的缓存行大小不同,可以为32字节,64字节或者128字节等,常见的大小为64字节。
因为数据是以缓存行为单位来读写的,所以,当线程t1从内存中读取cellA到缓存中,会顺带着读取cellB,同理,当线程t2从内存中读取cellB到缓存中,也会顺带着读取cellA。当t1更新cellA的值时,按理说不会影响线程t2对cellB的缓存,但是缓存中的数据是按照缓存行来读写的,因此,线程t1会将整个缓存行设置为无效,这就会导致线程t2对cellB的缓存也会失效,需要重新从内存里读取。同理线程2更新cellB的值,也会导致线程t1对cellA的缓存失效,导致缓存频繁失效。
JDK 8开始,提供了一个sun.misc.Contended
注解,用来解决伪共享问题,加上这个注解的类会自动补齐缓存行。Cell类就使用了sun.misc.Contended
注解,所以两个Cell对象便不会存储在同一个缓存行中,因此,也就不会出现伪共享的问题。
非精确求和
LongAdder的sum()会累加base和cells中的Cell对象的value值,从而得到最终的累加值,但这个值不一定是准确的。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sum执行时,并没有限制对base和cells的更新。也就是说,对LongAdder的最后一次更新not happens-before 最近的一次读取。
首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。
其次,这里对cell的读取也无法保证是最后一次写入的值。
所以,sum方法在没有并发的情况下,可以获得正确的结果。
统计器
LongAccumulator和DoubleAccumulator是统计器,比累加器多了一些统计的操作。源码如下:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
longAccumulate(x, function, uncontended);
}
}
看源码发现,accumulate的实现跟累加器的类似,唯一不同的是,构造器里带着LongBinaryOperator接口。使用方式如下:
public class LongAccumulatorDemo {
public static class LongMax implements LongBinaryOperator{
@Override
public long applyAsLong(long left, long right) {
return Math.max(left, right);
}
}
public static void main(String[] args) {
LongAccumulator accumulator = new LongAccumulator(new LongMax(), Long.MIN_VALUE);
accumulator.accumulate(10);
accumulator.accumulate(-3);
accumulator.accumulate(20);
System.out.println(accumulator.get());
}
}
转载自:https://juejin.cn/post/7201009854524375095