likes
comments
collection
share

并发-AQS之ReentrantReadWriteLock源码解读(一)

作者站长头像
站长
· 阅读数 32

ReentrantReadWriteLock是Java中的一个可重入读写锁,它允许多个线程同时读取一个共享资源,但只允许一个线程写入该共享资源,当一个线程持有写锁时,任何其他线程都不能持有读或写锁。

该锁具有以下特点:

  1. 可重入性:线程可以多次获得同一个锁。
  2. 公平性:可选择公平或非公平模式。
  3. 读写分离:支持多个线程同时读取共享资源,但只允许一个线程写入共享资源。
  4. 锁降级:写锁可以降级为读锁,但读锁不能升级为写锁。

使用ReentrantReadWriteLock可以提高并发访问性能,因为多个线程可以同时读取共享资源,而不会相互干扰。但是,在写操作期间,所有读和写的访问都会被暂停,直到写操作完成。 其实现较为复杂,UML类图如下

并发-AQS之ReentrantReadWriteLock源码解读(一) 这张图更为形象

并发-AQS之ReentrantReadWriteLock源码解读(一)

读写锁特性如下

是否互斥

如何实现读写锁,下面我们来依据源码一一道来

构造函数

//默认构造方法
public ReentrantReadWriteLock() {
    this(false);
}
//是否使用公平锁的构造方法
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

成员变量设置如下

//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//继承AQS锁实现
final Sync sync;

Sync

Sync是ReentrantReadWriteLock的内部静态类,它是ReentrantReadWriteLock的核心实现。Sync继承自AbstractQueuedSynchronizer(AQS),并重写了其方法来实现读写锁的语义。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    //表示共享锁状态在锁状态中占用的二进制位数(16位),即共享锁状态的左移位数
    static final int SHARED_SHIFT   = 16;
    //表示每个共享锁状态的数量(1 << SHARED_SHIFT)
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //表示共享锁状态的最大数量((1 << SHARED_SHIFT) - 1)。
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    //返回一个掩码,用于提取锁状态中的排它锁状态(即低16位)
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    ...

量和方法的作用是将锁状态分成两部分:高16位表示读锁状态低16位表示写锁状态。 这种方式,可以高效地实现读写分离的锁机制。在获取和释放锁时,可以根据不同的状态进行不同的处理,以保证锁的正确性和高效性。 Sync有两个实现,非公平锁和公平锁

//非公平锁
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
//公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

ReadLock、WriteLock

//读锁
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ...
//写锁
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

读锁和写锁是私有属性,通过这两个方法暴露出去

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

非公平锁读写锁加锁解锁

读锁ReadLock

加锁

获取读锁。 如果写锁没有被另一个线程持有,则立即获取读锁并返回。 如果写锁被另一个线程持有,则当前线程为了线程调度目的变为无效状态,并处于休眠状态,直到读锁被获取。

lock()

//ReentrantReadWriteLock.ReadLock
public void lock() {
    sync.acquireShared(1);
}

共享状态获取锁,acquireShared是AQS方法

// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
    // 尝试获取共享锁(返回1表示成功,返回-1表示失败)
    if (tryAcquireShared(arg) < 0)
        // 失败了就可能要排队等待
        doAcquireShared(arg);
}

下面主要解读ReentrantReadWriteLock.Sync中重写的tryAcquireShared方法

//ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    //// 状态变量的值 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
    int c = getState();
    //写锁本占用,判断是否被当前线程占用,如果不是直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //读锁获取次数
    int r = sharedCount(c);
    //读锁不需要block且读锁次数小于最大值,尝试更新state值
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //获取读锁成功
        //如果之前还没有线程获取读锁
        if (r == 0) {
            // 记录第一个读者为当前线程
            firstReader = current;
            // 第一个读者重入的次数为1
            firstReaderHoldCount = 1;
        // 如果有线程获取了读锁且是当前线程
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 如果有线程获取了读锁且当前线程不是第一个
            // 则从缓存中获取重入次数保存器
            HoldCounter rh = cachedHoldCounter;
            // 如果缓存不属性当前线程 再从ThreadLocal中获取
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 如果rh的次数为0,把它放到ThreadLocal中去
                readHolds.set(rh);
            // 重入的次数加1(初始次数为0)
            rh.count++;
        }
        // 获取读锁成功,返回1
        return 1;
    }
    // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)
    return fullTryAcquireShared(current);
}

exclusiveCount是写锁获取次数

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

sharedCount是读锁获取次数

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

readerShouldBlock是判断获取读锁是否需要block

