likes
comments
collection
share

ReentrantLock源码解析

作者站长头像
站长
· 阅读数 10

前言

ReentrantLock是Java并发包java.util.concurrent.locks中的一个类,它实现了Lock接口,提供了与synchronized关键字类似的同步功能,但比synchronized更加灵活。

ReentrantLock的主要特性如下:

  1. 可重入性:如果一个线程已经持有了一个锁,那么它可以再次获取该锁而不会被阻塞。这是由其内部的计数器实现的,每次获取锁,计数器都会加1,每次释放锁,计数器都会减1,当计数器为0时,锁就被释放了。

  2. 公平性和非公平性ReentrantLock可以在构造时指定是否是公平锁。如果是公平锁,那么等待时间最长的线程将获得锁;如果是非公平锁,那么任何等待的线程都有可能获得锁,这取决于操作系统的调度。

  3. 条件变量ReentrantLock提供了一个Condition类,可以让线程在某个条件下等待,或者在某个条件改变时被唤醒。这比Object类的waitnotify方法提供了更精细的线程控制。

  4. 锁中断:在等待锁的过程中,线程可以响应中断,即它可以选择停止等待锁,转而处理其他事情。

  5. 锁尝试:线程可以尝试获取锁,如果锁已经被其他线程持有,那么它可以立即返回,不会被阻塞。

ReentrantLock的使用通常需要配合try-finally语句块来确保锁能够被正确释放。

ReentrantLock内部的Sync分为公平和非公平两种,默认构造函数为非公平的

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

为了标识锁现在的归属线程,以及支持重入,需要为锁对象增加exclusiveOwnerThread标识持有锁的线程

公平锁lock

