likes
comments
collection
share

【生吃源码】java.util.concurrent?(内容很长)

作者站长头像
站长
· 阅读数 15

什么是JUC

在jdk内的rt包下的java包内,java.util.concurrent包底下的工具,包含了lock、atomic以及一些并发容器比如说线程池、阻塞队列等

本文基于JDK1.8!

【生吃源码】java.util.concurrent?(内容很长)

sun.misc.Unsafe

  • unsafe 顾名思义就是 不安全,java为我们提供了unsafe类,通过源码可以看到他提供了一系列的native方法,通过jni调用jdk的底层c++实现

【生吃源码】java.util.concurrent?(内容很长)

  • unsafe 提供了一系列的c++操作

    • 对象管理(查看和修改) getXXX() putXXX()
    • 内存分配 xxxMemory
    • 对象地址获取 xxxFieldOffset xxxBase
    • 宿主机信息
    • 线程调度
    • Class方法
    • 内存屏障
    • CAS

【生吃源码】java.util.concurrent?(内容很长)

图片来自美团 tech.meituan.com/2019/02/14/…

CAS

可以从源码中看到的是在juc包内基本上都是依赖于unsafe的cas操作,所以主要来看看cas是什么

CAS就是compareAndSwap 比较交换

拿 Unsafe.compareAndSwapObject来说,他在Unsafe类中的定义是这样的

  /***
   * Compares the value of the object field at the specified offset
   * in the supplied object with the given expected value, and updates
   * it if they match.  The operation of this method should be atomic,
   * thus providing an uninterruptible way of updating an object field.
   *
   * @param obj the object containing the field to modify.
   * @param offset the offset of the object field within <code>obj</code>.
   * @param expect the expected value of the field.
   * @param update the new value of the field if it equals <code>expect</code>.
   * @return true if the field was changed.
   */
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

他注释通过google的翻译是

将提供的对象中指定偏移处的对象字段的值与给定的预期值进行比较,如果它们匹配则更新它。 此方法的操作应该是原子的,因此提供了一种更新对象字段的不间断方式。

  • obj :包含要修改的字段对象;
  • offset :字段在对象内的偏移量;
  • expect : 字段的期望值;
  • update :如果该字段的值等于字段的期望值,用于更新字段的新值;

【生吃源码】java.util.concurrent?(内容很长)

大概的意思就是 如果 我要处理的对象是我现在的值,那就替换我预设定的值,也就是乐观锁

Atomic

【生吃源码】java.util.concurrent?(内容很长)

atomic的中文释义是:原子,原子是 不能被进一步分割的最小粒子 即原子操作是指不会被线程调度机制打断的操作,就是并发安全

juc内提供的原子类

【生吃源码】java.util.concurrent?(内容很长)

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicIntegerFieldUpdater
  • AtomicLong
  • AtomicLongArray
  • AtomicLongFieldUpdater
  • AtomicMarkableReference
  • AtomicReference
  • AtomicReferenceArray
  • AtomicReferenceFieldUpdater
  • AtomicStampedReference
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder
  • Striped64

以 AtomicInteger 为例

我们取我们最为基础的数据类型int的原子类AtomicInteger来看,他提供了一系列对int数据操作的原子操作方法

【生吃源码】java.util.concurrent?(内容很长)

