并发-AQS之ReentrantReadWriteLock源码解读(一)
ReentrantReadWriteLock是Java中的一个可重入读写锁,它允许多个线程同时读取一个共享资源,但只允许一个线程写入该共享资源,当一个线程持有写锁时,任何其他线程都不能持有读或写锁。
该锁具有以下特点:
- 可重入性:线程可以多次获得同一个锁。
- 公平性:可选择公平或非公平模式。
- 读写分离:支持多个线程同时读取共享资源,但只允许一个线程写入共享资源。
- 锁降级:写锁可以降级为读锁,但读锁不能升级为写锁。
使用ReentrantReadWriteLock可以提高并发访问性能,因为多个线程可以同时读取共享资源,而不会相互干扰。但是,在写操作期间,所有读和写的访问都会被暂停,直到写操作完成。 其实现较为复杂,UML类图如下
这张图更为形象
读写锁特性如下
是否互斥 | 读 | 写 |
---|---|---|
读 | 否 | 是 |
写 | 是 | 是 |
如何实现读写锁,下面我们来依据源码一一道来
构造函数
//默认构造方法
public ReentrantReadWriteLock() {
this(false);
}
//是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
成员变量设置如下
//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//继承AQS锁实现
final Sync sync;
Sync
Sync是ReentrantReadWriteLock的内部静态类,它是ReentrantReadWriteLock的核心实现。Sync继承自AbstractQueuedSynchronizer(AQS),并重写了其方法来实现读写锁的语义。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//表示共享锁状态在锁状态中占用的二进制位数(16位),即共享锁状态的左移位数
static final int SHARED_SHIFT = 16;
//表示每个共享锁状态的数量(1 << SHARED_SHIFT)
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//表示共享锁状态的最大数量((1 << SHARED_SHIFT) - 1)。
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//返回一个掩码,用于提取锁状态中的排它锁状态(即低16位)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
...
量和方法的作用是将锁状态分成两部分:高16位表示读锁状态,低16位表示写锁状态。 这种方式,可以高效地实现读写分离的锁机制。在获取和释放锁时,可以根据不同的状态进行不同的处理,以保证锁的正确性和高效性。 Sync有两个实现,非公平锁和公平锁
//非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
//公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
ReadLock、WriteLock
//读锁
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
...
//写锁
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
读锁和写锁是私有属性,通过这两个方法暴露出去
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
非公平锁读写锁加锁解锁
读锁ReadLock
加锁
获取读锁。 如果写锁没有被另一个线程持有,则立即获取读锁并返回。 如果写锁被另一个线程持有,则当前线程为了线程调度目的变为无效状态,并处于休眠状态,直到读锁被获取。
lock()
//ReentrantReadWriteLock.ReadLock
public void lock() {
sync.acquireShared(1);
}
共享状态获取锁,acquireShared是AQS方法
// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
// 尝试获取共享锁(返回1表示成功,返回-1表示失败)
if (tryAcquireShared(arg) < 0)
// 失败了就可能要排队等待
doAcquireShared(arg);
}
下面主要解读ReentrantReadWriteLock.Sync中重写的tryAcquireShared方法
//ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
//// 状态变量的值 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
int c = getState();
//写锁本占用,判断是否被当前线程占用,如果不是直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//读锁获取次数
int r = sharedCount(c);
//读锁不需要block且读锁次数小于最大值,尝试更新state值
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//获取读锁成功
//如果之前还没有线程获取读锁
if (r == 0) {
// 记录第一个读者为当前线程
firstReader = current;
// 第一个读者重入的次数为1
firstReaderHoldCount = 1;
// 如果有线程获取了读锁且是当前线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 如果有线程获取了读锁且当前线程不是第一个
// 则从缓存中获取重入次数保存器
HoldCounter rh = cachedHoldCounter;
// 如果缓存不属性当前线程 再从ThreadLocal中获取
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 如果rh的次数为0,把它放到ThreadLocal中去
readHolds.set(rh);
// 重入的次数加1(初始次数为0)
rh.count++;
}
// 获取读锁成功,返回1
return 1;
}
// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)
return fullTryAcquireShared(current);
}
exclusiveCount是写锁获取次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
sharedCount是读锁获取次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
readerShouldBlock是判断获取读锁是否需要block
// AbstractQueuedSynchronizer
abstract boolean readerShouldBlock();
我们看非公平锁的实现
//ReentrantReadWriteLock.NonfairSync
final boolean readerShouldBlock() {
//该方法检查在队列中是否存在等待的写入线程,如果存在,则返回true
return apparentlyFirstQueuedIsExclusive();
}
//AbstractQueuedSynchronizer
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 获取当前的头结点。
// 如果头结点不为null,则获取头结点的下一个结点。
// 如果下一个结点不为null,并且该结点不是共享结点,且该结点的线程不为null,则返回true;否则返回false。
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
HoldCounter readHolds定义如下
private transient HoldCounter cachedHoldCounter;
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
fullTryAcquireShared尝试在此获取共享锁,法的实现与 tryAcquireShared
方法中的代码在一定程度上重复,但是此方法不会在重试和惰性读取保持计数之间增加复杂性,因此更加简单。
//ReentrantReadWriteLock.Sync
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
后面doAcquireShared共享模式下是否需要等待,前文已经分析,具体看下这篇 并发-AQS之Semaphore源码解读
lockInterruptibly()
中断获取锁
//ReentrantReadWriteLock.ReadLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly增加了线程中断相关逻辑,获取共享锁的方式没变
//AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly前文也分析过,可看下 并发-AQS之Semaphore源码解读
尝试获取锁
tryLock()
//ReentrantReadWriteLock.ReadLock
public boolean tryLock() {
return sync.tryReadLock();
}
tryReadLock方法逻辑与fullTryAcquireShared类似,不再赘述
//ReentrantReadWriteLock.Sync
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
tryLock(long timeout, TimeUnit unit)
//ReentrantReadWriteLock.ReadLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
doAcquireSharedNanos前文也分析过,可看下 并发-AQS之Semaphore源码解读
解锁
unlock()
//ReentrantReadWriteLock.ReadLock
public void unlock() {
sync.releaseShared(1);
}
//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
//如果尝试释放成功了(共享锁全部释放),就唤醒下一个节点
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared实现如下
//ReentrantReadWriteLock.Sync
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 如果第一个读线程是当前线程 就把它重入的次数减1 如果减到0了就把第一个读者置为空
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果第一个读者不是当前线程
// 一样地,把它重入的次数减1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 共享锁获取的次数减1
// 如果减为0了说明完全释放了,才返回true
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
doReleaseShared唤醒下一个节点,前文也分析过,可看下 并发-AQS之Semaphore源码解读
newCondition()
不支持
//ReentrantReadWriteLock.ReadLock
public Condition newCondition() {
throw new UnsupportedOperationException();
}
写锁WriteLock
加锁
lock()
独占式获取锁
//ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
// 先尝试获取锁 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一样
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 互斥锁被获取的次数
int w = exclusiveCount(c);
if (c != 0) {
// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) 那么就返回false,获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 溢出检测
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 共享锁不为0且当前线程=占用线程
setState(c + acquires);
return true;
}
// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)
// 如果失败了,说明获取写锁失败,返回false
// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
NonfairSync的writerShouldBlock直接返回false
//ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
return false; // writers can always barge
}
acquireQueued参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读
lockInterruptibly()
//ReentrantReadWriteLock.WriteLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读
尝试获取锁
tryLock( )
//ReentrantReadWriteLock.WriteLock
public boolean tryLock( ) {
return sync.tryWriteLock();
}
//ReentrantReadWriteLock.Sync
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryLock(long timeout, TimeUnit unit)
//ReentrantReadWriteLock.WriteLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos 参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读
解锁
unlock()
//ReentrantReadWriteLock.WriteLock
public void unlock() {
sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//ReentrantReadWriteLock.Sync
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
其他方法
newCondition()
//ReentrantReadWriteLock.WriteLock
public Condition newCondition() {
return sync.newCondition();
}
isHeldByCurrentThread()
//ReentrantReadWriteLock.WriteLock
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
//ReentrantReadWriteLock.Sync
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
getHoldCount()
//ReentrantReadWriteLock.WriteLock
public int getHoldCount() {
return sync.getWriteHoldCount();
}
//ReentrantReadWriteLock.Sync
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
篇幅较长,剩余部分放到另外一篇 并发-AQS之ReentrantReadWriteLock源码解读(二)
转载自:https://juejin.cn/post/7245206582002712634