基本介绍
勾勾在学习多线程基础知识的时候,学习了Object的wait和notify方法,这对组合可以使线程进入阻塞并由其他线程唤醒,但是notify的唤醒不能明确线程。在实际的工作中,各个业务之间有明确的等待关系,那么在唤醒的时候就需要唤醒特定的线程。
AQS同步器提供了条件等待的实现类Condition,在Condition对象中与wait、notify、notifyAll对应的方法是await、 SIGNAL 、signalAll。今天的文章我们依然撇开Condition本身的实现原理先了解AQS底层的条件等待的实现机制。
AQS维护了内部类ConditionObject,其实现了Condition接口。源码关键信息如下:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件等待队列中的第一个节点 */ private transient Node firstWaiter;
/** 条件等待队列中的最后一个节点*/ private transient Node lastWaiter;
/**
* 无参构造
*/ public ConditionObject() { }
/** 在退出wait方法的时候重新打断线程 */ private static final int REINTERRUPT = ;
/**退出await方法的时候抛出异常*/ private static final int THROW_IE = -;
...
}
条件等待.获取操作
ConditionObject类重写了Condition定义的 await 方法。
public final void await() throws InterruptedException {
//await是可中断方法,当线程被打断后会抛出InterruptedException异常
if ( thread .interrupted())
throw new InterruptedException();
//将当前线程节点添加至等待队列尾部,并设置其节点的waitStatus为CONDITION
Node node = addConditionWaiter();
//释放所有的同步器状态,并返回state的值
int savedState = fullyRelease(node);
int interruptMode = ;
//如果node不在同步等待队列中,则挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//如果线程被打断了则退出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != )
break;
}
//如果node在同步等待队列中,则调用acquireQueued不断尝试获取同步器状态,如果acquireQueued返回中断标志为true且interruptMode不等于-,则将interruptMode修改为1
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果节点的nextWaiter不为空,则移除队列中不是CONDITION状态的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果interruptMode不为,则在await之后返回中断信息
if (interruptMode != )
reportInterruptAfterWait(interruptMode);
}
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
//如果interruptMode为-,则抛出异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
//如果interruptMode为,则中断当前线程的阻塞状态
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
addConditionWaiter 将当前线程的Node节点添加至条件等待队列。
private Node addConditionWaiter() {
//获取等待队列中的最后一个节点
Node t = lastWaiter;
// 如果t节点为null或者t节点的等待状态不是CONDITION,则将其移除等待队列
if (t != null && t.waitStatus != Node.CONDITION) {
//移除等待队列
unlinkCancelledWaiters();
//获取等待队列中最新的最后一个节点赋值给t
t = lastWaiter;
}
//将当前线程封装成Node对象,且waitStatus为CONDITION,注意此处用的构造函数与独占和共享模式不同
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果等待队列中的最后一个节点为空,表示等待队列中没有节点
if (t == null)
//将node作为等待队列中的第一个节点
firstWaiter = node;
else
//如果已经有其他节点在等待队列中,则将node添加到lastWaiter的下一个节点处,即将node添加至等待队列尾部
t.nextWaiter = node;
//将node作为等待队列中的最后一个节点
lastWaiter = node;
return node;
}
/**
* 有参构造,适用于Condition,还记得我们独占模式的文章中的Node的这个构造参数吗
*/Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
addConditionWaiter执行流程:
- 获取等待队列的最后一个节点,如果最后一个节点不为null并且其waitStatus不是CONDITION,表示等待队列中有节点等待,但是其状态为已取消,则需删除此类节点。
- 将当前线程封装成node对象,设置其waitStatus为CONDITION,此处与我们前面学习的独占和共享模式是不同的,独占和共享封装node对象时不指定waitStatus,但是指定null或者常量的nextWaiter,条件等待则指定了waitStatus。
- 获取node对象后,把node对象添加到等待队列尾部,此时如果等待队列中没有其他节点,那么node就是第一个等待的节点将其指定为firstWaiter,如果等待队列中已经含有其他节点,此时把lastWaiter的nextWaiter指向node,并将lastWaiter指定为node。
- 返回node对象。
addConditionWaiter执行流程图:
fullyRelease 释放所有的同步器状态
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取同步器的状态值
int savedState = getState();
//释放所有的同步器状态值,如果成功则返回同步器状态值,否则抛出异常
if (release(savedState)) {
failed = false ;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//如果释放同步器状态失败,则将节点的等待状态修改为取消
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
fullyRelease执行流程:
- 获取当前同步器状态state;
- 释放所有的同步器状态,如果成功则返回当前的state值。
- 如果失败则抛出异常IllegalMonitorStateException。
- 如果释放同步状态失败,则将当前节点的状态修改为CANCLLED。
isOnSyncQueue 判断node是否在同步队列中。
//判断节点是否在同步等待队列中
final boolean isOnSyncQueue(Node node) {
//如果节点的状态为CONDITION或者节点的前驱节点为null,则返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果节点的后继节点不为空则返回true
if (node.next != null) // If has successor, it must be on queue
return true;
//node的前驱节点不为null的时候也不能判定node一定在同步队列中,因为enq方法cas替换 tail 可能失败,所以我们需要从尾部开始查找确认node是否在同步队列总。如果cas不失败,node节点总会在tail附近
return findNodeFromTail(node);
}
//从尾节点开始查找node节点
private boolean findNodeFromTail(Node node) {
Node t = tail;
//死循环从后向前遍历节点,直到找到node节点返回true或者遍历所有的节点没找到则返回false
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
isOnSyncQueue执行流程:
- 判断节点的waitStatus是否为CONDITION,如果为CONDITION则节点一定还在等待队列中。在这里涉及到两个队列同步队列和条件等待队列,只有在同步队列中的node节点才可以竞争同步器状态。
- 如果waitStatus不是CONDITION,那么此时节点可能在同步队列中,但是如果其前驱节点为null,则一定不在。这是因为同步队列中肯定会存在一个head节点。
- 如果其前驱节点不为null,再次判断node节点的后继节点,如果后继节点不为空,则一定在同步队列中。说明node节点后已经有其他节点入队了。
- 如果后继节点是空的,enq方法可能修改pre节点成功但是cas更新为节点失败,需要再次确认。从同步队列的尾部tail节点遍历,直到找到node返回true或者遍历所有节点没有找到返回false。
isOnSyncQueue执行流程图:
checkInterruptWhileWaiting 阻塞状态检查中断信息
private int checkInterruptWhileWaiting(Node node) {
//如果线程被中断了,则继续判断node节点添加到同步队列是否成功,成功则返回THROW_IE,失败则返回REINTERRUPT
//如果线程没有中断则返回
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
;
}
final boolean transferAfterCancelledWait(Node node) {
//CAS将当前节点的等待状态设置为,如果cas修改成功,则将节点添加到同步队列
if (compareAndSetWaitStatus(node, Node.CONDITION, )) {
//将节点添加到同步队列
enq(node);
return true;
}
//如果cas修改状态失败,判断节点是否在同步队列中,如果不在同步队列中,则当前线程让出CPU进入Runnable状态
//如果节点已经在同步队列中,则返回false
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
checkInterruptWhileWaiting执行流程:
- 获取线程的中断标志,如果为false则返回0;
- 如果为true,则修改节点的状态为0,修改成功则将节点添加到同步队列,返回true,即结果返回-1。
- 如果修改节点不成功,则一直判断节点是否在队列中,不在则让出cpu,在则返回false,即结果返回1。
unlinkCancelledWaiters 删除等待队列中的CANCLLED状态的节点
//将等待队列中的非CONDITION状态的节点移出队列
private void unlinkCancelledWaiters() {
//获取等待队列的第一个节点
Node t = firstWaiter;
Node trail = null;
//如果t节点不为null
while (t != null) {
//获取t节点的下一个等待节点
Node next = t.nextWaiter;
//如果t节点的等待状态不是CONDITION
if (t.waitStatus != Node.CONDITION) {
//将t的下一个等待节点置为null此步骤将非CONDITION的节点从等待队列中移除了
t.nextWaiter = null;
//如果trail为null,则将firstWaiter置为next节点,
if (trail == null)
firstWaiter = next;
else
//trail不为null,则将next赋值给trail的后继节点
trail.nextWaiter = next;
//如果t没有下一个等待,那么t就是等待队列中的最后一个节点
if (next == null)
lastWaiter = trail;
}
//如果t节点的等待状态为CONDITION,则将t赋值给trail
else
trail = t;
//修改t的值为next继续循环
t = next;
}
}
unlinkCancelledWaiters执行流程:
- 获取等待队列的第一个节点赋值给t,t代表遍历的当前节点,从头节点开始向后遍历;
- 如果t的waitStatus不是CONDITION,则将t节点从等待队列中移除。
- 如果t的waitStatus是CONDITION,则把他存临时节点trail。
- 一直遍历直到t为null退出循环。
到这里为止获取操作的关键方法学完啦!
获取操作整体的执行流程图:
条件等待.释放操作
signal 从等待队列中的第一个节点开始释放。
public final void signal() {
//判断当前线程是否为持有同步状态的线程,如果不是则抛出异常,只有持有同步器状态的线程才可以通知其他线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取等待队列中的第一个节点
Node first = firstWaiter;
//如果节点不为null,表示有节点在等待唤醒,则唤醒第一个等待节点
if (first != null)
doSignal(first);
}
doSignal 通过循环方式不断修改等待队列中的节点状态,直到修改成功将节点添加到同步队列。
private void doSignal(Node first) {
//如果first节点修改状态失败,且下一个等待节点不为null
do {
//如果first的后继节点为null,则表示没有其他节点等待了,则把lastWaiter置为null且nextWaiter置为null
//此步的修改可以退出循环
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果修改节点状态失败,则一直向后遍历,直到修改节点成功
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal 将节点添加到同步队列并将状态修改为SIGNAL。
final boolean transferForSignal(Node node) {
//CAS方式将node节点的状态修改为,如果修改失败,则表示node节点的状态被修改为取消,则返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, ))
return false;
//如果修改成功,则将node节点加入同步队列
Node p = enq(node);
//获取node节点的等待状态
int ws = p.waitStatus;
//如果node节点的状态为CANCLLED,或者是修改node的节点状态为SIGNAL失败,则挂起node节点的线程
if (ws > || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
唤醒操作的流程比较简单,通过轮询的方式修改等待队列中的节点状态,将其添加到同步队列,成功则退出循环。
修改状态失败则一直轮询直到等待队列所有节点全部检查退出循环。
释放操作执行流程图:
总结
await的方法将当前线程加入条件等待队列后,需要释放同步器状态,这样其他线程才能获取同步器状态执行结束后唤醒条件等待队列中的节点,即await方法会释放锁。
当前线程节点不在同步队列时,会挂起线程,直到添加到同步队列中。即只有在同步队列的线程才能获取同步状态,条件等待队列中的线程要获取同步状态就必须调用enq方法添加到同步队列中。
将等待队列中的节点添加到同步队列中有两种情况:
- 被等待的线程执行完操作调用了signal方法,将条件等待队列中的节点添加到同步队列中。
- 挂起的线程被其他线程打断,调用transferAfterCancelledWait方法将等待队列中的节点修改为0,并将条件等待队列中的节点添加到同步队列中。
AQS源码的学习已经结束了,但是只是开始!!!
勾勾会继续学习工作中常用的工具类:ReentrantLock、Condition、Semaphore、CountDownLatch、CyclicBarrier。