它基于一个volatile修饰的int值进行操作,提供了

  • public final int get()
  • public final void set(int newValue)
  • public final void lazySet(int newValue)
  • public final int getAndSet(int newValue)
  • public final boolean compareAndSet(int expect, int update)
  • public final boolean weakCompareAndSet(int expect, int update)
  • public final int getAndIncrement()
  • public final int getAndIncrement()
  • public final int getAndAdd(int delta)
  • public final int incrementAndGet()
  • public final int decrementAndGet()
  • public final int addAndGet(int delta)
  • public final int getAndUpdate(IntUnaryOperator updateFunction)
  • public final int updateAndGet(IntUnaryOperator updateFunction)
  • public final int getAndAccumulate(int x,IntBinaryOperator accumulatorFunction)
  • public final int accumulateAndGet(int x,IntBinaryOperator accumulatorFunction
  • 还有一些数据转换方法toString,int、long、float、doubleValue

具体实现的方式

  • get和set方法就是单纯的获取和修改value的值
/**  
* Gets the current value.  
* @return the current value  
*/  
public final int get() {  
    return value;  
}  

/**  
* Sets to the given value.  
* @param newValue the new value  
*/  
public final void set(int newValue) {  
    value = newValue;  
}
  • lazySet方法是调用unsafe类的putOrderedInt方法
/**  
* Eventually sets to the given value.  
*  最终设置为给定值。
* @param newValue the new value  
* @since 1.6  
*/  
public final void lazySet(int newValue) {  
    unsafe.putOrderedInt(this, valueOffset, newValue);  
}
  • putOrderedInt作为putIntVolatile不保证可见性的版本,拥有较快的处理速度,但是不保证volatile不建议使用

  • getAndSet方法调用unsafe类的getAndSetInt方法,getAndSetInt通过getIntVolatile方法获取值,通过while(!cas)进行自旋直至赋值成功

/**  
* Atomically sets to the given value and returns the old value.  
* 以原子方式设置给定值并返回旧值。
* @param newValue the new value  
* @return the previous value  
*/  
public final int getAndSet(int newValue) {  
    return unsafe.getAndSetInt(this, valueOffset, newValue);  
}

//unsafe.getAndSetInt
public final int getAndSetInt(Object var1, long var2, int var4) {  
    int var5;  
    do {  
        var5 = this.getIntVolatile(var1, var2);  
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));  
    return var5;  
}
  • compareAndSet 方法是调用unsafe类的compareAndSwapInt方法进行cas修改值
  • weakCompareAndSet 在jdk1.8下和compareAndSet方法一样,在1.9之后新增了一个Possibly单词的描述
  • getAndIncrement、getAndDecrement、getAndAdd、incrementAndGet、decrementAndGet 这五个方法都是调用unsafe类的getAndAddInt进行赋值,在unsafe类中的实现一样是通过cas进行数据操作
public final int getAndAddInt(Object var1, long var2, int var4) {  
    int var5;  
    do {  
        var5 = this.getIntVolatile(var1, var2);  
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));  

    return var5;  
}
  • getAndUpdate、updateAndGet、getAndAccumulate、accumulateAndGet这四个方法都是根据入参的Function对数据进行计算后通过自旋cas将数值进行赋值
/**  
* Atomically updates the current value with the results of  
* applying the given function, returning the previous value. The  
* function should be side-effect-free, since it may be re-applied  
* when attempted updates fail due to contention among threads.  
*  
* @param updateFunction a side-effect-free function  
* @return the previous value  
* @since 1.8  
*/  
public final int getAndUpdate(IntUnaryOperator updateFunction) {  
    int prev, next;  
    do {  
        prev = get();  
        next = updateFunction.applyAsInt(prev);  
    } while (!compareAndSet(prev, next));  
    return prev;  
}

原子类操作都和AtomicInteger类似,主要基于cas操作

locks

【生吃源码】java.util.concurrent?(内容很长)

locks就是锁,Lock接口是这样描述的

锁是一种工具,用于控制多个线程对共享资源的访问。通常,锁提供对共享资源的独占访问:一次只有一个线程可以获得锁,所有对共享资源的访问都需要首先获得锁。但是,有些锁可能允许并发访问共享资源,例如ReadWriteLock的读锁。

lock接口

locke接口定义了这些方法

  • void lock() 获取锁
  • void lockInterruptibly() 除非当前线程被中断,否则获取锁。如果锁可用,获取锁并立即返回。
  • boolean tryLock() 尝试获取锁
  • boolean tryLock(long time, TimeUnit unit)带超时时间的获取锁
  • void unlock()释放锁
  • Condition newCondition()获取一个新的Condition

Condition

  • Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

【生吃源码】java.util.concurrent?(内容很长)

juc.lock包提供了以下几种锁

【生吃源码】java.util.concurrent?(内容很长)

  • ReentrantLock 可重入互斥锁
  • ReentrantReadWriteLock 可重入读写锁
  • StampedLock 读写锁,不允许重入,乐观锁

先来看看 AbstractQueuedSynchronizer

