我们看下Reentrantock
的源码。
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
原来lock,unlock等核心方法都是通过sync
来实现的。而sync
其实是它的一个内部类。
abstract static class Sync extends AbstractQueuedSynchronizer {...}
这个内部类继承了AbstractQueuedSynchronizer
,也就是我们今天要重点介绍的队列同步器AQS
。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。
线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。
我们先来了解以下每个Node
有什么内容,点开AQS
的源码,它定义了内部类Node
。
static final class Node {
// 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 定义等待状态
// CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消
static final int CANCELLED = 1;
// 此节点后面的节点被挂起,进入等待状态
static final int SIGNAL = -1;
// 在条件队列中的状态
static final int CONDITION = -2;
// 传播,一般用于共享锁
static final int PROPAGATE = -3;
volatile int waitStatus; //等待状态值
volatile Node prev; //双向链表基本操作
volatile Node next;
volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列
Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点
// 判断是否为共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker)
}
Node(Thread thread, Node mode) { // 等待队列使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 条件对立使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
再跳出Node
看AQS
,它定义了三个属性,head
,tail
默认为null,state
默认为0,并且AQS
的构造器并没有给它们赋值。
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // 当前锁的状态
实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的操作。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
原来是通过unsafe
的compareAndSwapInt()
实现的。这个是CAS
算法。不妨看看unsafe
,它也是内部的属性。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
// 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址)
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS 操作头节点
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS 操作尾节点
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS 操作WaitStatus属性
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS 操作next属性
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
其实,Unsafe
里面调用的都是native
方法,读者可以自己点进去看看。它会直接找到属性的内存地址,操作内存中的数据,效率比较高。在AQS
中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe
中的方法时会将偏移地址传过去哦。
并且我们回过头看看,这里unsafe操作的属性在被定义时都是定义为volatile
修饰,这是因为他们在被修改时都是使用的CAS
算法,我们要使用vilotile
修饰保证其可见性。
private volatile int state; // 当前锁的状态
现在我们已经大致了解了AQS
的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。
// 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占式释放同步状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 是否在独占模式下被当前线程占有(是否当前线程持有锁)
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
现在我们以ReentantLock
的公平锁为例,看看它怎么被重写的。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
...
}
先看看lock方法。调AQS
的acquire()
方法。里面会调用AQS
中定义的tryAquire
方法。而在ReentrantLock
中tryAuire
方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&
运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued
,其内部调用了addWaiter
。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE
selfInterrupt();
}
跟到addWaiter
中看看。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试用CAS直接入队,如果CAS入队成功,则return
Node pred = tail;
if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// CAS失败(其它线程也在获取锁或者tail节点为空无法cas)
enq(node);
return node;
}
上面的注释写的很清楚,我们接着看看enq
怎么实现的,它其实是AQS
的一个自旋机制。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 说明头尾节点没有初始化
if (compareAndSetHead(new Node())) //新建空节点置为头节点
tail = head; //头尾节点指向同一个节点
} else {
node.prev = t; //队列插入节点操作,把当前节点的prev指向尾节点
if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点
t.next = node;
return t;
}
}
}
}
addWaiter
终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued
,我们接着来看下acquireQueued
。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。
其过程可以结合下图理解。
具体代码如下。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 成功与否标记,初始置为true
try {
boolean interrupted = false; //中断标记
for (;;) {
final Node p = node.predecessor(); //获取当前已经插入节点的前驱节点
if (p == head && tryAcquire(arg)) { //如果前驱节点是头节点,说明当前节点位于队首(上图节点1),会调用tryAcquire抢锁
setHead(node); //抢锁成功,节点出队
p.next = null; // 建议 GC
failed = false;
return interrupted; //正常返回,未在等待队列中被中断
}
// 当前节点不是队首节点,将当前节点的前驱节点的等待状态设置为signal(siganl含义:siganl状态的节点下一个节点处于等锁状态)。如果设置失败进行下一轮循环,否则往下执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 通过Unsafe类操作底层挂起线程(直接进入阻塞状态,也就是等待锁的状态)
return Thread.interrupted();
}
再来看看shouldParkAfterFailedAcquire
的具体逻辑。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果前驱节点已经是signal,则直接返回true
return true;
if (ws > 0) { // ws>0表示前驱节点已经被取消,不能是被取消的节点,向前遍历直到找到第一个未被取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //把被取消的节点全部抛弃
} else {
//前驱节点不是signal, 使用CAS设置为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 返回false,直接进入下一轮,判断CAS是否成功将前驱节点状态设置为signal
}
在上面的代码分析过程中,我们频繁看到park
,unpark
方法,他们的作用是将线程挂起,和解除线程的挂起状态。看下列示例代码。
public class Demo22 {
public static void main(String[] args) {
Thread t = Thread.currentThread();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("unpark thread t");
LockSupport.unpark(t);
// thread.interrupt();
} catch (InterruptedException e) {
}
}).start();
System.out.println("Thread t to be park...");
LockSupport.park();
System.out.println("Thread t unpark successfully");
}
}
其运行结果是。
Thread t to be park... unpark thread t Thread t unpark successfully
到此为止,ReentrantLock
公平锁的Lock
方法已经讲解完毕了。继续深入。我们接着来看它的tryAcquire
方法。也就是看看它究竟是如何去抢锁的。
// 可重入独占锁的公平实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前AQS的状态,独占模式下如果为0说明未占用,如果大于0说明已经被占用
if (c == 0) {
if (!hasQueuedPredecessors() && //判断是否等待队列不为空且当前线程没有获得锁,其实就是当前线程是否需要排队
compareAndSetState(0, acquires)) { // CAS设置状态,如果成功则说明成功的获取到了这把锁
setExclusiveOwnerThread(current); // 将独占锁的线程拥有者设置为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果AQS状态不为0,说明锁被占用,判断占用者是否是当前线程
int nextc = c + acquires; // 每加锁一次则状态值加一
if (nextc < 0) //加到int溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 任何其它情况返回false,加锁失败
}
}
加锁过程算是讲完了,接下来看看它的解锁过程。
看ReentrantLock
的unlock
方法。原来调用的是release
方法,状态传参1,因为释放锁的次数为1.
public void unlock() {
sync.release(1);
}
看看AQS
的release
方法。
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试解锁
Node h = head;
if (h != null && h.waitStatus != 0) //头节点不为空且waitStatus状态不为0(初始状态为0,当被设置成为signal后为-1)
unparkSuccessor(h); // 唤醒下一个后继节点
return true;
}
return false;
}
接下来看看unparkSuccessor
.看看它到底是怎么唤醒下一个节点的。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 如果等待状态<0,说明是signal状态,将其设置为0,即恢复状态
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果没有下一个节点或者状态>0(已经取消),遍历节点找其它符合unpark要求的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //从队尾往队前找
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //要是没有找到,就算了,找到了就unpark
LockSupport.unpark(s.thread);
}
再看看release中的tryRelease
。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //当前状态值减去要释放锁的次数(之前传的是1)
if (Thread.currentThread() != getExclusiveOwnerThread()) // 独占锁,如果不是当前线程持有锁,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //解锁后状态值为0,则完全释放这把锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //状态值
return free; // 是否完全释放
}
下面通过一张图对锁的加锁、释放机制做一个总结。
非公平锁的情况大致进行介绍如下。压根没啥等待队列,上来直接哐哐CAS.
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}