final void lock() {
    acquire(1);
}
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread(); // 当前线程
    int c = getState(); // 当前锁状态
    if (c == 0) { // 未被占用
        /*
         * 队列中没有等待线程, 且状态为0, 说明锁没有被占用, 可以直接获取锁
         * hasQueuedPredecessors,判断是否有等待线程,体现了公平锁的特性
         */
        if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 设置当前线程为独占线程
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 锁的独占线程是当前线程, 重入锁
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁lock

lock方法多了一步非公平快速获取锁的尝试

final void lock() {
    // 快速获取锁,不全判断队列中是否有等待线程,体现了非公平锁的特性
    if (compareAndSetState(0, 1))
        // 设置当前线程为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else // 快速获取锁失败, 走正常流程
        acquire(1);
}

nonfairTryAcquire也会有非公平获取锁的尝试

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 状态为0, CAS获取锁即可 ,不去队列里排队, 体现了非公平锁的特性
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 状态大于0, 且锁的独占线程是当前线程, 重入锁
    else if (current == getExclusiveOwnerThread()) {
        // 重入锁, 直接增加状态值
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

unlock

protected final boolean tryRelease(int releases) {
    // 减少状态值
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 减少到0, 释放锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

条件对象Condition

Condition是Java并发包java.util.concurrent.locks中的一个接口,它与Lock接口配合使用,提供了类似于Object类的waitnotify方法的功能,但比它们更加灵活和强大。

Condition的主要特性如下:

  1. 等待/通知机制Condition提供了await方法让线程等待某个条件,以及signalsignalAll方法来通知等待的线程条件可能已经改变。这是一种线程间的通信方式。

  2. 多条件等待:与Object类的waitnotify方法相比,Condition的一个优点是支持多条件等待。每个Lock对象可以有多个Condition对象,线程可以选择等待某个特定的Condition,这样就可以更精细地控制线程的行为。

  3. 响应中断Conditionawait方法能够响应线程中断,当线程被中断时,await方法会立即返回,并抛出InterruptedException

示例:condition实现生产消费模型

public class ReentrantLockTest {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition noFull = lock.newCondition();
    private final Condition noEmpty = lock.newCondition();
    private final Queue<Integer> queue = new LinkedList<>();
    private final Integer max = 10;
    private int num = 0;

    @Test
    public void test() throws InterruptedException {
        Thread consumer = new Thread(this::consumer);
        Thread producer = new Thread(this::producer);
        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
    }

    private void producer() {
        while (true) {
            lock.lock(); // 加锁
            try {
                if (queue.size() == max) {
                    noFull.await(); // 等待队列不满
                }
                System.out.println("offer: " + num);
                queue.offer(num++);
                Thread.sleep(1000);
                noEmpty.signal(); // 通知队列不空
            } catch (InterruptedException e) {
                System.out.println("InterruptedException: " + e);
            } finally {
                lock.unlock(); // 解锁
                Thread.yield();
            }
        }
    }

    private void consumer() {
        while (true) {
            lock.lock(); // 加锁
            try {
                if (queue.isEmpty()) {
                    noEmpty.await(); // 等待队列不空
                }
                System.out.println("poll: " + queue.poll());
                Thread.sleep(500);
                noFull.signal(); // 通知队列不满
            } catch (InterruptedException e) {
                System.out.println("InterruptedException: " + e);
            } finally {
                lock.unlock(); // 解锁
                Thread.yield();
            }
        }
    }
}

Condition的创建

ReentrantLock调用内部sync的newCondition方法,ConditionObject类为AQS的内部类

public Condition newCondition() {
    return sync.newCondition();
}
final java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject newCondition() {
    return new ConditionObject();
}

调用了await的线程,会加入到条件队列中,而signal操作会将线程从条件队列转移到同步队列中。

  • 一个锁对象只有一个同步队列syncQueue
  • 一个锁对象可以有多个条件Condition对象
  • 每个条件都可以有一个条件队列Condition Queue

await 条件等待

ReentrantLock源码解析

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程包装为Node,并添加到条件队列中
    Node node = addConditionWaiter();
    // 释放所有锁,不管重入了几次,返回值为重入次数,这里也意味着,调用await方法的线程,必须持有锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    /*
    判断节点是否在同步队列中
    第一次循环, 肯定是false, 因为节点刚被加入条件队列
    之后的循环,表示节点被signal唤醒了,但是否在同步队列中,不一定
     */
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        // 检查等待期间变化的中断状态
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 此时表示线程的等待条件已经满足,可以抢占锁了
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        // 清理条件队列中状态不是CONDITION的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 中断处理
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter 加入到条件队列中

private Node addConditionWaiter() {
    // 尾节点
    Node t = lastWaiter;
    // 如果尾节点被取消了, 则清除掉
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 从链表中移除已取消线程
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

fullyRelease 释放锁、清空所有重入次数

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 释放锁, 不论重入了多少次, 返回重入次数
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed) // 失败了, 取消节点
            node.waitStatus = Node.CANCELLED;
    }
}

isOnSyncQueue 判断节点是否在同步队列中

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null)
        // 存在后继节点, 那当前节点也一定在同步队列中
        return true;
    // 从同步队列中查找node
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (; ; ) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

signal, signalAll 条件唤醒

本文以signalAll流程为例

ReentrantLock源码解析

public final void signalAll() {
    if (!isHeldExclusively()) // 校验是否持有锁
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // 等待队列头节点
    if (first != null)
        doSignalAll(first);
}

doSignalAll 唤醒所有条件队列的等待线程

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null; // 设置为null,别的线程就无法通过共享拿到等待队列了
    do { // 当前线程通过局部变量, 从头节点开始遍历, 依次唤醒
        Node next = first.nextWaiter; // 备份下一个节点
        first.nextWaiter = null; // 相当于删除first节点
        transferForSignal(first); // 将节点加入到同步队列中,并唤醒节点中的线程
        first = next; // first转到下一个节点
    } while (first != null);
}

transferForSignal 将线程入队到同步队列中

final boolean transferForSignal(Node node) {
    /**
     * 如果设置状态失败, 表示这个node已经被取消了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 将node加入到同步队列中, 返回值为node的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果前驱节点的状态是取消状态 或者 尝试设置前驱节点的状态为SIGNAL失败
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒node节点的线程
        LockSupport.unpark(node.thread);
    return true;
}