public class ReentrantLock implements Lock, java.io.Serializable {  
private static final long serialVersionUID = 7373984872572414699L;  
/** Synchronizer providing all implementation mechanics */  
private final Sync sync;  
  
/**  
* Base of synchronization control for this lock. Subclassed  
* into fair and nonfair versions below. Uses AQS state to  
* represent the number of holds on the lock.  
*/  
abstract static class Sync extends AbstractQueuedSynchronizer {
....

ReentrantLock锁对象中包含了一个final修饰的Sync对象,Sync继承来时AbstractQueuedSynchronizer,AbstractQueuedSynchronizer就是耳熟能详的AQS

这里先浅浅了解一下AQS再继续,AbstractQueuedSynchronizer抽象类注释上是这样描述的 为实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)提供了一个框架。该类被设计为大多数类型的同步器的有用基础,这些同步器依赖于单个原子int值来表示状态。子类必须定义改变这种状态的受保护方法,这些方法定义了该对象被获取或释放时该状态的含义。考虑到这些,这个类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但是只有使用getState、setState和compareAndSetState方法操纵的自动更新的int值才会跟踪同步。

  • AbstractQueuedSynchronizer中定义了一个子类Node,Node是等待队列节点类。队列结构如下图

【生吃源码】java.util.concurrent?(内容很长)

  • Node对象包含了以下属性
    • waitStatus 节点状态 取消:1 阻塞:-1 等待条件 -2 无条件向后传播 -3
    • prev 前一个节点
    • next 下一个节点
    • thread 使用该节点排队的线程。 构造时初始化,使用后无效。
    • nextWaiter 链接到下一个等待条件的节点,或特殊值SHARED。

无条件向后传播 在共享模式的释放方法doReleaseShared中这样解释到: 确保一个发布得到传播,即使有其他正在进行的获取发布。这以通常的方式进行,如果需要信号,则尝试unparkSuccessor of head。但如果没有,则将status设置为PROPAGATE,以确保在发布时继续传播

  • 提供同步状态的修改操作方法:getState、setState、compareAndSetState,操作方式类似于原子类
  • 提供对头(head)、尾(tail)节点的cas设置操作
    • 比如对同步等待队列操作,通过源码我们可以发现也是基于unsafe类的cas操作对队列进行修改

private Node enq(final Node node) {  
    for (;;) {  
        Node t = tail;  
        if (t == null) { // Must initialize  
            if (compareAndSetHead(new Node()))  
                tail = head;  
        } else {  
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}  
  

private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);  
    // Try the fast path of enq; backup to full enq on failure  
    Node pred = tail;  
    if (pred != null) {  
        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {  
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}

/**  
* CAS head field. Used only by enq.  
*/  
private final boolean compareAndSetHead(Node update) {  
    return unsafe.compareAndSwapObject(this, headOffset, null, update);  
}  
  
/**  
* CAS tail field. Used only by enq.  
*/  
private final boolean compareAndSetTail(Node expect, Node update) {  
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);  
}
  • AQS支持2种模式 独占模式共享模式

以ReentrantLock的非公平锁NonfairSync为例

/**  
* 执行非公平的 tryLock。 tryAcquire 是在子类中实现的,但是都需要对 trylock 方法进行非公平尝试。 
*/  
final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}

可以从源码看到

  • 如果aqs的state==0,就通过cas操作把state由0设置成acquires
  • 否则判断是不是当前线程独占的锁,是的话对state进行累加,不是则获取失败

独占锁就是这个锁只能自己使用

共享模式ReentrantReadWriteLock为例子

/**
* 1. 如果写锁由另一个线程持有,则失败
* 2. 否则,该线程就有资格进入锁wrt状态
* 3. 如果步骤2失败,因为线程显然不符合条件,或者CAS失败或计数饱和,则链接到具有完整重试循环的版本
*/
protected final int tryAcquireShared(int unused) {  
    Thread current = Thread.currentThread();  
    int c = getState();  
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)  
        //失败
        return -1;  
    int r = sharedCount(c);  
    if (!readerShouldBlock() &&  r < MAX_COUNT &&  compareAndSetState(c, c + SHARED_UNIT)) {  
        if (r == 0) {  
            firstReader = current;  
            firstReaderHoldCount = 1;  
        } else if (firstReader == current) {  
            firstReaderHoldCount++;  
        } else {  
            HoldCounter rh = cachedHoldCounter;  
            if (rh == null || rh.tid != getThreadId(current))  
                cachedHoldCounter = rh = readHolds.get();  
            else if (rh.count == 0)  
                readHolds.set(rh);  
            rh.count++;  
        }  
        return 1;  
    }  
    //继续尝试
    return fullTryAcquireShared(current);  
}  
  
