likes
comments
collection
share

Java并发编程之Condition(一)

作者站长头像
站长
· 阅读数 19

基本使用

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock()
try {
    // do something...
    condition.await();  // 进行等待
} catch (InterruptedException e) {
    // do something...
} finally {
    lock.unlock();
}

首先通过ReentrantLock来获取一个Condition对象,然后在获取到锁之后调用await()方法进行等待,当被其他线程调用signal唤醒或者interrupt中断才有机会继续向下执行。

需要注意的是,lock.lock()方法阻塞时是在锁对象中维护的阻塞队列,而await方法等待是在Condition对象中维护的条件等待队列,当await被唤醒或者中断之后,该节点会从条件队列中转到锁的阻塞队列中,从而可以重新参与锁的竞争。

await方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

先从整体上看一下await方法的流程

  • 如果被中断过,就直接抛出异常;
  • 根据当前线程创建一个节点,添加到条件等待队列的末尾;
  • 当前线程释放锁,因为可能存在重入的情况,所以需要释放所有锁;
  • 如果当前节点不在同步阻塞队列中,也就是在条件等待队列中,就调用park进行等待;如果已经在同步阻塞队列中,就执行第六步;
  • 唤醒之后检查中断情况,如果中断过就把节点从条件等待队列转换到同步阻塞队列中去;
  • 尝试获取锁,获取失败中途可能会进行park,如果返回true表示被中断了,false表示没有中断;
  • 当前节点被取消等待,清除掉所有条件等待队列中取消的节点;
  • 根据中断标记抛出异常获取重新中断。

下面再依次根据源码来分析各个方法的实现过程:

添加条件等待节点
private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

首先获取尾节点lastWaiter,如果尾节点的waitStatus已经不是Node.CONDITION了,说明已经从等待队列中取消了,就需要从队列中清除掉这个节点,于是调用了unlinkCancelledWaiters方法,然后重新获取尾节点,根据当前线程创建一个新节点,添加到尾节点后面变成新的尾节点。

这里需要注意的是:新建的节点的waitStatus的值是Node.CONDITION,表示在条件队列中进行等待,如果变成其他值了就需要移除队列。

下面看下unlinkCancelledWaiters的实现过程:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;  // 表示上一个没取消的节点
    while (t != null) {
        Node next = t.nextWaiter;  // 下一个节点
        if (t.waitStatus != Node.CONDITION) {  // 当前节点被取消
            t.nextWaiter = null;  // 断开和后面节点的连接
            if (trail == null)    // 表示目前还没有遍历到应该留下的节点
                firstWaiter = next;
            else
                trail.nextWaiter = next;  // 用前一个节点指向下一个节点,就把当前节点移除了
            if (next == null)  // 遍历完毕了
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;  // 继续遍历下一个节点
    }
}

通过while循环从前往后遍历等待队列,把所有取消的节点从队列中移除。这里需要注意trail的含义,表示的是上一个没被取消的节点,它的作用就是在当前节点取消时,可以直接将前一个节点指向下一个节点。

全部释放锁
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

这个方法很简单,通过前面的文章我们已经知道unlock方法就是调用release(1),也就是对state进行减1,但是如果有锁重入的情况,unlock次数就需要和重入次数一样才能释放锁。这里的做法是先获取state的值,不管是否有重入,一次性全部release掉,state的值就变成0了,也就把锁释放了。

是否在同步队列中
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null)
        return true;
    return findNodeFromTail(node);
}

这里很容易搞混的一点是prevnext这两个属性是在同步阻塞队列中才会有的,而在条件等待队列中使用的是nextWaiter属性。

waitStatusNode.CONDITION表示在条件等待队列中,而node.prev==null为什么也表示不在同步阻塞队列中呢?这是因为同步阻塞队列在初始化时会先创建一个空的Node,其他节点都是在这个空Node之后,所以同步阻塞队列中的节点的node.prev一定不是null

同理如果node.next != null,表示这个节点已经在同步阻塞队列中了。

如果上述条件都不满足,就直接去同步阻塞队列中去找这个节点,找到了说明在就返回true,否则就返回false,查找方法findNodeFromTail如下:

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

这个方法是从后往前找,如果一直找到了头节点都没有说明不在队列中,最终就返回false

检查中断情况
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

park中唤醒有两种方式,一个是unpark,一个是interrupt。如果是unpark中断标志位就是false,直接返回;如果是interrupt中断的标志位为true,就尝试把节点从条件等待队列转换到同步阻塞队列中去。

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

首先通过CAS修改waitStatus0,表示从条件等待队列中取消等待,然后调用enq方法添加到同步阻塞队列中去,成功就返回true

如果signal方法先把waitStatus修改了,并且正在把节点转换到同步阻塞队列中去,这里第一个if判断就是false,下面while循环的目的是等待节点完全转换到同步阻塞队列中去,因为这个过程很短暂所以采用自旋的方式,然后染回false

重新尝试获取锁

acquireQueued方法前面文章中已经介绍过,意思就是尝试获取锁,失败了就park,被唤醒之后继续重试。

再次清除取消的节点

因为当前节点已经从条件等待队列中转换到了同步阻塞队列中,所以需要重新清除一遍取消的节点。

中断处理
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

如果在条件等待队列中被中断,并且转换到同步阻塞队列成功就抛出异常InterruptedException

如果在条件等待队列中被中断,并且转换到同步阻塞队列失败,然后在同步阻塞队列中也被中断,或者在条件等待队列中没有被中断,只在同步阻塞队列中被中断只在就重新响应中断selfInterrupt()

转载自:https://juejin.cn/post/7229660119658102841
评论
请登录