likes
comments
collection
share

java线程-AtomicInteger原子类实现原理

作者站长头像
站长
· 阅读数 27

CAS

CAS简介

我们在学习多线程期间,看到最多的例子就是累加器,代码如下:

public class Test {
  long count = 0;
  void add10K() {
    int idx = 0;
    while(idx++ < 10000) {
      count += 1;
    }
  }
}

上面的代码并非线程安全的,问题主要出现在count变量的不可见性和count+=1并非原子性操作,之前的解决方式都是使用Synchronized或者java sdk加锁的方式来保证原子性,使用volatile来保证可见性。

在java线程-Lock详解&AQS原理详解一文中,发现大量使用了自旋+CAS操作,来保证线程安全性,这是一种无锁的操作,性能会比加锁的方式高很多。

我们先了解一下什么是CAS:

CAS指的是先检查再更新这类复合操作,英文翻译有很多种:Compare And Set、Compare And Swap或者Check And Set。

CAS指令包含3个参数:

  1. 共享变量的内存地址A
  2. 用于比较的值B
  3. 共享变量的新值C

CAS的检查更新逻辑如下:

if A == B # 当 A = B的时候才能将A的值更新为C
  A = c

原子CAS指令

了解了CAS的基本概念以后,我们来看一下如何通过自旋 + CAS操作来实现一个线程安全的累加器。

代码如下:

class SimulatedCAS{
  volatile int count;
  // 实现count+=1
  addOne(){
    do {
      newValue = count+1;
    }while(count !=
      cas(count,newValue)
  }
  // 模拟实现CAS,仅用来帮助理解
  int cas(
    int expect, int newValue){
    // 读目前count的值
    int curValue = count;
    // 比较目前count值是否==期望值
    if(curValue == expect){
      // 如果是,则更新count的值
      count= newValue;
    }
    // 返回写入前的值
    return curValue;
  }
}

但如果只是单纯的CAS操作,还是会出现线程安全问题,有可能出现这种情况,多个线程均检测到curvalue == expect,然后先后将count置为newValue,导致出现线程安全问题。

如果给cas方法加上synchronized修饰,虽说能解决上面说的问题,但是无法通过无锁的方式来解决原子性的问题,但如果cas操作本身就是一个原子性操作,那就解决了原子性操作。java作为一个高级编程语言,内部是提供了原子性的CAS操作的方法,那就是Unsafe类里的CAS操作。

Unsafe

不过先来个番外,介绍Unsafe类的具体用法。

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。

Unsafe使用方法

使用Unsafe的方式如下:

// 可以直接调用Unsafe.getUnsafe()静态方法获取Unsafe对象。
public static Unsafe getUnsafe() {
  Class<?> caller = Reflection.getCallerClass();
  if (!VM.isSystemDomainLoader(caller.getClassLoader()))
    // 这是
    throw new SecurityException("Unsafe");
  return theUnsafe;
}

不过直接在我们app代码里调用Unsafe.getUnsafe(),会抛出SecurityException("Unsafe")异常,这是因为JVM会判断当前类是否由Bootstrap classLoader加载,如果不是的话那么就会抛出一个SecurityException异常。也就是说,只有启动类加载器加载的类才能够调用Unsafe类中的方法,来防止这些方法在不可信的代码中被调用。

那么,为什么要对Unsafe类进行这么谨慎的使用限制呢,说到底,还是因为它实现的功能过于底层,例如直接进行内存操作、绕过jvm的安全检查创建对象等等。

那如果就想要在我们的app代码里想要使用这个类,那该如何使用的呢?这里提供两种方法:

方法一:利用反射,获取到Unsafe#theUnsafe变量

private static Unsafe reflectGetUnsafe() {
    try {
      Field field = Unsafe.class.getDeclaredField("theUnsafe");
      field.setAccessible(true);
      return (Unsafe) field.get(null);
    } catch (Exception e) {
      log.error(e.getMessage(), e);
      return null;
    }
}

方法二:通过Java命令行命令-Xbootclasspath/a把调用Unsafe相关方法的类A所在jar包路径追加到默认的bootstrap路径中,使得A被引导类加载器加载,从而通过Unsafe.getUnsafe方法安全的获取Unsafe实例。

// java -Xbootclasspath/a: ${path}   # 其中path为调用Unsafe相关方法的类所在jar包路径.
// java -Xbootclasspath:$JAVA_HOME/jre/lib/rt.jar:./A.jar foo.bar.MyApp
public class A{
  private static final Unsafe unsafe = Unsafe.getUnsafe();
  // ...
}

Unsafe主要功能

java线程-AtomicInteger原子类实现原理 如上图所示,Unsafe提供的API大致可分为内存操作、CAS、Class相关、对象操作、线程调度、系统信息获取、内存屏障、数组操作等几类

不过这里我们主要介绍一下CAS操作,其余操作请参考美团技术团队写的文章

硬件指令

X86提供的CAS指令为cmpxchg指令。指令格式如下:

cmpxchg [目标操作数], [源操作数]
  • 目标操作数位于寄存器或者内存中,用于存储变量的当前值C(currentValue)
  • 源操作数位于寄存器中,用于存储变量的更新值N(NewValue)
  • 隐藏的操作数位于AX寄存器中,在指令中没有明确指出,用于存储变量的期望值E(ExpectedValue)

在单核计算机上cmpxchg是原子操作,因为指令是CPU的最小单元,指令执行过程中不可中断。

但是在多核CPU上,cmpxchg就不是非原子操作,多个线程可以在多核CPU上并行执行cmpxchg指令,为了在多核CPU上还能保障cmpxchg的原子性,需要在cmpxchg指令前加上LOCK前缀。

LOCK cmpxchg [目标操作数], [源操作数]

Unsafe-CAS

但由于cmpxchg指令是硬件指令,java并不能直接调用,所以Unsafe类提供了三个的native方法,对CAS操作进行封装

/**
  *  CAS
  * @param o         包含要修改field的对象
  * @param offset    对象中某field的偏移量
  * @param expected  期望值
  * @param update    更新值
  * @return          true | false
  */
public final native boolean compareAndSwapObject(Object o, long offset,  Object expected, Object update);
​
public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);
  
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);

