Java并发编程之Condition(一)
基本使用
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock()
try {
// do something...
condition.await(); // 进行等待
} catch (InterruptedException e) {
// do something...
} finally {
lock.unlock();
}
首先通过ReentrantLock
来获取一个Condition
对象,然后在获取到锁之后调用await()
方法进行等待,当被其他线程调用signal
唤醒或者interrupt
中断才有机会继续向下执行。
需要注意的是,lock.lock()
方法阻塞时是在锁对象中维护的阻塞队列,而await
方法等待是在Condition
对象中维护的条件等待队列,当await
被唤醒或者中断之后,该节点会从条件队列中转到锁的阻塞队列中,从而可以重新参与锁的竞争。
await方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
先从整体上看一下await
方法的流程
- 如果被中断过,就直接抛出异常;
- 根据当前线程创建一个节点,添加到条件等待队列的末尾;
- 当前线程释放锁,因为可能存在重入的情况,所以需要释放所有锁;
- 如果当前节点不在同步阻塞队列中,也就是在条件等待队列中,就调用park进行等待;如果已经在同步阻塞队列中,就执行第六步;
- 唤醒之后检查中断情况,如果中断过就把节点从条件等待队列转换到同步阻塞队列中去;
- 尝试获取锁,获取失败中途可能会进行
park
,如果返回true
表示被中断了,false
表示没有中断; - 当前节点被取消等待,清除掉所有条件等待队列中取消的节点;
- 根据中断标记抛出异常获取重新中断。
下面再依次根据源码来分析各个方法的实现过程:
添加条件等待节点
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先获取尾节点lastWaiter
,如果尾节点的waitStatus
已经不是Node.CONDITION
了,说明已经从等待队列中取消了,就需要从队列中清除掉这个节点,于是调用了unlinkCancelledWaiters
方法,然后重新获取尾节点,根据当前线程创建一个新节点,添加到尾节点后面变成新的尾节点。
这里需要注意的是:新建的节点的waitStatus
的值是Node.CONDITION
,表示在条件队列中进行等待,如果变成其他值了就需要移除队列。
下面看下unlinkCancelledWaiters
的实现过程:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 表示上一个没取消的节点
while (t != null) {
Node next = t.nextWaiter; // 下一个节点
if (t.waitStatus != Node.CONDITION) { // 当前节点被取消
t.nextWaiter = null; // 断开和后面节点的连接
if (trail == null) // 表示目前还没有遍历到应该留下的节点
firstWaiter = next;
else
trail.nextWaiter = next; // 用前一个节点指向下一个节点,就把当前节点移除了
if (next == null) // 遍历完毕了
lastWaiter = trail;
}
else
trail = t;
t = next; // 继续遍历下一个节点
}
}
通过while
循环从前往后遍历等待队列,把所有取消的节点从队列中移除。这里需要注意trail
的含义,表示的是上一个没被取消的节点,它的作用就是在当前节点取消时,可以直接将前一个节点指向下一个节点。
全部释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
这个方法很简单,通过前面的文章我们已经知道unlock
方法就是调用release(1)
,也就是对state
进行减1
,但是如果有锁重入的情况,unlock
次数就需要和重入次数一样才能释放锁。这里的做法是先获取state
的值,不管是否有重入,一次性全部release
掉,state
的值就变成0
了,也就把锁释放了。
是否在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}
这里很容易搞混的一点是
prev
、next
这两个属性是在同步阻塞队列中才会有的,而在条件等待队列中使用的是nextWaiter
属性。
waitStatus
是Node.CONDITION
表示在条件等待队列中,而node.prev==null
为什么也表示不在同步阻塞队列中呢?这是因为同步阻塞队列在初始化时会先创建一个空的Node
,其他节点都是在这个空Node
之后,所以同步阻塞队列中的节点的node.prev
一定不是null
。
同理如果node.next != null
,表示这个节点已经在同步阻塞队列中了。
如果上述条件都不满足,就直接去同步阻塞队列中去找这个节点,找到了说明在就返回true
,否则就返回false
,查找方法findNodeFromTail
如下:
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
这个方法是从后往前找,如果一直找到了头节点都没有说明不在队列中,最终就返回false
。
检查中断情况
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
从park
中唤醒有两种方式,一个是unpark
,一个是interrupt
。如果是unpark
中断标志位就是false
,直接返回;如果是interrupt
中断的标志位为true
,就尝试把节点从条件等待队列转换到同步阻塞队列中去。
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
首先通过CAS
修改waitStatus
为0
,表示从条件等待队列中取消等待,然后调用enq
方法添加到同步阻塞队列中去,成功就返回true
。
如果signal
方法先把waitStatus
修改了,并且正在把节点转换到同步阻塞队列中去,这里第一个if
判断就是false
,下面while
循环的目的是等待节点完全转换到同步阻塞队列中去,因为这个过程很短暂所以采用自旋的方式,然后染回false
。
重新尝试获取锁
acquireQueued
方法前面文章中已经介绍过,意思就是尝试获取锁,失败了就park
,被唤醒之后继续重试。
再次清除取消的节点
因为当前节点已经从条件等待队列中转换到了同步阻塞队列中,所以需要重新清除一遍取消的节点。
中断处理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
如果在条件等待队列中被中断,并且转换到同步阻塞队列成功就抛出异常InterruptedException
;
如果在条件等待队列中被中断,并且转换到同步阻塞队列失败,然后在同步阻塞队列中也被中断,或者在条件等待队列中没有被中断,只在同步阻塞队列中被中断只在就重新响应中断selfInterrupt()
。
转载自:https://juejin.cn/post/7229660119658102841