likes
comments
collection
share

AQS框架最最最简单流程梳理

作者站长头像
站长
· 阅读数 6

AQS框架最最最简单流程梳理

AQS是Lock的基础,之前一直想自己梳理一下,奈何一直没有静下心来梳理,这里记录并做梳理,文章基于JDK 1.8。

独占锁的上锁流程

AQS源码中给了我们一个最常见的互斥锁的代码示例:

public class MutexCustom implements Lock {

    public static final int STATUS_ORIGIN = 0;
    public static final int STATUS_TARGET = 1;
    private Sync sync;

    public MutexCustom() {
        sync = new Sync();
    }

    public static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(STATUS_ORIGIN, STATUS_TARGET)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(STATUS_ORIGIN);
            setExclusiveOwnerThread(null);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {return true;}

        public Condition newCondition() {return new ConditionObject();}

        public int getCurrState() {return getState();}
    }

    @Override
    public void lock() {sync.acquire(STATUS_TARGET);}

    @Override
    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(STATUS_TARGET);}

    @Override
    public boolean tryLock() {return sync.tryAcquire(STATUS_TARGET);}

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(STATUS_TARGET, unit.toNanos(time));
    }

    @Override
    public void unlock() {sync.release(STATUS_TARGET);}

    @Override
    public Condition newCondition() {return sync.newCondition();}

    public int getState() {return sync.getCurrState();}
}

我们先来关注最基础的最基础的最基础的锁的功能,也就是acquire获取锁release释放锁

