从AQS源码解读Java并发编程的底层机制
1. AQS简介
AQS(AbstractQueuedSynchronizer)是Java中并发包中最重要的同步组件之一,也是ReentrantLock和Semaphore等锁的核心。AQS提供了一种基于FIFO队列的排他锁和共享锁机制,同时还提供了一些方法,允许子类实现非常灵活的同步控制。
AQS中的同步控制基于以下两种操作:
- acquire:获取同步状态。该操作会阻塞线程直到同步状态可用,或者线程被中断或超时。AQS通过一个双向链表来维护阻塞线程队列。
- release:释放同步状态。该操作通常由持有同步状态的线程来执行,并且通常会唤醒队列中的一个或多个等待线程。
2. AQS原理
AQS的原理非常简单:它维护了一个volatile类型的int变量state来表示同步状态,同时使用一个FIFO队列来维护等待获取同步状态的线程。具体来说,AQS中的同步状态可以分为两种:独占和共享。
- 独占状态:在任意时刻只能由一个线程持有的同步状态,例如ReentrantLock。AQS通过设置state的值来表示独占状态,如果state的值为0表示当前没有线程持有独占锁,否则表示有一个线程持有独占锁。
- 共享状态:在同一时刻可以由多个线程同时持有的同步状态,例如Semaphore。AQS通过设置state的高16位来表示共享状态的数量,如果state的高16位为0表示当前没有线程持有共享锁,否则表示有多个线程持有共享锁。
AQS的核心是一个双向队列,该队列保存了所有等待获取同步状态的线程。当线程调用acquire方法时,如果同步状态不可用,则当前线程会被加入到等待队列中,然后线程会被阻塞直到同步状态可用。当同步状态可用时,AQS会从等待队列中唤醒一个线程,并将同步状态分配给该线程。
当一个线程获取了同步状态后,如果该线程需要阻塞,那么它会自动释放同步状态并将自己加入到等待队列的尾部。这样可以保证等待时间最长的线程最先获取到同步状态。
3. AQS源码解读
AQS的源码比较复杂,其中最核心的部分是同步队列的实现。下面我们将详细介绍AQS的源码。
3.1 AQS的状态管理
AQS的同步状态由一个volatile类型的int变量state来表示。在AQS中,state的值被分为两个部分,高16位表示共享状态的数量,低16位表示独占状态的值。因此,在AQS中,独占状态的值必须小于2^16,共享状态的数量必须小于2^16。
// state变量
private volatile int state;
3.2 AQS的同步队列
AQS使用一个FIFO双向链表来维护等待获取同步状态的线程。在AQS中,同步队列的实现是一项非常重要的工作。下面我们将详细介绍AQS同步队列的实现原理。
3.2.1 Node节点
在AQS中,每个等待获取同步状态的线程都被封装成一个Node节点。Node节点是一个双向链表节点,它有以下两个重要的属性:
- prev:指向前一个节点。
- next:指向后一个节点。
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
int waitStatus;
}
除了prev和next属性之外,Node节点还包括其他一些属性,例如:
-
thread:表示等待获取同步状态的线程。
-
waitStatus:表示节点的状态,可以为以下四种状态之一:
- CANCELLED:表示节点被取消。
- SIGNAL:表示节点所在的线程被唤醒。
- CONDITION:表示节点在条件队列中。
- PROPAGATE:表示ReleaseShared方法需要传播释放的信号。
3.2.2 同步队列的基本操作
AQS中同步队列的基本操作包括入队、出队、插入节点和删除节点。下面我们将逐个介绍这些操作。
3.2.2.1 入队操作
当一个线程需要获取同步状态但同步状态不可用时,它会被加入到等待队列中。入队操作会将当前线程封装成一个Node节点,并将该节点加入到等待队列的尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter方法会首先将当前线程封装成一个Node节点,并使用CAS操作将该节点加入到等待队列的尾部。如果CAS操作失败,则会调用enq方法将节点插入到等待队列的尾部。
3.2.2.2 出队操作
当一个线程被唤醒并且成功获取了同步状态时,它需要从等待队列中移除。出队操作会将当前线程对应的节点从等待队列中移除。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor方法会首先将当前线程对应的节点的waitStatus设置为0,表示节点已经被唤醒。然后它会找到当前节点的后继节点,并将其唤醒。
3.2.2.3 插入节点操作
当一个线程需要获取同步状态但同步状态不可用时,它会被加入到等待队列中。插入节点操作会将一个节点插入到等待队列的尾部。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法会先尝试使用CAS操作将节点加入到等待队列的尾部,如果CAS操作失败则使用自旋重试。注意,enq方法会在等待队列为空时创建一个空节点作为头节点,这样可以减少特殊情况的处理。
3.2.2.4 删除节点操作
当一个线程被取消或者超时时,它需要从等待队列中移除。删除节点操作会将一个节点从等待队列中移除。
private void cancelAcquire(Node node) {
// If node doesn't exist, don't bother.
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// Otherwise, we need to splice out ourself.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
cancelAcquire方法会首先将node对应的线程设置为null。然后,它会在等待队列中寻找前驱节点pred,直到pred的waitStatus小于等于0。在这个过程中,它会将所有的pred的waitStatus设置为0,表示它们的后继节点需要被唤醒。
接下来,cancelAcquire方法会将当前节点的waitStatus设置为CANCELLED,表示当前节点已经被取消。
如果当前节点是等待队列的尾节点,那么它可以直接从等待队列中删除,只需要将tail指针指向pred即可。
否则,当前节点不是等待队列的尾节点。此时,它需要将自己从等待队列中剔除,并且唤醒它的后继节点。
具体来说,它会检查pred的waitStatus,如果它是SIGNAL或者0,那么它会尝试将pred的waitStatus设置为SIGNAL,并将pred的后继节点指向当前节点的后继节点。这样做的目的是将当前节点从等待队列中剔除。
如果pred的waitStatus不是SIGNAL或者0,那么它会直接唤醒当前节点的后继节点。
最后,它会将当前节点的next指针指向自己,这样可以帮助垃圾回收器回收节点。
3.2.2.5 AQS的共享模式
除了独占模式外,AQS还支持共享模式。共享模式可以让多个线程同时获取同步状态。
AQS中的共享模式实现了两种锁:共享锁和排他锁。共享锁可以让多个线程同时获取同步状态,但是不能修改同步状态。排他锁只能让一个线程获取同步状态,但是它可以修改同步状态。
在AQS中,实现共享模式需要扩展Node节点,为它添加一个sharedCount字段,表示获取同步状态的线程数量。当线程获取同步状态时,它需要增加sharedCount字段的值;当线程释放同步状态时,它需要减少sharedCount字段的值。
AQS中的共享模式实现了一个FIFO的等待队列,其中包含了所有等待获取同步状态的线程。当一个线程获取共享锁时,它需要检查等待队列中是否还有其他线程等待获取共享锁,如果有,那么当前线程需要等待直到它们全部释放共享锁。
在AQS中,共享模式的同步操作需要实现tryAcquireShared和tryReleaseShared两个方法。tryAcquireShared方法用于尝试获取共享锁,它返回一个整数,表示当前线程获取共享锁的状态。tryReleaseShared方法用于释放共享锁。
在AQS中,tryAcquireShared方法和tryReleaseShared方法的实现与tryAcquire方法和tryRelease方法类似,也是使用CAS操作更新同步状态。不同之处在于,tryAcquireShared方法和tryReleaseShared方法需要处理多个线程同时获取或释放共享锁的情况。
当一个线程尝试获取共享锁时,它会首先调用tryAcquireShared方法,如果返回值大于等于0,表示当前线程获取共享锁成功。如果返回值小于0,那么当前线程会加入等待队列,等待其他线程释放共享锁。
如果当前线程获取共享锁失败,它会调用shouldParkAfterFailedAcquire方法将自己挂起,并且阻塞当前线程,直到其他线程释放共享锁并唤醒它。
当一个线程释放共享锁时,它会首先调用tryReleaseShared方法,释放共享锁,并且更新同步状态。然后,它会唤醒等待队列中的所有线程,让它们重新尝试获取共享锁。
与独占模式不同的是,在共享模式中,线程需要等待所有其他线程释放共享锁才能获取共享锁。因此,共享模式比独占模式更加复杂,需要更多的同步操作。
3.3 AQS的应用
AQS作为Java中并发编程的基础,它的应用非常广泛,几乎涵盖了Java中所有的并发类。下面我们介绍几个常见的应用场景。
3.3.1 ReentrantLock
ReentrantLock是Java中一个非常常用的锁,它是通过AQS实现的。ReentrantLock可以使用独占模式和共享模式,可以重入,可以响应中断,可以定时等待。
在ReentrantLock中,lock方法和unlock方法实际上是调用了AQS中的acquire方法和release方法。tryLock方法实际上是调用了AQS中的tryAcquire方法。
3.3.2 Semaphore
Semaphore是Java中的一个信号量,它是通过AQS实现的。Semaphore可以控制并发线程的数量,它可以被多个线程共享。
在Semaphore中,acquire方法和release方法实际上是调用了AQS中的acquireShared方法和releaseShared方法。
3.3.3 CountDownLatch
CountDownLatch是Java中的一个倒计数器,它是通过AQS实现的。CountDownLatch可以让一个线程等待多个线程执行完毕后再执行。
在CountDownLatch中,await方法和countDown方法实际上是调用了AQS中的acquireShared方法和releaseShared方法。
3.3.4 CyclicBarrier
CyclicBarrier是Java中的一个栅栏,它是通过AQS实现的。CyclicBarrier可以让多个线程在同一时刻到达某个屏障,然后继续执行下一步操作。
在CyclicBarrier中,await方法实际上是调用了AQS中的acquireSharedInterruptibly方法。每个线程到达栅栏时,它会调用acquireSharedInterruptibly方法,等待所有其他线程到达栅栏。当所有线程都到达栅栏时,acquireSharedInterruptibly方法返回,线程继续执行。
3.3.5 ReentrantReadWriteLock
ReentrantReadWriteLock是Java中的一个读写锁,它是通过AQS实现的。ReentrantReadWriteLock可以使用独占模式和共享模式,读锁和写锁是互斥的。
在ReentrantReadWriteLock中,读锁和写锁实际上是使用AQS中的独占模式和共享模式实现的。读锁是共享模式,写锁是独占模式。
3.4 总结
AQS是Java中并发编程的基础,它通过同步状态和等待队列实现了锁和同步器的抽象。AQS的实现原理非常复杂,但是它提供了非常强大的同步机制,可以应用于各种场景。
在使用AQS时,需要注意以下几点:
- AQS提供了基本的同步机制,但是实际使用时需要根据具体场景进行定制化开发。
- AQS中的同步状态必须是原子的,否则会导致并发问题。
- AQS中的等待队列必须是线程安全的,否则会导致并发问题。
- AQS提供了独占模式和共享模式,可以用于实现各种锁和同步器。
- AQS的实现原理非常复杂,需要深入理解才能更好地使用和定制化开发。
通过对AQS的学习和理解,我们可以更好地掌握Java中并发编程的基础知识,更好地应对各种并发场景和问题。
转载自:https://juejin.cn/post/7221255028895678521