// AbstractQueuedSynchronizer
abstract boolean readerShouldBlock();

我们看非公平锁的实现

//ReentrantReadWriteLock.NonfairSync
final boolean readerShouldBlock() {
    //该方法检查在队列中是否存在等待的写入线程,如果存在,则返回true
    return apparentlyFirstQueuedIsExclusive();
}
//AbstractQueuedSynchronizer
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 获取当前的头结点。
    // 如果头结点不为null,则获取头结点的下一个结点。
    // 如果下一个结点不为null,并且该结点不是共享结点,且该结点的线程不为null,则返回true;否则返回false。
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

HoldCounter readHolds定义如下

private transient HoldCounter cachedHoldCounter;
private transient ThreadLocalHoldCounter readHolds;
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

fullTryAcquireShared尝试在此获取共享锁,法的实现与 tryAcquireShared 方法中的代码在一定程度上重复,但是此方法不会在重试和惰性读取保持计数之间增加复杂性,因此更加简单。

//ReentrantReadWriteLock.Sync
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

后面doAcquireShared共享模式下是否需要等待,前文已经分析,具体看下这篇 并发-AQS之Semaphore源码解读

lockInterruptibly()

中断获取锁

//ReentrantReadWriteLock.ReadLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly增加了线程中断相关逻辑,获取共享锁的方式没变

//AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly前文也分析过,可看下 并发-AQS之Semaphore源码解读

尝试获取锁

tryLock()

//ReentrantReadWriteLock.ReadLock
public boolean tryLock() {
    return sync.tryReadLock();
}

tryReadLock方法逻辑与fullTryAcquireShared类似,不再赘述

//ReentrantReadWriteLock.Sync
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

tryLock(long timeout, TimeUnit unit)

//ReentrantReadWriteLock.ReadLock
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

doAcquireSharedNanos前文也分析过,可看下 并发-AQS之Semaphore源码解读

解锁

unlock()

//ReentrantReadWriteLock.ReadLock
public void unlock() {
    sync.releaseShared(1);
}
//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    //如果尝试释放成功了(共享锁全部释放),就唤醒下一个节点
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared实现如下

//ReentrantReadWriteLock.Sync
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 如果第一个读线程是当前线程 就把它重入的次数减1 如果减到0了就把第一个读者置为空
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 如果第一个读者不是当前线程
        // 一样地,把它重入的次数减1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // 共享锁获取的次数减1 
        // 如果减为0了说明完全释放了,才返回true
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

doReleaseShared唤醒下一个节点,前文也分析过,可看下 并发-AQS之Semaphore源码解读

newCondition()

不支持

//ReentrantReadWriteLock.ReadLock
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

写锁WriteLock

加锁

lock()

独占式获取锁

//ReentrantReadWriteLock.WriteLock
public void lock() {
    sync.acquire(1);
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    // 先尝试获取锁 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一样
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 互斥锁被获取的次数
    int w = exclusiveCount(c);
    if (c != 0) {
        // 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) 那么就返回false,获取写锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 溢出检测
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 共享锁不为0且当前线程=占用线程
        setState(c + acquires);
        return true;
    }
    // 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false) 
    // 如果失败了,说明获取写锁失败,返回false 
    // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

NonfairSync的writerShouldBlock直接返回false

//ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

acquireQueued参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读

lockInterruptibly()

//ReentrantReadWriteLock.WriteLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
//AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读

尝试获取锁

tryLock( )

//ReentrantReadWriteLock.WriteLock
public boolean tryLock( ) {
    return sync.tryWriteLock();
}
//ReentrantReadWriteLock.Sync
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

tryLock(long timeout, TimeUnit unit)

//ReentrantReadWriteLock.WriteLock
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos 参考Reentrantlonk实现 并发-AQS之Reentrantlonk源码解读

解锁

unlock()

//ReentrantReadWriteLock.WriteLock
public void unlock() {
    sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//ReentrantReadWriteLock.Sync
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

其他方法

newCondition()

//ReentrantReadWriteLock.WriteLock
public Condition newCondition() {
    return sync.newCondition();
}

isHeldByCurrentThread()

//ReentrantReadWriteLock.WriteLock
public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}
//ReentrantReadWriteLock.Sync
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

getHoldCount()

//ReentrantReadWriteLock.WriteLock
public int getHoldCount() {
    return sync.getWriteHoldCount();
}
//ReentrantReadWriteLock.Sync
final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

篇幅较长,剩余部分放到另外一篇 并发-AQS之ReentrantReadWriteLock源码解读(二)