一、前言
经过了前面两章的铺垫,终于要切入正题了,本章也是整个AQS的核心之一
从本章开始,我们要精读AQS源码,在欣赏它的同时也要学会质疑它。当然本文不会带着大家逐行过源码(会有“只在此山中,云深不知处”的弊端),而是从功能入手,对其架构进行逐层剖析,在核心位置重点解读,并提出质疑;虽然AQS源码读起来比较“跳”,但我还是建议大家花时间及精力去好好读它
本章我们采用经典并发类 ReentrantLock 来阐述独占锁
二、整体回顾
独占锁,顾名思义,即在同一时刻,仅允许一个 线程 执行同步块代码。好比一伙儿人想要过河,但只有一根独木桥,且只能承受一人的重量
相信我们平时写独占锁的程序大抵是这样的:
ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
doBusiness();
} finally {
lock.unlock();
}
上述代码分为三部分:
- 加锁 lock.lock()
- 执行同步代码 doBusiness()
- 解锁 lock.unlock()
加锁部分,一定是众矢之的,兵家争抢的要地,对于高并发的程序来说,同一时刻,大量的线程争相涌入,而 lock() 则保证只能有一个线程进入 doBusiness() 逻辑,且在其执行完毕 unlock() 方法之前,不能有其他线程进入。所以相对而言, unlock() 方法相对轻松,不用处理多线程的场景
2.1、waitStatus
本章中,我们引入节点中一个关键的字段 waitStatus (后文简写为 ws ),在独占锁模式中,可能会使用到的等待状态如下:
- 1、 0 初始状态,当一个节点新建时,其默认 ws 为0
- 2、 SIGNAL (-1) 如果某个节点的状态为 SIGNAL ,即表明其后续节点处于(或即将处于)阻塞状态。所以当前节点在执行完同步代码或被取消后,一定要记得唤醒其后续节点
- 3、 CANCELLED (1) 顾名思义,即取消操作的含义。当一个节点等待超时、或者被打断、或者执行 tryAcquire 发生异常,都会导致当前节点取消。而当节点一旦取消,便永远不会再变为 0 或者 SIGNAL 状态了
三、加锁(核心)
我们先上一张 ReentrantLock 加锁功能(非公平)的整体流程图,在并发或关键部分有注释
第一眼看上去,确实有点复杂,不过不用怕,我们逐一分析解读后,它其实就是只纸老虎
大体上可以分为三大部分
- a、加入阻塞队列
- b、阻塞队列调度
- c、异常处理
按照正常的理解,可能只会有a、b两部分就够了,为什么会有c呢?什么时候会发生异常?
3.1、加入阻塞队列
当一个线程尝试加锁失败后,便会放入阻塞队列的队尾;这节我们来讨论一下这个动作的细节
在加入阻塞队列之前,首先会查看头节点是否为null,如果是null的话,需要新建 ws 为0的头结点,(为什么在AQS初始化的时候,不直接新建头结点呢?其实由此可见作者细节处理的严谨,因为如果当我们的独占锁并发度不大,在尝试加锁的过程中,总能获取到锁,这时便不会向阻塞队列添加内容,假如初始化便新建头结点,会导致其白白占用内存空间而得不到有效利用)然后将当前节点添加至阻塞队列的尾部,当然头结点初始化、向尾部节点追加新节点都是通过CAS操作的。而阻塞队列呢,正如我们前文提及的是一个FIFO的队列,且带有 next 、 prev 两个引用来标记前、后节点;我们在阻塞队列中加入第一个节点后,阻塞队列的样子:
3.2、阻塞队列调度
这一节属于独占锁很核心的部分,里面涉及 ws 更改、线程挂起与唤醒、更换头结点等
我们接着3.1继续,在节点进入调度后,首先检查下当前节点的前节点是否为 head 节点,如果是的话,那么有一次尝试加锁的机会,加锁成功或失败将导致2个分支
我们首先看加锁加锁成功的情况,一旦加锁成功,当前节点便从阻塞队列中“消失”(其实是当前节点变为了头结点,而原头结点内存不可达,等待垃圾回收),当所有节点都加锁成功,阻塞队列便为空了,但并不代表阻塞队列的长度为0,因为有头结点的存在,所以空阻塞队列的长度是1
而加锁失败或者当前节点的前节点不是 head 节点呢?是马上将线程挂起吗?答案是不确定的,要看前节点的 ws 状态而定。而此步骤还有个隐藏任务:将当前节点之前的所有已取消节点从阻塞队列中剔除。
从上图中我们看到,一个节点如果想正常进入挂起状态,那么一定要将前节点的 ws 改为 SIGNAL (-1) 状态,但如果前节点已经变为 CANCELLED (1) 状态后,就要递归向前寻找第一个非 CANCELLED 的节点。
针对“线程挂起并等待其他线程唤醒”,我们提出2个问题
问题1
- 如果是普通节点,直接挂在队尾,且将其线程挂起,这个没啥问题;但如果是头节点被唤醒,尝试加锁却失败了,又被再次挂起,会不会导致头结点永远处于挂起状态?
- 答:不会,因为头结点之所以抢锁失败,一定是因为另外一个A线程抢锁成功。虽然头节点暂时处于挂起状态,但当A线程执行完加锁代码后,还会再次唤醒头结点
问题2
- 假定当前节点判定需要被挂起,在执行挂起操作前,拥有锁的线程执行完毕,并唤醒了当前线程,而当前线程又马上要进行挂起操作,岂不是会导致无法成功将当前节点唤醒,从而永远hang死?
- 答:能考虑到这个问题,说明你已经带着分身去思考问题了,不错。不过此处是不会存在这个问题的,因为线程挂起、唤醒使用的api为 park /unpark ,即便是unpark发生在park之前,在执行park操作时,也会成功唤醒。这个特质区别于 wait/notify
而针对阻塞队列的调度,还有一些没有解释的问题:
- a、为什么阻塞队列内有这么多 CANCELLED 状态的节点?
- b、当前节点在挂起前,前节点为 SIGNAL 状态,但经过一段时间运行,前节点变为了 CANCELLED 状态,岂不是导致当前节点永远无法被唤醒?
要回答这两个问题,就要引出异常处理了
3.3、异常处理
我们首先讨论如果AQS不做异常处理可以吗? 不可以,例如第一个节点被唤醒后,在加锁阶段发生了异常,如果没有异常处理,这个异常节点将永远处于阻塞队列,成为“僵尸节点”,且后续节点也不会被唤起
官方标明可能会出现异常的部分,诸如“等待超时”、“打断”等,那如果我们调用 acquire() 方法,而非 acquireInterruptibly() 、 tryAcquireNanos(time) 是不是就不会出现异常?不是的,因为还有AQS下放给我们自己实现的 tryRelease() 等方法。我们实现一个自己的AQS,并模拟 tryRelease() 报错,看AQS能否正常应对
public class FindBugAQS {
public volatile static int FLAG =;
private static ThreadLocal<Integer> FLAG_STORE = new ThreadLocal<>();
private static ThreadLocal<Integer> TIMES = ThreadLocal.withInitial(() ->);
private Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
private Sync() {
setState();
}
public void lock() {
FLAG_STORE.set(++FLAG);
int state = getState();
if (state == && compareAndSetState(state, 0)) {
return;
}
acquire();
}
@Override
protected boolean tryAcquire(int acquires) {
if (FLAG_STORE.get() ==) {
Integer time = TIMES.get();
if (time ==) {
TIMES.set();
} else {
// 模拟发生异常,第二个节点在第二次访问tryAcquire方法时,将会扔出运行期异常
System.out.println("发生异常");
throw new RuntimeException("lkn aqs bug");
}
}
int state = getState();
if (state == && compareAndSetState(state, 0)) {
return true;
}
return false;
}
@Override
protected final boolean tryRelease(int releases) {
setState();
return true;
}
public void unlock() {
release();
}
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.unlock();
}
}
// 测试用例如下:
public class BugTest {
private static volatile int number = 0;
@Test
public void test() throws InterruptedException {
List<Thread> list = Lists.newArrayList();
FindBugAQS aqs = new FindBugAQS();
Thread thread = new Thread(() -> {
aqs.lock();
PubTools.sleep();
number++;
aqs.unlock();
});
thread.start();
list.add(thread);
PubTools.sleep();
for (int i =; i < 4; i++) {
Thread thread = new Thread(() -> {
aqs.lock();
PubTools.sleep();
number++;
aqs.unlock();
});
thread.start();
list.add(thread);
}
for (Thread thread : list) {
thread.join();
}
System.out.println("number is " + number);
}
}
运行结果:
发生异常
Exception in thread "Thread-" java.lang.RuntimeException: lkn aqs bug
at org.xijiu.share.aqs.bug.FindBugAQS$Sync.tryAcquire(FindBugAQS.java:)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:)
at org.xijiu.share.aqs.bug.FindBugAQS$Sync.lock(FindBugAQS.java:)
at org.xijiu.share.aqs.bug.FindBugAQS.lock(FindBugAQS.java:)
at org.xijiu.share.aqs.bug.BugTest.lambda$test$2(BugTest.java:61)
at java.lang.Thread.run(Thread.java:)
number is
我们自定义了AQS实现类 FindBugAQS.java ,模拟第二个节点在第二次访问 tryAcquire 会扔出异常;然后启动5个线程,对 number 进行累加。可见,最后的结果符合预期,AQS处理的很完美。那程序发生异常后,阻塞队列究竟如何应对?
举例说明吧,假定现在除去头结点外,阻塞队列中还有3个节点,当第1个节点被唤醒执行时,发生了异常,那么第1个节点会将 ws 置为 CANCELLED ,且将向后的链条打断(指向自己),但向前链条保持不变,并唤醒下一个节点
由上图可见,当某个节点响应中断/发生异常后,其会主动打断向后链条,但依旧保留向前的链条,这样做的目的是为了后续节点在寻找前节点时,可以找到标记为 CANCELLED 状态的节点,而不是找到 null 。至此便解答了3.2提出的两个问题
a、为什么阻塞队列内有这么多 CANCELLED 状态的节点?
- 当被调度执行的节点发生了异常,状态便会更改为 CANCELLED 状态,但仍存在于阻塞队列中,直到正常执行的节点将其剔除
b、当前节点在挂起前,前节点为 SIGNAL 状态,但经过一段时间运行,前节点变为了 CANCELLED 状态,岂不是导致当前节点永远无法被唤醒?
- 不会,节点发生异常后,会主动唤起后续节点,而后续节点负责将前节点从阻塞队列中删除
四、解锁
本来想针对“解锁逻辑”画一张流程图,但猛然发现解锁部分仅仅10行左右的代码,那就索性把源码贴上,逐一论述下
- AQS解锁源码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus !=)
unparkSuccessor(h);
return true;
}
return false;
}
- ReentrantLock 解锁源码
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c ==) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
我们发现当 tryRelease() 方法返回 true 时,AQS便会负责唤醒后续节点,因为 ReentrantLock 支持了可重入的特性,所以当前线程的每次加锁都会对 state 累加,而每次 tryRelease() 方法则会对 state 累减,直到 state 变为初始状态0时, tryRelease() 方法才会返回 true ,即唤醒下一个节点
解锁逻辑相对简洁,且不存在并发,本文不再赘述
五、后记
再次强调本文是通过 ReentrantLock 的视角来分析独占锁,且主要分析的是 ReentrantLock.lock()/unlock() 方法,目的是让大家对AQS整体的数据结构有个全面认识,方便后续在实现自己的并发框架时,明白api背后发生的事情,做到游刃有余
而像 ReentrantLock 的 lockInterruptibly() 、 tryLock(TimeUnit) 或者其他独占锁的实现类,读者可自行阅读源码,原理类似,核心代码也是一样的