ReentrantReadWriteLock详解
前言
并发编程中,确保线程间共享数据的原子性是很重要的。在大多数情况下,我们可以使用锁,使对共享数据的并行访问转变成串行,从而保证数据的原子性:
lock(); // 加锁
doSomething(data); // 只有一个线程可以对data操作
unlock(); // 释放锁
但是,我们也知道,在一些情况下,我们只需要去读取数据,而不需要对数据进行更改。这种读操作,并不对数据进行更改,是幂等的。通常情况下,我们可以认为它们之间并不需要加锁:
read(data); // 多个线程可以一起读
在读远远多于写的情况下,对并发的读也进行加锁、改并行为串行,无疑是一种低效的处理方式。那么应该如何处理呢?也就是说,需要一种锁,可以保证读写、写写互斥,而读读却是可以并发执行的。
这种锁,我们称之为读写锁。java 中,读写锁的使用非常简单:
private final Map<String, Data> m = new TreeMap<>();
private final ReadWriteLock rwl = new ReentrantReadWriteLock(); // 创建读写锁
private final Lock r = rwl.readLock(); // 获取读锁
private final Lock w = rwl.writeLock(); // 获取写锁
// 读锁
public Data get(String key) {
r.lock();
try {
return m.get(key);
} finally {
r.unlock();
}
}
// 写锁
public Data put(String key, Data value) {
w.lock();
try {
return m.put(key, value);
} finally {
w.unlock();
}
}
我们可以看出,读写锁有两种锁,一是读锁,而是写锁,而使用中,我们只需更具需要操作读锁或写锁就可以了。
ReentrantReadWriteLock
ReentrantReadWriteLock 是读写锁接口 ReadWriteLock 的一个实现类:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
}
可以看到,ReentrantReadWriteLock 有两种模式,一种是公平的,另一种是非公平的。公平锁可以按序排队获得锁,而非公平性锁则不能保证按序获取。但是,非公平性的吞吐量更大。
读锁和写锁
读锁和写锁的类非常清晰,我们先来看读锁:
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
读锁的锁操作,基本上是通过 sync 对象来实现的。而熟悉 AQS(AbstractQueuedSynchronizer)的读者们,看到 sync 对象这几个方法,可能已经意识到了 sync 继承了 AQS 类。事实也是如此。因此,读锁利用了 AQS 中的共享模式来实现。而写锁,则是利用独占锁来实现的。
Sync
通过上文可知,读写锁有两把锁:一是读锁,二是写锁。读锁通过 AQS 的共享锁实现,写锁则通过独占锁来实现。而 ReentrantReadWriteLock 中,继承 AQS 的来实际实现加锁解锁的辅助类是 Sync:
abstract static class Sync extends AbstractQueuedSynchronizer {
// int的高16位为共享锁数量,低16位为独占锁数量
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 返回c的共享锁(也就是读锁)数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回c的独占锁(也就是写锁)数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
写锁(独占锁)
在这里,我们姑且可以将独占锁看做是写锁,共享锁看做是读锁。继承 AQS 所需要实现的独占锁相关方法如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 写锁数量
if (c != 0) { // 有线程正在读或者写
if (w == 0 || current != getExclusiveOwnerThread()) // 其他线程正在读||有其他线程在写
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 需求的写锁数量超过最大值
throw new Error("Maximum lock count exceeded");
// 正在写的线程是本线程,也就是发生了写锁重入
setState(c + acquires); // 更新锁数量
return true;
}
if (writerShouldBlock() || // 钩子方法,是否应该堵塞(公平锁与非公平锁处理)
!compareAndSetState(c, c + acquires)) // 加锁失败
return false;
setExclusiveOwnerThread(current); // 加锁成功
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) // 判断是否当前线程持有锁
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 计算剩余的锁数量
boolean free = exclusiveCount(nextc) == 0; // 独占锁数量为0
if (free)
setExclusiveOwnerThread(null);
setState(nextc); // 更新锁数量
return free;
}
读锁(共享锁)
读锁的抢占分为了三个步骤:
- 其他线程正持有写锁,失败。
- 判断该线程是否应该堵塞。不堵塞则直接尝试通过 CAS 加锁(增加读锁持有数)。
- 上一步失败,采用乐观锁不断尝试加锁,直至成功。
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 有其他线程在写
return -1;
int r = sharedCount(c); // 读锁数量
if (!readerShouldBlock() && // 钩子方法,是否应该堵塞(公平锁与非公平锁处理)
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 加锁成功
if (r == 0) { // 没有正在读的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 读重入
firstReaderHoldCount++;
} else { // 已有正在读的线程
HoldCounter rh = cachedHoldCounter; // 最后一个抢占读锁的线程,其占有读锁的数量
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); // 更新cachedHoldCounter
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current); // 以上快速抢占失败,则使用乐观锁直至抢占成功
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null; // 线程占有读锁的数量
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { // 有线程在读
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) { // 钩子方法,是否应该堵塞(公平锁与非公平锁处理)
if (firstReader == current) { // 重入处理
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter; // 最后一个抢占读锁的线程,其占有读锁的数量
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get(); // 更新cachedHoldCounter
if (rh.count == 0)
readHolds.remove(); // 拥有读锁数量为0,移出ThreadLocalHoldCounter
}
}
if (rh.count == 0)
return -1; // 加锁失败
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 读锁抢占成功
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else { // 更新readHolds与cachedHoldCounter
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { // 更新firstReader(第一个占有读锁的线程)
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else { // 更新本线程拥有的读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 死循环解锁,直至成功
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
公平与非公平:FairSync 与 NonfairSync
回到 ReentrantReadWriteLock 的构造方法:
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
我们看到,事实上起到同步作用的对象是 Sync 的两个子类:FairSync 与 NonfairSync。这两个子类实现了 Sync 锁定义的两个抽象方法:writerShouldBlock 与 readerShouldBlock,也就是给线程抢占读锁或写锁时,是否应该堵塞,等待其他线程。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors(); // 等待队列(详见AQS)有前驱结点时,堵塞
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors(); // 等待队列(详见AQS)有前驱结点时,堵塞
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // 不堵塞等待
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive(); // 等待队列头结点是写锁则堵塞,来避免写锁饥饿
}
}
转载自:https://juejin.cn/post/7200945340122693689