/**  
* 处理在tryacquirered中没有处理的CAS缺失和可重入读取  
*/  
final int fullTryAcquireShared(Thread current) {  
    HoldCounter rh = null;  
    for (;;) {  
        int c = getState();  
        if (exclusiveCount(c) != 0) {  
            if (getExclusiveOwnerThread() != current)  
                return -1;  
        } else if (readerShouldBlock()) {  
            if (firstReader == current) {
                  //assert firstReaderHoldCount > 0;
            } else {  
                if (rh == null) {  
                    rh = cachedHoldCounter;  
                    if (rh == null || rh.tid != getThreadId(current)) {  
                        rh = readHolds.get();  
                        if (rh.count == 0)  
                            readHolds.remove();  
                    }  
                }  
                if (rh.count == 0)  
                    return -1;  
            }  
        }  
        if (sharedCount(c) == MAX_COUNT)  
            throw new Error("Maximum lock count exceeded");  
        if (compareAndSetState(c, c + SHARED_UNIT)) {  
                if (sharedCount(c) == 0) {  
                    firstReader = current;  
                    firstReaderHoldCount = 1;  
                } else if (firstReader == current) {  
                    firstReaderHoldCount++;  
                } else {  
                if (rh == null)  
                    rh = cachedHoldCounter;  
                if (rh == null || rh.tid != getThreadId(current))  
                    rh = readHolds.get();  
                else if (rh.count == 0)  
                    readHolds.set(rh);  
                rh.count++;  
                cachedHoldCounter = rh; // cache for release  
            }  
            return 1;  
        }  
    }  
}

回到aqs源码中可以看到

public final void acquireShared(long arg) {  
    if (tryAcquireShared(arg) < 0)  
        doAcquireShared(arg);  
}

tryAcquireShared 失败的话,会进入doAcquireShared方法

private void doAcquireShared(long arg) {  
    final Node node = addWaiter(Node.SHARED);  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head) {  
                //尝试获取锁
                long r = tryAcquireShared(arg); 
                //是否获取成功
                if (r >= 0) {  
                    //设置为等待队列的头
                    setHeadAndPropagate(node, r);  
                    p.next = null; // help GC  
                    if (interrupted)  
                        //中断线程
                        selfInterrupt();  
                    failed = false;  
                    return;  
                  }  
            }  
            //判断获取失败是否需要中断线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}