acquire获取锁
    public final void acquire(int arg) {
    	// Node.EXCLUSIVE为null
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这里有三个主要的方法:

  1. tryAcquire尝试获取锁,得到true,也就是获取到锁直接返回,也就是不会走里面的方法体,直接执行临界区的代码了。
  2. tryAcquire返回false,执行addWaiter,顾名思义,把自己加到等待的nodeList最后面
  3. acquireQueued

首先明确锁是怎么生效的,如果这3个方法都很快的返回了结果,不管返回的是true还是false,都会继续往下执行,区别只不过是执不执行selfInterrupt()方法罢了,并没有影响当前线程继续往后执行后续方法,也就是临界区代码。

如果这三个方法里面有某一个方法没有很快的返回结果,而是把自己困在了一个while或者for(;;)循环里面,或者使用LockSupport.park()方法,那么这个线程就阻塞住了,不能继续往后执行后续的临界区代码,那这个时候就实现了阻塞。

在合理的逻辑下竞争和释放这个阻塞的代码就实现了锁。 那究竟是在哪个方法里面阻塞住了呢?我们挨个方法去看:

1.1. tryAcquire
// 需要被重写,给出获取资源逻辑
// 比如最简单的互斥锁,只需要在state为0的时候compareAndSetState(0,1)成功返回true就好了。
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这个没有什么好说的,既然AQS是实现Lock的基础框架,那肯定就要把逻辑代码空出来让开发者自己去实现,返回true表示竞争到了锁,可以执行临界区代码,反之没有竞争到锁,需要等待锁的释放并重新竞争。

1.2. addWaiter
    private Node addWaiter(Node mode) {
    	// mode如果是Node.EXCLUSIVE则为null,so,node的next现在为null
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速插入到最尾端,也就是把新创建的node放到tail的next,然后用node取代tail。
        // tail是全局变量,初始化为null
        Node pred = tail;
        // 前提是tail不为空
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果tail为空则直接走enq方法。
        enq(node);
        return node;
    }

    // 其实比快速插入不一样的就是如果没有tail,把tail赋值成head,然后再继续append
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            	// 初始化状态tail为null,给head设置一个new Node(),同时把tail也指向这个head
            	// tail\head同为全局变量,初始化都为null
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

看着注释理一遍: addWaiter方法顾名思义,就是把自己的node插入到链表的最尾端,加入抢占资源的排队序列,等待抢占资源。 需要注意的是,tail / head都是全局变量,都是初始化为null,刚开始运行的时候tail / head都为null,如果tail为null,会走enq方法,做了两件事

  1. tail为null,先把head初始化为一个new Node(),同时暂时把tail赋值为head。
  2. 把当前node赋值为head的next,同时当前node作为tail。

注意返回值是t,而不是当前node,也就是返回node的preNode,是t,上述情况下当前链表有两个值head->tail,返回的应该是是head而不是tail。当然这里调用enq方法并没有使用它的返回值。addWaiter方法返回值是当前node。

所以从空链表初始化结束后,链表里应该有两个节点,而不是只有一个head节点,保证了工作节点能获取到一个pre节点。

compareAndSetHead和compareAndSetTail这些CAS方法都不是阻塞的,都是Unsafe方法实现,所谓乐观锁,不用关心脏数据,只需关心成功或者失败。

1.3. acquireQueued

这里附上一个Node状态的表格:

类型说明
CANCELLED1当前节点由于超时或者中断被取消,节点进入这个状态以后将保持不变。 注:这是唯一大于0的值,很多判断逻辑会用到这个特征
SIGNAL-1后继节点会依据此状态值来判断自己是否应该阻塞、当前节点的线程如果释放了同步状态或者被取消、会通知后继节点、后继节点会获取锁并执行(当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态)
CONDITION-2当前节点正处在等待队列中,在条件达成前不能获取锁。
PROPAGATE-3当前节点获取到锁的信息需要传递给后继节点,共享锁模式使用该值
INITIAL0节点初始状态,默认值,表示当前节点在sync同步阻塞队列中,等待着获取锁
// 放入等待队列
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                	// 如果前一个节点是head并且这时获取到了临界资源,把当前node设置为head,head的next置为null
                	// 两个条件:
                	// 			1. node是head的next节点,也就是轮到它了。
                	// 			2. tryAcquire(arg)得到了资源,也就是获取到了锁。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 注意哈,这里是&&,也就是shouldParkAfterFailedAcquire方法返回true才会执行parkAndCheckInterrupt,两个方法同时返回true才会执行interrupted = true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 检测失败的时候是否需要阻塞自己,我要睡觉了,等人来叫醒我再检测是不是需要排队。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 注意,ws是上一个节点的status
        int ws = pred.waitStatus;
        // SINGAL: 此node的后续node通过park把阻塞住了,所以当前node在release或者取消acquire的时候必须unpark一下后续的node。
        // (当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态)
        if (ws == Node.SIGNAL)
        	// 前置节点已经设置了SIGNAL,可以阻塞我的线程了,后续检测到Node.SIGNAL会唤醒我的线程
            return true;
        if (ws > 0) {
            // 前置节点被设置为cancel了,跳过pre,我来当pre, ^_^
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 检测到自己是INITAL或者PROPAGATE,把pre的status设置为SIGNAL,继续循环
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    //park睡觉了,睡醒了之后检测是否需要中断。
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

注释很清晰了,梳理一下:

acquireQueued方法顾名思义,尝试获取锁,获取不到就进入队列等待。

  1. acquire处在一个for(;;)死循环中, 获取锁需要两个条件: 1.1. 前置节点是head,也就是轮到我自己获取锁了 1.2. tryAcquire(arg)==true,也就是竞争到了锁
  2. 没必要一直尝试获取锁,把自己标记为需要唤醒,同时调用park阻塞,告诉predecessor,需要的时候unpark我。

LockSupport.park(this);之后当前线程会阻塞,直到有unpark操作才会继续往后执行。所以在for(;;)死循环中并没有一直尝试获取锁,在获取不到之后会阻塞自己,并把node的状态标记为SIGNAL。后续看到node的状态为SIGNAL,在自己的锁释放后会调用unpark唤醒自己继续执行for(;;)中没有执行完的部分,继续尝试tryAcquire获取锁,这时就能获取到锁并结束循环了。 这就是最基础的获取锁的功能。

release释放锁

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // head不为空,并且status不是初始状态,则需要唤醒unpark一下下一个节点。否则下个节点还没有阻塞,不需要unpark唤醒。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // 需要unpark唤醒下一个节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 需要unpark的Thread被当前node的继任node持有,通常情况下是下一个node,但是有可能是cancel状态或者是null,
        // 从尾部逆序往前遍历以找到状态不是canceled的继任者。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒下个节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

释放锁的流程看起来就简单了一些。tryRelease成功释放锁之后,找到下个节点,唤醒他。

至于为什么需要从后往前遍历而不是从前往后遍历:

	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //看这一块的代码
                node.prev = t;
                // ①
                if (compareAndSetTail(t, node)) {
                    // ②
                    t.next = node;
                    return t;
                }
            }
        }
    }

这段代码是往tail插入新的next节点,使用CAS保证安全,如果当前线程执行到了②,但是这时有一个新线程执行到了①,新线程只执行了把node的pre指向了tail,这时tail的next仍然为null,这时tail变了再执行CAS就会失败了。 这种情况下从后往前遍历是能够找到的,所以是安全的。

参考链接: www.zhihu.com/question/50… blog.csdn.net/anlian523/a… blog.csdn.net/java_lifeng…