一、前言
AQS中的条件队列相比较前文中的“独占锁”、“共享锁”等比较独立,即便没有条件队列也丝毫不影响诸如 ReentrantLock 、 Semaphore 类的实现,那如此说来条件队列是否就是一个可有可无的产物?答案是否定的,我们来看下直接或间接用到条件队列的 jdk 并发类:
- ReentrantLock 独占锁经典类
- ReentrantReadWriteLock 读写锁
- ArrayBlockingQueue 基于数组的阻塞队列
- CyclicBarrier 循环栅栏,解决线程同步问题
- DelayQueue 延时队列
- LinkedB lock ingDeque 双向阻塞队列
- PriorityBlockingQueue 支持优先级的无界阻塞队列
- ThreadPoolExecutor 线程池构造器
- ScheduledThreadPoolExecutor 可基于时间调度的 线程池 构造器
- StampedLock 邮戳锁,1.8后引入,更高效地读写锁
如此豪华的阵容,可见 Condition 的地位不可小觑
我们简单描述下条件队列实现的功能:有3个 线程 A、B、C,分别调用 wait/await 方法后,线程进入阻塞,在没有其他线程去唤醒的情况下,3个线程将永远处于阻塞状态。此时如果有另外线程调用 notify/signal ,那么A、B、C线程中的某一个将被激活(根据其进入条件队列的顺序而定),从而执行后续的逻辑;如果调用 notifyAll/signalAll 的话,那么3个线程都将被激活,这可能是我们对条件队列的简单认识。这样的描述是否准确呢?可能不太严谨,我们引入JDK的条件队列来做说明
统一话术:其实语法层面支持的 wait/notify 与AQS都属于JDK的范畴,但为了区分两者,我们定义如下:
- JDK条件队列 :语法层面提供支持的 wait/notify ,即 Object 类中的 wait()/notify() 方法
- AQS条件队列 :AQS提供的条件队列,即AQS内部的 ConditionObject 类
二、JDK中的条件队列(wait/notify)
众所周知,在JDK中, wait/notify/notifyAll 是根对象 Object 中内置的方法,且方法均被定义为 native 本地方法
// 等待
public final native void wait(long timeout) throws Interrupted Exception ;
// 唤醒
public final native void notify();
// 唤醒所有等待线程
public final native void notifyAll();
2.1、wait
// 步骤
synchronized (obj) {
// 步骤
before();
// 步骤
obj.wait();
// 步骤
after();
}
相信大家对上述代码并不陌生,我们将JDK的条件队列抽象为4步,逐一阐述
- 步骤1: synchronized (obj) 在jdk中如果想调用 Object.wait() 方法,必须首先获取该对象的 synchronized 锁,当前步骤,如果成功获取到锁,那么将进入“步骤2”,如果存在并发,当前线程将会进入阻塞(线程状态为BLOCKED),知道获取到锁为止
- 步骤2: before() 我们知道 synchronized 是独占锁,所以在执行步骤2代码时,程序是不存在并发的,即同一时刻,只有一个线程正在执行,此处也相对好理解
- 步骤3: obj.wait() 此步骤是将当前线程放入条件队列,同时释放 obj 的同步锁。此处跟我们对 synchronized 的认知有悖,我们一般认为 synchronized (obj) {……} 在大括号中的代码会一直持有锁,而事实情况却是,当程序执行 wait() 方法时,会释放 obj 的同步锁
- 步骤4: after() 此步骤是并发执行还是串行执行?假设我们现在有3个线程A、B、C都已经执行完毕 wait() 方法,并进入了条件队列,等待其他线程唤醒;此时另外一个线程执行了 notifyAll() 时,后续的激活流程是怎么样的?错误观点:有很多同学直观感受是,线程A、B、C同时被激活,所以步骤4是并发执行的;就像是百米赛跑,所有同学都准备就绪(wait),一声枪响后(notifyAll),所有人开始赛跑,并跑到终点(步骤4)正确观点:其实“步骤4”是串行执行的,大家再检查下代码后便可发现,“步骤4”处于 synchronized 的大括号之间;还是拿上述赛跑举例,如果认为从听到枪响至跑到终点是“步骤4”的话,那真实的场景应该是这样的:一声枪响后,A起跑,B、C原地不动;A跑到终点后,B开始起跑,C原地不动;最后是C跑到终点
由此我们断定, obj.wait() 虽然是native方法,但其内部经历了释放锁、重新抢锁的两个大环节
2.2、notify
synchronized (obj) {
obj.notify();
// obj.notifyAll();
}
所有因 obj.wait() 阻塞的线程,都要通过 notify 来唤醒
- notify() 唤醒条件队列中,队首节点
- notifyAll() 唤醒条件队列中所有节点
三、AQS中的条件队列(await/signal)
我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能
JDK |
AQS |
wait |
await |
notify |
singal |
notifyAll |
singalAll |
用法上也极其相似:
await :
// 初始化
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
singal :
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.signal();
} finally {
lock.unlock();
}
3.1、条件队列
我们知道在AQS内部维护了一个阻塞队列,数据结构如下:
上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用
那AQS内部的另一个条件队列又是什么样的数据结构呢?
可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。上图描述的是一个长度为 5 的条件队列,即有5个线程执行了 await() 方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的 waitStatus 为 -2 。当有新节点加入时,将会追加至队列尾部
3.2、唤醒
当我们调用 signal () 方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列
而 signalAll() 执行时,具体执行流程与 signal() 类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的
四、JDK vs AQS
经过上文的介绍,似乎AQS做了与 wait/notify 相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比
4.1、对比
我们模拟这样的一个场景:启动10个线程,分别调用 wait() 方法,当所有线程都进入阻塞后,调用 notifyAll() ,10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时
JDK测试代码:
public class ConditionCompareTest {
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i =; i < 10000; i++) {
if (i % == 0) {
System.out.println(i);
}
jdkTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗时: " + cost);
}
public void jdkTest() throws InterruptedException {
Object lock = new Object();
List<Thread> list = Lists.newArrayList();
// 步骤一:启动个线程,并进入wait等待
for (int i =; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步骤二:等待个线程全部进入wait方法
while (true) {
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步骤三:唤醒个线程
synchronized (lock) {
lock.notifyAll();
}
// 步骤四:等待个线程全部执行完毕
for (Thread thread : list) {
thread.join();
}
}
}
AQS测试代码:
public class ConditionCompareTest {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i =; i < 10000; i++) {
if (i % == 0) {
System.out.println(i);
}
aqsTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗时: " + cost);
}
@Test
public void aqsTest() throws InterruptedException {
AtomicInteger lockedNum = new AtomicInteger();
List<Thread> list = Lists.newArrayList();
// 步骤一:启动个线程,并进入wait等待
for (int i =; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
lock.lock();
lockedNum.incrementAndGet();
condition.await();
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步骤二:等待个线程全部进入wait方法
while (true) {
if (lockedNum.get() !=) {
continue;
}
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步骤三:唤醒个线程
lock.lock();
condition.signalAll();
lock.unlock();
// 步骤四:等待个线程全部执行完毕
for (Thread thread : list) {
thread.join();
}
}
}
条件队列 |
耗时1 |
耗时2 |
耗时3 |
耗时4 |
耗时5 |
平均耗时(ms) |
JDK |
5000 |
5076 |
5054 |
5089 |
4942 |
5032 |
AQS |
5358 |
5440 |
5444 |
5473 |
5472 |
5437 |
4.2、基准测试Q&A
基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来
- Q:AQS测试中的“步骤二”,为什么在判断“等待10个线程全部进入wait方法”时,要引入 lockedNum.get() != 10 的判断?直接通过判断所有线程是否均为 waiting 方法不可以吗?
- A:如果真的删除 lockedNum.get() != 10 的判断,在多次并发测试时,会有较小的概率出现程序 死锁 的情况(作者电脑的环境是平均5万次调用会出现一次),为什么会出现死锁呢?我们对AQS源码就会发现,不管是调用 lock() 还是 await ,挂起线程使用的方法均为 LockSupport.park() 方法,此方法会将线程置为 WAITING 状态,也就是线程状态是 WAITING 状态时,有可能线程刚进入 lock() 方法,从而导致 await 与 thread.join() 的死锁
- Q:既然是这样,为什么JDK的测试没有出现死锁?
- A:我们看到JDK的加锁是通过 synchronized 关键字完成的,而当线程因为等待 synchronized 资源而阻塞时,线程状态将变为 BLOCKED ,而进入 wait() 方法后,状态才会变为 WAITING
- Q:那看来只有通过引入 AtomicInteger lockedNum 变量才能解决死锁问题了
- A:其实解决问题的方式有很多种,我们甚至可以简单将 ReentrantLock lock 置为公平锁,也能解决上述死锁问题;因为当前场景发生死锁的情况是, singalAll() 先于 await() 发生,而当所有线程都变成 WAITING 状态后,公平锁则确保了 singalAll() 一定是在所有线程都调用了 await() 。但因为 synchronized 本身是非公平锁,故如果AQS使用公平锁的话,性能偏差较大
- Q:那这样看来,AQS中的阻塞队列相对比JDK的没有优势可言啊,用法上没有JDK简洁,性能上还没人家快
- A:的确,如果真是只是单纯地使用阻塞、唤醒功能的话,还是建议使用JDK内置的方式;但AQS的优势并不在此
五、再说AQS条件队列
AQS的优势在于,其提供了丰富的api可以查询条件队列的状态;例如当我们想看一下在条件队列中等待节点的个数时,使用JDK的 wait/notify 时,是无法做的;AQS提供的api如下:
- boolean hasWaiters() 阻塞队列中是否有等待节点
- int getWaitQueueLength() 获取阻塞队列长度
- Collection<Thread> getWaitingThreads() 获取阻塞队列中线程对象
这些api为程序提供了更灵活的控制,条件队列对于javaer已不是黑盒;当然使用AQS的条件队列必然要引入独占锁,例如 ReentrantLock ,自然地我们还可以通过它查看条件队列外围的一些指标,例如:
- Interrupted 响应中断,借助独占锁,提供响应中断能力; wait/notify 不提供,因为虽然 wait 方法响应中断,但是 synchronized 关键字是会一直阻塞的
- boolean tryLock() 尝试获取锁; wait/notify 不提供
- int getHoldCount() 获取阻塞线程的数量
- boolean isLocked() 是否持有锁
- fair/nonFair 提供公平/非公平锁
- …
可见整个AQS体系相比较 Object 的 wait/notify 方法是相当灵活的,提供了很多监控条件队列、阻塞队列的指标