doAcquireShared就是对锁的重新获取和处理获取失败的情况(是自旋获取的

再来看下锁释放,依旧以

/**
*
public final boolean releaseShared(int arg) {  
    if (tryReleaseShared(arg)) {  
        doReleaseShared();  
        return true;  
    }  
    return false;  
}


private void doReleaseShared() {  
    for (;;) {  
        Node h = head;  
        if (h != null && h != tail) {  
            int ws = h.waitStatus;  
        if (ws == Node.SIGNAL) {  
            //自旋 cas  修改当前节点状态由 SIGNAL -1 变为 0 
            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
                continue; 
             //唤醒后面的节点
            unparkSuccessor(h);  
        }  
        else if (ws == 0 &&  !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
            continue; 
        }  
        //head没变化退出循环,否则会再次自旋
        if (h == head) 
            break;  
    }  
}


通过上面的源码可以了解到 共享锁模式下,多个线程可同时获取锁,但是受共享次数的限制

接下来我们来具体看看锁

ReentrantLock


public class ReentrantLock implements Lock, java.io.Serializable {  
    private static final long serialVersionUID = 7373984872572414699L;  
    private final Sync sync;
      ....

ReentrantLock锁内定义了一个Sync对象,还提供了Sync的两个实现方法,公平和非公平


//公平
static final class FairSync extends Sync {  
    final void lock() {  
        acquire(1);  
    }

    @ReservedStackAccess  
    protected final boolean tryAcquire(int acquires) {  
        final Thread current = Thread.currentThread();  
        int c = getState();  
        if (c == 0) {  
            if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {  
                setExclusiveOwnerThread(current);  
                return true;  
            }  
        }  
        else if (current == getExclusiveOwnerThread()) {  
            int nextc = c + acquires;  
            if (nextc < 0)  
                throw new Error("Maximum lock count exceeded");  
            setState(nextc);  
            return true;  
        }  
        return false;  
    }
}

//非公平
static final class NonfairSync extends Sync {  
private static final long serialVersionUID = 7316153563782823691L;  
  
    @ReservedStackAccess  
    final void lock() {  
        if (compareAndSetState(0, 1))  
            setExclusiveOwnerThread(Thread.currentThread());  
        else  
            acquire(1);  
    }
    
    protected final boolean tryAcquire(int acquires) {  
        return nonfairTryAcquire(acquires);  
    }
}

//Sync自带了一个非公平加锁方法
@ReservedStackAccess  
final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
     }  
     return false;  
}
//Sync还有一个释放方法,通过protected修饰
@ReservedStackAccess  
protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false;  
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}

@ReservedStackAccess 是标记临界区

ReentrantLock 提供了两种构造方法,默认为非公平,公平和非公平的区别是在获取锁的时候,是否乖乖排队,非公平锁会先尝试cas抢占

public ReentrantLock() {  
    sync = new NonfairSync();  
}  

public ReentrantLock(boolean fair) {  
    sync = fair ? new FairSync() : new NonfairSync();  
}

在释放锁上,他们用的都是同样的方法

@ReservedStackAccess  
protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false;  
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}

即对Sync对象的state进行修改

ReentrantReadWriteLock

public class ReentrantReadWriteLock implements ReadWriteLock,

ReentrantReadWriteLock实现的读写锁ReadWriteLock,也提供是是否公平锁的创建方式

public ReentrantReadWriteLock(boolean fair) {  
    sync = fair ? new FairSync() : new NonfairSync();  
    readerLock = new ReadLock(this);  
    writerLock = new WriteLock(this);  
}

他和ReentrantLock的区别就是,他分为读锁和写锁,来看看重要的Sync类实现

abstract static class Sync extends AbstractQueuedSynchronizer {

static final int SHARED_SHIFT = 16;  
static final int SHARED_UNIT = (1 << SHARED_SHIFT);  
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;  
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
....

可以看到,他和ReentrantLock的Sync对象有较大的区别,多了一些定义的静态常量,通过int的高16位为共享锁数量,低16位为独占锁数量,同一个字段来运算来记录锁状态

/** 返回以count表示的共享持有的数量 */  
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }  
/** 返回以count表示的独占保存的个数 */  
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

先来看看读锁ReadLock

public void lock() {  
    sync.acquireShared(1);  
}


//Sync里的实现,类似于tryAcquireShared
final boolean tryReadLock() {  
    Thread current = Thread.currentThread();  
    for (;;) {  
        int c = getState();  
        if (exclusiveCount(c) != 0 &&  getExclusiveOwnerThread() != current)  
            return false;  
        int r = sharedCount(c);  
        if (r == MAX_COUNT)  
            throw new Error("Maximum lock count exceeded");  
        if (compareAndSetState(c, c + SHARED_UNIT)) {  
            if (r == 0) {  
                firstReader = current;  
                firstReaderHoldCount = 1;  
            } else if (firstReader == current) {  
                firstReaderHoldCount++;  
            } else {  
                HoldCounter rh = cachedHoldCounter;  
                if (rh == null || rh.tid != getThreadId(current))  
                    cachedHoldCounter = rh = readHolds.get();  
                else if (rh.count == 0)  
                    readHolds.set(rh);  
                rh.count++;  
        }  
        return true;  
        }  
    }  
}

//释放锁
public void unlock() {  
    sync.releaseShared(1);  
}

Sync继承于AQS,读锁依赖于AQS的共享锁来实现

看完读锁,我们来看看写锁WriteLock

public void lock() {  
    sync.acquire(1);  
}

public boolean tryLock( ) {  
    return sync.tryWriteLock();  
}

public void unlock() {  
    sync.release(1);  
}


//Sync的tryWriteLock  和aqs的独占锁类似
final boolean tryWriteLock() {  
    Thread current = Thread.currentThread();  
    int c = getState();  
    if (c != 0) {  
        int w = exclusiveCount(c);  
        if (w == 0 || current != getExclusiveOwnerThread())  
            return false;  
        if (w == MAX_COUNT)  
            throw new Error("Maximum lock count exceeded");  
    }  
    if (!compareAndSetState(c, c + 1))  
        return false;  
    setExclusiveOwnerThread(current);  
    return true;  
}