对应的C++代码如下:

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))
  UnsafeWrapper("Unsafe_CompareAndSwapObject");
  oop x = JNIHandles::resolve(x_h);
  oop e = JNIHandles::resolve(e_h);
  oop p = JNIHandles::resolve(obj);
  HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);
  oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);
  jboolean success  = (res == e);
  if (success)
    update_barrier_set((void*)addr, x);
  return success;
UNSAFE_END
​
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
​
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
  if (VM_Version::supports_cx8())
    return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
  else {
    jboolean success = false;
    ObjectLocker ol(p, THREAD);
    if (*addr == e) { *addr = x; success = true; }
    return success;
  }
UNSAFE_END

通过对JVM源码的阅读,发现其实底层都是调用Atomic::cmpxchg这种汇编指令完成的。x86里的汇编指令如下:参考mail.openjdk.org/pipermail/h…

inline jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
   int mp = os::is_MP();
   jbyte result;
   __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgb %1,(%3)"
                     : "=a" (result)
                     : "q" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                     : "cc", "memory");
   return result;
}

CAS失败处理

如果多个线程竞争执行CAS,那么只会有一个线程会执行成功,其他执行失败的线程又该如何处理?主要的方案有两种:

方案一:synchronized和Lock,通过加锁的方式

public void increment_lock(){
  synchronized(this){
    value ++;
  }
}

但是这种方式默认是通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗

方案二:自旋+原子性cas操作

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;
​
static {
  try {
    valueOffset = unsafe.objectFieldOffset(Accumulator.class.getDeclaredField("value"));
  } catch (Exception ex) {
    throw new Error(ex);
  }
}
​
public void increment_cas(){
  boolean success = false;
  while(!success){
    int oldValue = value;
    int newValue = oldValue + 1;
    success = unsafe.compareAndSwapInt(this, valueOffset, oldValue, newValue);
  }
}

方案一和方案二的优缺点

加锁自旋+CAS
优点处于阻塞状态的线程不会占用CPU时间片,不会浪费CPU资源基于乐观锁实现,循环执行CAS,不需要阻塞线程,不会出现线程阻塞,不会出现内核态和用户态的上下文切换
缺点通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗如果线程一直在运行,会浪费CPU资源

ABA问题

