看看ReentrantLock
中的newCondition
方法
final ConditionObject newCondition() {
return new ConditionObject();
}
再点进去发现原来就是AQS
中的内部类。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;
.....
}
发现没有,又有链表操作的定义,其实现的原理就是条件队列哦。其结构可以参考下图哦。
我们知道,当一个线程调用await
方法时,会进入等待状态,直到被其它线程使用signal
方法唤醒。这里的等待队列就是用来存储处于await
等待状态的线程的。
我们先来看看最关键的await
方法是如何实现的。这里我们先来阐述几个前提。
- 只有已经持有锁的线程才可以调用此方法。
- 当调用此方法时,会直接释放锁,无论加了多少次锁。
- 只有其它线程调用
signal
方法或者是中断时才会唤醒等待中的线程。 - 被唤醒后要等其它线程释放锁,拿到锁后才能够继续执行,并且会恢复之前的状态(await之前加了几层锁之后依旧是几层锁)。
来看下await
方法的源码吧。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果在调用await之前就已经中断的话,直接抛出中断异常。
Node node = addConditionWaiter(); // 为当前线程创建一个新的节点,将其加入条件队列中
int savedState = fullyRelease(node); // 完全释放当前线程的锁,并且保存当前线程的状态,因为还得恢复
int interruptMode = 0; // 中断状态
while (!isOnSyncQueue(node)) { //循环判断是否存在于AQS的同步队列中(如果处于等待状态的线程被其它唤醒,会进入AQS的同步队列中,后面我们会讲)
LockSupport.park(this); // 如果还处于等待状态,将线程挂起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查是否在等待过程中被中断了
break;
}
// 跳出循环表示线程肯定已经被唤醒了,这个时候只差拿到锁就可以运行了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 直接去尝试拿锁,这里和一个线程去抢锁的过程基本上是一样的
interruptMode = REINTERRUPT;
// 拿到锁基本就可以继续运行了,这里再进行一些基本的清理工作
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 依旧是响应中断
reportInterruptAfterWait(interruptMode);
}
上面的过程很中规中矩,无非就是响应中断、线程挂起、强锁、清理工作等基本的逻辑,再来看看signal
方法是怎么实现的。在阅读源码之前,我们先明确几点:
- 只有持有锁的线程才能调用
siganl
方法唤醒其它线程。 - 优先唤醒条件队列中的第一个,如果在唤醒过程中出现问题,则接着往下找,直到找到第一个可以被唤醒的线程。
- 唤醒结果本质上来说就是将条件队列的节点直接丢进等待队列中,让其参与锁资源的竞争。
- 拿到锁之后,线程才能继续执行。
其过程可以参考下图哟。
上源码。
public final void signal() {
if (!isHeldExclusively()) // 查看当前线程是不是持有锁的状态
throw new IllegalMonitorStateException(); // 不持有锁不能唤醒其它线程
Node first = firstWaiter; // 获取条件队列的第一个节点
if (first != null) //条件队列不为空
doSignal(first); // 唤醒
}
doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // 如果当前节点没有后继节点了,条件队列直接为空了
lastWaiter = null; // 这里相当于是清空条件队列了
first.nextWaiter = null; // 将当前节点的nextWaiter置为空,这是因为当前节点出条件队列了哦
} while (!transferForSignal(first) && // 将第一个节点唤醒,没有唤醒(被取消)而且条件队列不为空就一直循环
(first = firstWaiter) != null);
}
transferForSignal
final boolean transferForSignal(Node node) {
// 如果这里CAS失败,可能是因为线程被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// CAS成功,直接将节点丢入AQS的等待队列中
Node p = enq(node); // enq返回的是传入节点的前驱节点,不记得可以往下看
// 核心逻辑结束,后面代码是做了一层优化,可以提前跳出之前的while循环
int ws = p.waitStatus; // 保存前驱节点的等待状态
// 如果前驱节点的状态为取消,或者CAS将其置为signal失败(可能是在ws>0后的瞬间取消了)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 直接唤醒线程
return true;
}
不知道入队方法大家还记得不。这里不记得可以看看代码。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
下图总结了await,signal的过错。