写锁的独占模式下的aqs在公平和非公平锁的锁方法上注释

This is identical in effect to tryAcquire except for lack of calls to writerShouldBlock

writerShouldBlock是什么?

在非公平锁的时候是这样实现的,非公平锁获取写锁时,都是不需要阻塞的

final boolean writerShouldBlock() {  
    return false; // writers can always barge  
}

在公平锁中,会进行判断

final boolean writerShouldBlock() {  
    return hasQueuedPredecessors();  
}

public final boolean hasQueuedPredecessors() {  
    Node t = tail; // Read fields in reverse initialization order  
    Node h = head;  
    Node s;  
    //头!=尾  并且  头的下一个节点不能用为或者下一个节点的持有线程是当前线程
    return h != t &&  ((s = h.next) == null || s.thread != Thread.currentThread());  
}

StampedLock

StampedLock在类头的注释是这样描述到:

  • 一种基于功能的锁,具有三种控制读写访问的模式。StampedLock的状态由版本和模式组成。锁获取方法返回一个表示并控制与锁状态相关的访问的戳记;这些方法的“try”版本可能会返回特殊值零,以表示获取访问失败。锁释放和转换方法需要戳记作为参数,如果戳记与锁的状态不匹配,则会失败。
    • 写锁:方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。
    • 读锁:方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本
    • 乐观读:方法tryOptimisticRead仅在锁当前未处于写模式时返回非零戳。方法validate返回true,如果在获得给定戳记之后还没有在写模式下获得锁。这种模式可以被认为是读锁的一个极其弱的版本,可以在任何时候被写入器打破。对短的只读代码段使用乐观模式通常会减少争用并提高吞吐量。然而,它的使用本身就很脆弱。乐观读节应该只读取字段,并将它们保存在局部变量中,以备验证后使用。在乐观模式下读取的字段可能非常不一致,因此只有当您足够熟悉数据表示以检查一致性和或重复调用validate()方法时才适用。例如,当第一次读取对象或数组引用,然后访问其中一个字段、元素或方法时,通常需要这样的步骤。

在StampedLock 读写的lock和ReadWriteLock接口类似,来看看新提供的乐观读(tryOptimisticRead

/**  
* 返回一个稍后可验证的戳,如果是独占锁定则返回零。
* @return a stamp, or zero if exclusively locked  
*/  
public long tryOptimisticRead() {  
    long s;  
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;  
}

在类头注释中给出了这样的案例

class Point {  
     private double x, y;  
     private final StampedLock sl = new StampedLock();  

    // an exclusively locked method  
     void move(double deltaX, double deltaY) { 
         long stamp = sl.writeLock();  
         try {  
             x += deltaX;  
             y += deltaY;  
         } finally {  
             sl.unlockWrite(stamp);  
         }  
    }  
  
     // A read-only method  
     double distanceFromOrigin() {
         long stamp = sl.tryOptimisticRead();  
         double currentX = x, currentY = y;  
         //重点看这里,调用了StampedLock的validate,失败就升级为悲观读
         if (!sl.validate(stamp)) {  
             stamp = sl.readLock();  
             try {  
                 currentX = x;  
                 currentY = y;  
             } finally {  
                 sl.unlockRead(stamp);  
             }  
         }  
         return Math.sqrt(currentX * currentX + currentY * currentY);  
     }  
  
     void moveIfAtOrigin(double newX, double newY) { // upgrade  
         // Could instead start with optimistic, not read mode  
         long stamp = sl.readLock();  
         try {  
             while (x == 0.0 && y == 0.0) {  
                 long ws = sl.tryConvertToWriteLock(stamp);  
                 if (ws != 0L) {  
                     stamp = ws;  
                     x = newX;  
                     y = newY;  
                     break;  
                 }  
                 else {  
                     sl.unlockRead(stamp);  
                     stamp = sl.writeLock();  
                 }  
             }  
         } finally {  
             sl.unlock(stamp);  
         }  
     }  
}}

在这个例子中validate方法StampedLock是这样定义的