什么是ABA问题?例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下

  1. 线程1,期望值为A,欲更新的值为B
  2. 线程2,期望值为A,欲更新的值为B
  3. 线程3,期望值为B,欲更新的值为A
  • 线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了。
  • 线程1取值与期望的A值比较,发现相等然后将值更新为B。
  • 然后这个时候出现了线程3,线程3取值与期望的值B比较,发现相等则将值更新为A。
  • 此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程也完成了操作,但是线程2并不知道值已经经过了A->B->A的变化过程。

大部分情况下我们还是不需要关心ABA问题,但是有些场景就不能忽视ABA问题。

ABA的解决方案一般是加上版本号或者时间戳。

原子类

CAS在java里运用最常见的就是AQS里的操作和原子类了,但对于普通程序猿来说,最常见的还是原子类的使用,大家都会使用AtomicInteger#incrementAndGet来实现自增。这个章节就是主要来讨论一下原子类的实现。

java原子类的实现都在java.util.concurrent.atomic包下,基本是通过自旋+原子CAS操作实现的,不会出现安全问题。

根据处理的数据类型,可以大致分为5类:

java线程-AtomicInteger原子类实现原理

重点说明一下基本类型原子类,引用类型原子类以及累加器

基本类型原子类

基本类型原子类的实现基本相同,我们以AtomicInteger为例,深入学习一下基本类型原子类。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
​
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
​
    private volatile int value;
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
​
    public AtomicInteger() {
    }
  
    // 其余API
    getAndIncrement() //原子化i++
    getAndDecrement() //原子化的i--
    incrementAndGet() //原子化的++i
    decrementAndGet() //原子化的--i
    //当前值+=delta,返回+=前的值
    getAndAdd(delta) 
    //当前值+=delta,返回+=后的值
    addAndGet(delta)
    //CAS操作,返回是否成功
    compareAndSet(expect, update)
    //以下四个方法
    //新值可以通过传入func函数来计算
    getAndUpdate(func)
    updateAndGet(func)
    getAndAccumulate(x,func)
    accumulateAndGet(x,func)
}

其中compareAndSet方法是CAS标准函数,如果value值等于expect,那么就将value更新为update值,并返回true。

getAndAdd和addAndGet方法类似,addAndGet只不过是在getAndAdd返回值的基础上再加上一个delta再返回。

getAndIncrement和incrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=1;

getAndDecrement和decrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=-1;

public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}
​
public final int addAndGet(int delta) {
  return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
​
public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}
​
public final int incrementAndGet() {
  return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
​
public final int getAndDecrement() {
  return unsafe.getAndAddInt(this, valueOffset, -1);
}
​
public final int decrementAndGet() {
  return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
​
// unsafe.getAndAddInt
public final int getAndAddInt(Object o, long offset, int delta) {
  int oldValue;
  // 使用自旋+CAS,保证getAndAddInt总是可以将value的值增加delta
  do {
    oldValue = this.getIntVolatile(o, offset);
  } while(!this.compareAndSwapInt(o, offset, oldValue, oldValue + delta));
​
  return oldValue;
}

引用类型原子类

AtomicReference与AtomicInteger的实现方式类似,只不过引用类型原子类使用的CAS操作是sun.misc.Unsafe#compareAndSwapObject。

public class AtomicReference<V> implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
​
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
​
    private volatile V value;
    public AtomicReference(V initialValue) {
        value = initialValue;
    }
​
    public AtomicReference() {
    }
  
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
}

我们在学习CAS操作的时候,提到过ABA问题,在引用类型原子类中AtomicStampedReference和AtomicMarkableReference就是为了解决ABA问题而存在的。

AtomicStampedReference相比于AtomicReference多了一个int类型的stamp版本戳,它将stamp和引用封装成一个新的Pair对象,在Pair对象上执行CAS。即便引用对象存在ABA问题,但是Stamp单调递增,stamp不会存在ABA问题,所以两者组成的Pair对象,也就不会存在ABA问题。

public class AtomicStampedReference<V> {
​
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
​
    private volatile Pair<V> pair;
​
    public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }
​
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
​
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long pairOffset =
        objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
​
    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }
}

AtomicMarkableReference和AtomicStampedReference的区别在于AtomicMarkableReference使用的是boolean类型的mark是否改变,来判断reference是否有被更改过。