/**
* 如果自给定图章发布以来锁未被独占获取,则返回true。如果戳记为零,则始终返回false。如果戳记代表当前持有
* 的锁,则总是返回true。使用不是从tryOptimisticRead或锁定方法获得的值调用此方法没有定义的效果或结果。
*/
public boolean validate(long stamp) {  
    U.loadFence();  
    return (stamp & SBITS) == (state & SBITS);  
}

为什么需要升级?我们来看看他的乐观读锁


private static final int LG_READERS = 7;
private static final long WBIT = 1L << LG_READERS;  
private static final long RBITS = WBIT - 1L;  
private static final long SBITS = ~RBITS;

public long tryOptimisticRead() {  
    long s;  
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;  
}

乐观读锁尝试获取锁的时候,返回的是一个用于校验的戳,如果校验通过在说明 这期间没有其他的写操作数据是安全的

线程安全的集合

基于lock实现的数据读写安全

队列

  • ArrayBlockingQueue
    • 和普通队列的差别是数据安全
    • 构造方法可以设定容量,锁是否公平,默认容量1~Integer.MAX,默认使用非公平锁
    • 使用ReentrantLock的lockInterruptibly方法获取锁,获取失败则休眠等待
    • 无批量操作例如addAll……等
  • ConcurrentLinkedDeque、ConcurrentLinkedQueue
    • 数据是并发安全的
    • 基于unsafe类的cas操作node节点
  • LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、CopyOnWriteArrayList、CopyOnWriteArraySet
    • 基于ReentrantLock实现的数据操作安全
  • LinkedTransferQueue
    • 实现BlockingQueue并增加自己的Transfer方法
    • 所有队列操作方法基于自身方法xfer

xfer 是通过2层循环对队列进行匹配,需要注意的是只有入队、传递方法和出队方法进行匹配,匹配成功就唤醒下一个waiter后返回,如果没匹配到,根据调用方法时how的值去选择直接返回(NOW),或者添加结点再返回(ASYNC)、等待被后来的请求匹配(AYNC、TIMED)直到匹配成功、达到界限后返回。

Map

  • ConcurrentHashMap(1.8版本)
    • put
      • 如果table为空,初始化一个node数组通过cas写入节点
      • 不为空的话,通过cas插入new出来的node
      • 替换的话,synchronized锁原节点
  • ConcurrentSkipListMap、ConcurrentSkipListSet
    • 基于cas对数据进行操作

线程池

public interface Executor {  
  
    /**  
    * 在将来的某个时间执行给定的命令。命令可以在新线程中执行,也可以在池线程中执行,或者在调用线程中执行,这由Executor实现决定。
    *  
    */  
    void execute(Runnable command);  
}

Executor接口定义了一个task提交方法execute jdk8下,ExecutorService继承了Executor并定义了相关线程池方法 AbstractExecutorService提供了ExecutorService的默认实现 JUC中

  • ThreadPoolExecutor:实现了ExecutorService接口,封装线程池的相关机制。
  • ScheduledExecutorService:实现了ExecutorService接口,拓展了定时能力。
  • ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor并实现ScheduledExecutorService接口
  • ForkJoinPool:是一种支持任务分解的线程池,一般配合接口ForkJoinTask使用。

ThreadPoolExecutor

ThreadPoolExecutor的构造方法如下

public ThreadPoolExecutor(int corePoolSize,  
                         int maximumPoolSize,  
                         long keepAliveTime,  
                         TimeUnit unit,  
                         BlockingQueue<Runnable> workQueue,  
                         ThreadFactory threadFactory,  
                         RejectedExecutionHandler handler)
  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
  • handler: 线城池的饱和策略事件,主要有四种类型。

【生吃源码】java.util.concurrent?(内容很长)

  • AbortPolicy(抛出异常)
  • DiscardPolicy(直接丢弃任务)
  • DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

默认的线程工厂是实现在Executors类中的静态类DefaultThreadFactory

static class DefaultThreadFactory implements ThreadFactory {  
    private static final AtomicInteger poolNumber = new AtomicInteger(1);  
    private final ThreadGroup group;  
    private final AtomicInteger threadNumber = new AtomicInteger(1);  
    private final String namePrefix;  

    DefaultThreadFactory() {  
        SecurityManager s = System.getSecurityManager();  
        group = (s != null) ? s.getThreadGroup() :  
        Thread.currentThread().getThreadGroup();  
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";  
    }  