数组类型原子类

AtomicIntegerArray中的原子操作跟AtomicInteger中的原子操作一一对应,只不过多了下标而已。

public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update); 
}
​
private boolean compareAndSetRaw(long offset, int expect, int update) {
    return unsafe.compareAndSwapInt(array, offset, expect, update);
}

对象属性更新器

如果某个类的属性没有提供合适的原子操作,那么我们可以使用对象属性更新器来对其进行原子操作,但是属性必须要被volatile修饰,否则会报异常。

参考代码:

AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                              final String fieldName,
                              final Class<?> caller) {
    final Field field;
    final int modifiers;
    try {
        field = AccessController.doPrivileged(
            new PrivilegedExceptionAction<Field>() {
                public Field run() throws NoSuchFieldException {
                    return tclass.getDeclaredField(fieldName);
                }
            });
        modifiers = field.getModifiers();
        sun.reflect.misc.ReflectUtil.ensureMemberAccess(
            caller, tclass, null, modifiers);
        ClassLoader cl = tclass.getClassLoader();
        ClassLoader ccl = caller.getClassLoader();
        if ((ccl != null) && (ccl != cl) &&
            ((cl == null) || !isAncestor(cl, ccl))) {
            sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
        }
    } catch (PrivilegedActionException pae) {
        throw new RuntimeException(pae.getException());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
​
    if (field.getType() != int.class)
        throw new IllegalArgumentException("Must be integer type");
​
    if (!Modifier.isVolatile(modifiers))
        throw new IllegalArgumentException("Must be volatile type");
​
    this.cclass = (Modifier.isProtected(modifiers) &&
                   tclass.isAssignableFrom(caller) &&
                   !isSamePackage(tclass, caller))
                  ? caller : tclass;
    this.tclass = tclass;
    this.offset = U.objectFieldOffset(field);
}

累加器

针对累加这种特殊的业务场景,JUC提供了专门的LongAdder累加器,它比AtomicLong原子类性能更高,在高并发的情况下,多线程同时执行add()函数,AtomicLong会因为大量线程而不断自旋导致性能下降,但是LongAdder却能保持高性能。

其底层原理比较复杂,涉及到数据分片,哈希优化,去伪共享,非精确求和等各种优化手段。

java.util.concurrent.atomic.Striped64是所有累加器的父类,目前JUC里的所有累加器都会继承它。

abstract class Striped64 extends Number {
    
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
​
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
​
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    transient volatile Cell[] cells;
    transient volatile long base;
    transient volatile int cellsBusy;
    Striped64() {
    }
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
      // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
​
}

Striped64里主要有四个变量

  • cells

    • cells数组保存多个累加变量
    • Cell只包含一个成员变量value,以及一个操作value的cas()函数。
    • cells数组支持动态扩容,cells数组的长度必须是2的幂次方,每次扩容都会增加为原来数组长度的2倍。
    • 最开始初始化为null,当第一次出现线程竞争执行add()函数的时候,才会被创建
  • NCPU

    • JVM最大可用CPU核数。
    • 当cells数组长度大于等于NCPU的最小2的幂次方时,cells数组就不再扩容。这是因为同时执行累加操作的线程数不可能大于cpu的核数。
  • base

    • base是一个比较特殊的累加变量。
    • 用来有效避免执行复杂的分片累加逻辑。
  • cellBusy

    • cellBusy用来实现锁,类似ReentrantLock中的state字段,cellBusy初始化为0,多个线程通过CAS竞争更新cellBusy,谁先将cellBusy设置为1,谁就持有了锁。
    • 用来保证多个线程同时创建cells数组,创建cells数组中的cell对象,以及对cells数组进行动态扩容。

数据分片

在高并发的情况下,AtomicLong性能不高的主要原因是,多线程同时CAS更新一个变量,但是LongAdder会将一个累加变量分解成多个。多线程同时执行累加操作时,不同的线程对不同的累加变量进行操作,线程之间互不影响,这样就避免了一个线程等待另一个线程操作完成之后再操作。

我们以add()函数为例,了解一下LongAdder的实现原理

// LongAdder#add()
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 初始状态下,cells==null,第一个条件一定是false,此时会通过casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中,再执行分片累加的逻辑(将新增值累加到cells数组中)。在低并发的情况下,使用base可以有效避免执行复杂的分片累加逻辑。
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

实现流程如下:

java线程-AtomicInteger原子类实现原理

其中比较重要的是getProbe()函数,这是一个哈希函数,如果getProbe() & m = k,那么,当前线程会通过CAS将新增值x累加到cells[k]的value变量上。

另外一个函数就是longAccumulate,这是java.util.concurrent.atomic.Striped64#longAccumulate里的函数,看上述流程图,会有三种情况走到这一步。

  • cells数组为null,或者cells.length == 0;
  • cells[getProbe() & m] == null;
  • cas更新cells[getProbe() & m]对象的value值失败;
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    // 自旋
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

哈希优化

再了解数据分片的时候,我们发现会频繁调用到getProbe()函数,实际上这是个哈希函数,不过原子类对哈希函数进行了一些性能优化,对应的Cell对象的下标是通过getProbe() & n公式来计算得到。其中,n表示cells数组的长度,n为2的幂次方,因此getProbe() % n可以转化为getProbe() & (n - 1),以提高计算速度。除此之外getProbe的值会保存在对应的Thread对象的成员变量中,之后便可以一直重复使用,除非发生冲突,两个线程同时执行CAS更新同一个Cell对象,执行CAS失败的线程会重新生成新的哈希值,并同步更新到对应的Thread对象中。

/**
* 直接获取当前线程对应的Thread对象的PROBE成员变量值
*/
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
​
/**
* 基于老的哈希值probe重新计算新的哈希值,并存储到当前线程对应的Thread对象的PROBE成员变量中
*/
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

去伪共享

CPU读写缓存的最小单元是缓存行。不同CPU上的缓存行大小不同,可以为32字节,64字节或者128字节等,常见的大小为64字节。

java线程-AtomicInteger原子类实现原理

因为数据是以缓存行为单位来读写的,所以,当线程t1从内存中读取cellA到缓存中,会顺带着读取cellB,同理,当线程t2从内存中读取cellB到缓存中,也会顺带着读取cellA。当t1更新cellA的值时,按理说不会影响线程t2对cellB的缓存,但是缓存中的数据是按照缓存行来读写的,因此,线程t1会将整个缓存行设置为无效,这就会导致线程t2对cellB的缓存也会失效,需要重新从内存里读取。同理线程2更新cellB的值,也会导致线程t1对cellA的缓存失效,导致缓存频繁失效。

JDK 8开始,提供了一个sun.misc.Contended 注解,用来解决伪共享问题,加上这个注解的类会自动补齐缓存行。Cell类就使用了sun.misc.Contended注解,所以两个Cell对象便不会存储在同一个缓存行中,因此,也就不会出现伪共享的问题。

非精确求和

LongAdder的sum()会累加base和cells中的Cell对象的value值,从而得到最终的累加值,但这个值不一定是准确的。

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

sum执行时,并没有限制对base和cells的更新。也就是说,对LongAdder的最后一次更新not happens-before 最近的一次读取。

首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。

其次,这里对cell的读取也无法保证是最后一次写入的值。

所以,sum方法在没有并发的情况下,可以获得正确的结果。

统计器

LongAccumulator和DoubleAccumulator是统计器,比累加器多了一些统计的操作。源码如下:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}
​
public void accumulate(long x) {
    Cell[] as; long b, v, r; int m; Cell a;
    if ((as = cells) != null ||
        (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended =
              (r = function.applyAsLong(v = a.value, x)) == v ||
              a.cas(v, r)))
            longAccumulate(x, function, uncontended);
    }
}

看源码发现,accumulate的实现跟累加器的类似,唯一不同的是,构造器里带着LongBinaryOperator接口。使用方式如下:

public class LongAccumulatorDemo {
​
  public static class LongMax implements LongBinaryOperator{
​
    @Override
    public long applyAsLong(long left, long right) {
      return Math.max(left, right);
    }
  }
​
  public static void main(String[] args) {
    LongAccumulator accumulator = new LongAccumulator(new LongMax(), Long.MIN_VALUE);
    accumulator.accumulate(10);
    accumulator.accumulate(-3);
    accumulator.accumulate(20);
    System.out.println(accumulator.get());
  }
}