    public Thread newThread(Runnable r) {  
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);  
        if (t.isDaemon())  
            t.setDaemon(false);  
        if (t.getPriority() != Thread.NORM_PRIORITY)  
            t.setPriority(Thread.NORM_PRIORITY);  
        return t;  
    }  
}

RejectedExecutionHandler的默认值是AbortPolicy【抛出异常】

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor的构造方法如下

public ScheduledThreadPoolExecutor(int corePoolSize,  
                                    ThreadFactory threadFactory,  
                                    RejectedExecutionHandler handler) {  
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  new DelayedWorkQueue(), threadFactory,handler);  
}

设定了闲线程存活时间为0,等待队列为延迟队列,他的任务提交方法如下

public Future<?> submit(Runnable task) { 
    return schedule(task, 0, NANOSECONDS);  
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {  
    //如果任务为空或者时间为空  抛出npe
    if (command == null || unit == null)  
        throw new NullPointerException();  
    //封装任务对象,decorateTask方法会直接返回task对象
    RunnableScheduledFuture<?> t = decorateTask(command,  new ScheduledFutureTask<Void(command, null,  triggerTime(delay, unit)));
    //提交延时间
    delayedExecute(t);  
    return t;  
}

private void delayedExecute(RunnableScheduledFuture<?> task) {  
    if (isShutdown())  
        //如果线程池已关闭,直接拒绝任务
        reject(task);  
    else {  
        //获取工作队列并把任务加入
        super.getQueue().add(task);  
        //如果线程池关闭并且线程池不能执行状态并且移除任务成功
        if (isShutdown() &&=!canRunInCurrentRunState(task.isPeriodic()) && remove(task))  
            //取消任务的执行,但不会中断任务
            task.cancel(false);  
        else  
            //调用父类的添加worker(添加运行任务)
            ensurePrestart();  
    }  
}

ScheduledExecutorService接口中对scheduleAtFixedRate的描述是

创建并执行一个周期性动作,该动作首先在给定的初始延迟之后启用

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
                                            long initialDelay,  
                                            long period,  
                                            TimeUnit unit) {  
    if (command == null || unit == null)  
        throw new NullPointerException();  
    if (period <= 0)  
        throw new IllegalArgumentException();  
    //封装成ScheduledFutureTask任务,并设置调用周期period
    ScheduledFutureTask<Void> sft =  new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),  
    unit.toNanos(period));  
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);  
    //设置执行的任务
    sft.outerTask = t;  
    //执行延时任务
    delayedExecute(t);  
    return t;  
}

提交到线程池后,线程池中的线程会从任务队列(DelayedWorkQueue)中获取ScheduledFutureTask执行

ScheduledFutureTask继承FutureTask,实现了RunnableScheduledFuture,提供这些参数

private final long sequenceNumber;//序号
private long time;//任务将要被执行的时间
private final long period;//任务执行周期

并且覆盖FutureTask版本的run,以便定期重置队列

public void run() {  
    //该任务是不是一次性任务
    boolean periodic = isPeriodic();  
    if (!canRunInCurrentRunState(periodic))  
        cancel(false);  
    else if (!periodic)  
        ScheduledFutureTask.super.run();  
    else if (ScheduledFutureTask.super.runAndReset()) {  
        //设置下次运行时间
        setNextRunTime(); 
        //重新提交任务
        reExecutePeriodic(outerTask);  
    }  
}

ForkJoinPool

ForkJoinPool的构造方法如下

public ForkJoinPool(int parallelism,  
                    ForkJoinWorkerThreadFactory factory,  
                    UncaughtExceptionHandler handler,  
                    boolean asyncMode) {  
                    this(checkParallelism(parallelism),  
                    checkFactory(factory),  
                    handler,  
                    asyncMode ? FIFO_QUEUE : LIFO_QUEUE,  
                    "ForkJoinPool-" + nextPoolId() + "-worker-");  
                    checkPermission();  
}

ForkJoinPool需要配合 ForkJoinWorkerThread、ForkJoinTask使用 其中ForkJoinPool负责提供容器,ForkJoinTask负责提交任务而ForkJoinWorkerThread负责处理分片任务

使用ForkJoinPool因为需要占用大量的cpu,所以在非计算密集型任务时,不建议使用

转载自:https://juejin.cn/post/7246638858912677945
评论
请登录