我们看下Reentrantock
的源码。
public void lock() { | |
sync.lock(); | |
} | |
public void unlock() { | |
sync.release(1); | |
} |
原来lock,unlock等核心方法都是通过sync
来实现的。而sync
其实是它的一个内部类。
abstract static class Sync extends AbstractQueuedSynchronizer {...}
这个内部类继承了AbstractQueuedSynchronizer
,也就是我们今天要重点介绍的队列同步器AQS
。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。
线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。
我们先来了解以下每个Node
有什么内容,点开AQS
的源码,它定义了内部类Node
。
static final class Node { | |
// 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁 | |
static final Node SHARED = new Node(); | |
static final Node EXCLUSIVE = null; | |
// 定义等待状态 | |
// CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消 | |
static final int CANCELLED = 1; | |
// 此节点后面的节点被挂起,进入等待状态 | |
static final int SIGNAL = -1; | |
// 在条件队列中的状态 | |
static final int CONDITION = -2; | |
// 传播,一般用于共享锁 | |
static final int PROPAGATE = -3; | |
volatile int waitStatus; //等待状态值 | |
volatile Node prev; //双向链表基本操作 | |
volatile Node next; | |
volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列 | |
Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点 | |
// 判断是否为共享节点 | |
final boolean isShared() { | |
return nextWaiter == SHARED; | |
} | |
// 返回前驱节点 | |
final Node predecessor() throws NullPointerException { | |
Node p = prev; | |
if (p == null) | |
throw new NullPointerException(); | |
else | |
return p; | |
} | |
Node() { // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker) | |
} | |
Node(Thread thread, Node mode) { // 等待队列使用 | |
this.nextWaiter = mode; | |
this.thread = thread; | |
} | |
Node(Thread thread, int waitStatus) { // 条件对立使用 | |
this.waitStatus = waitStatus; | |
this.thread = thread; | |
} | |
} |
再跳出Node
看AQS
,它定义了三个属性,head
,tail
默认为null,state
默认为0,并且AQS
的构造器并没有给它们赋值。
private transient volatile Node head; | |
private transient volatile Node tail; | |
private volatile int state; // 当前锁的状态 |
实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的操作。
protected final boolean compareAndSetState(int expect, int update) { | |
return unsafe.compareAndSwapInt(this, stateOffset, expect, update); | |
} |
原来是通过unsafe
的compareAndSwapInt()
实现的。这个是CAS
算法。不妨看看unsafe
,它也是内部的属性。
private static final Unsafe unsafe = Unsafe.getUnsafe(); | |
private static final long stateOffset; | |
private static final long headOffset; | |
private static final long tailOffset; | |
private static final long waitStatusOffset; | |
private static final long nextOffset; | |
// 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址) | |
static { | |
try { | |
stateOffset = unsafe.objectFieldOffset | |
(AbstractQueuedSynchronizer.class.getDeclaredField("state")); | |
headOffset = unsafe.objectFieldOffset | |
(AbstractQueuedSynchronizer.class.getDeclaredField("head")); | |
tailOffset = unsafe.objectFieldOffset | |
(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); | |
waitStatusOffset = unsafe.objectFieldOffset | |
(Node.class.getDeclaredField("waitStatus")); | |
nextOffset = unsafe.objectFieldOffset | |
(Node.class.getDeclaredField("next")); | |
} catch (Exception ex) { throw new Error(ex); } | |
} | |
/** | |
* CAS 操作头节点 | |
*/ | |
private final boolean compareAndSetHead(Node update) { | |
return unsafe.compareAndSwapObject(this, headOffset, null, update); | |
} | |
/** | |
* CAS 操作尾节点 | |
*/ | |
private final boolean compareAndSetTail(Node expect, Node update) { | |
return unsafe.compareAndSwapObject(this, tailOffset, expect, update); | |
} | |
/** | |
* CAS 操作WaitStatus属性 | |
*/ | |
private static final boolean compareAndSetWaitStatus(Node node, | |
int expect, | |
int update) { | |
return unsafe.compareAndSwapInt(node, waitStatusOffset, | |
expect, update); | |
} | |
/** | |
* CAS 操作next属性 | |
*/ | |
private static final boolean compareAndSetNext(Node node, | |
Node expect, | |
Node update) { | |
return unsafe.compareAndSwapObject(node, nextOffset, expect, update); | |
} |
其实,Unsafe
里面调用的都是native
方法,读者可以自己点进去看看。它会直接找到属性的内存地址,操作内存中的数据,效率比较高。在AQS
中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe
中的方法时会将偏移地址传过去哦。
并且我们回过头看看,这里unsafe操作的属性在被定义时都是定义为volatile
修饰,这是因为他们在被修改时都是使用的CAS
算法,我们要使用vilotile
修饰保证其可见性。
private volatile int state; // 当前锁的状态
现在我们已经大致了解了AQS
的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。
// 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true | |
protected boolean tryAcquire(int arg) { | |
throw new UnsupportedOperationException(); | |
} | |
// 独占式释放同步状态 | |
protected boolean tryRelease(int arg) { | |
throw new UnsupportedOperationException(); | |
} | |
// 共享式获取同步状态,返回值大于0表示成功,否则失败 | |
protected int tryAcquireShared(int arg) { | |
throw new UnsupportedOperationException(); | |
} | |
// 共享式释放同步状态 | |
protected boolean tryReleaseShared(int arg) { | |
throw new UnsupportedOperationException(); | |
} | |
// 是否在独占模式下被当前线程占有(是否当前线程持有锁) | |
protected boolean isHeldExclusively() { | |
throw new UnsupportedOperationException(); | |
} |
现在我们以ReentantLock
的公平锁为例,看看它怎么被重写的。
static final class FairSync extends Sync { | |
private static final long serialVersionUID = -3000897897090466540L; | |
final void lock() { | |
acquire(1); | |
} | |
... | |
} |
先看看lock方法。调AQS
的acquire()
方法。里面会调用AQS
中定义的tryAquire
方法。而在ReentrantLock
中tryAuire
方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&
运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued
,其内部调用了addWaiter
。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。
public final void acquire(int arg) { | |
if (!tryAcquire(arg) && | |
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE | |
selfInterrupt(); | |
} |
跟到addWaiter
中看看。
private Node addWaiter(Node mode) { | |
Node node = new Node(Thread.currentThread(), mode); | |
// 先尝试用CAS直接入队,如果CAS入队成功,则return | |
Node pred = tail; | |
if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了 | |
node.prev = pred; | |
if (compareAndSetTail(pred, node)) { | |
pred.next = node; | |
return node; | |
} | |
} | |
// CAS失败(其它线程也在获取锁或者tail节点为空无法cas) | |
enq(node); | |
return node; | |
} |
上面的注释写的很清楚,我们接着看看enq
怎么实现的,它其实是AQS
的一个自旋机制。
private Node enq(final Node node) { | |
for (;;) { | |
Node t = tail; | |
if (t == null) { // 说明头尾节点没有初始化 | |
if (compareAndSetHead(new Node())) //新建空节点置为头节点 | |
tail = head; //头尾节点指向同一个节点 | |
} else { | |
node.prev = t; //队列插入节点操作,把当前节点的prev指向尾节点 | |
if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点 | |
t.next = node; | |
return t; | |
} | |
} | |
} | |
} |
addWaiter
终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued
,我们接着来看下acquireQueued
。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。
其过程可以结合下图理解。
具体代码如下。
final boolean acquireQueued(final Node node, int arg) { | |
boolean failed = true; // 成功与否标记,初始置为true | |
try { | |
boolean interrupted = false; //中断标记 | |
for (;;) { | |
final Node p = node.predecessor(); //获取当前已经插入节点的前驱节点 | |
if (p == head && tryAcquire(arg)) { //如果前驱节点是头节点,说明当前节点位于队首(上图节点1),会调用tryAcquire抢锁 | |
setHead(node); //抢锁成功,节点出队 | |
p.next = null; // 建议 GC | |
failed = false; | |
return interrupted; //正常返回,未在等待队列中被中断 | |
} | |
// 当前节点不是队首节点,将当前节点的前驱节点的等待状态设置为signal(siganl含义:siganl状态的节点下一个节点处于等锁状态)。如果设置失败进行下一轮循环,否则往下执行 | |
if (shouldParkAfterFailedAcquire(p, node) && | |
parkAndCheckInterrupt()) | |
interrupted = true; | |
} | |
} finally { | |
if (failed) | |
cancelAcquire(node); | |
} | |
} | |
private final boolean parkAndCheckInterrupt() { | |
LockSupport.park(this); // 通过Unsafe类操作底层挂起线程(直接进入阻塞状态,也就是等待锁的状态) | |
return Thread.interrupted(); | |
} |
再来看看shouldParkAfterFailedAcquire
的具体逻辑。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { | |
int ws = pred.waitStatus; | |
if (ws == Node.SIGNAL) // 如果前驱节点已经是signal,则直接返回true | |
return true; | |
if (ws > 0) { // ws>0表示前驱节点已经被取消,不能是被取消的节点,向前遍历直到找到第一个未被取消的节点 | |
do { | |
node.prev = pred = pred.prev; | |
} while (pred.waitStatus > 0); | |
pred.next = node; //把被取消的节点全部抛弃 | |
} else { | |
//前驱节点不是signal, 使用CAS设置为signal | |
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); | |
} | |
return false; // 返回false,直接进入下一轮,判断CAS是否成功将前驱节点状态设置为signal | |
} |
在上面的代码分析过程中,我们频繁看到park
,unpark
方法,他们的作用是将线程挂起,和解除线程的挂起状态。看下列示例代码。
public class Demo22 { | |
public static void main(String[] args) { | |
Thread t = Thread.currentThread(); | |
new Thread(() -> { | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println("unpark thread t"); | |
LockSupport.unpark(t); | |
// thread.interrupt(); | |
} catch (InterruptedException e) { | |
} | |
}).start(); | |
System.out.println("Thread t to be park..."); | |
LockSupport.park(); | |
System.out.println("Thread t unpark successfully"); | |
} | |
} |
其运行结果是。
Thread t to be park... | |
unpark thread t | |
Thread t unpark successfully |
到此为止,ReentrantLock
公平锁的Lock
方法已经讲解完毕了。继续深入。我们接着来看它的tryAcquire
方法。也就是看看它究竟是如何去抢锁的。
// 可重入独占锁的公平实现 | |
protected final boolean tryAcquire(int acquires) { | |
final Thread current = Thread.currentThread(); | |
int c = getState(); // 获取当前AQS的状态,独占模式下如果为0说明未占用,如果大于0说明已经被占用 | |
if (c == 0) { | |
if (!hasQueuedPredecessors() && //判断是否等待队列不为空且当前线程没有获得锁,其实就是当前线程是否需要排队 | |
compareAndSetState(0, acquires)) { // CAS设置状态,如果成功则说明成功的获取到了这把锁 | |
setExclusiveOwnerThread(current); // 将独占锁的线程拥有者设置为当前线程 | |
return true; | |
} | |
} | |
else if (current == getExclusiveOwnerThread()) { // 如果AQS状态不为0,说明锁被占用,判断占用者是否是当前线程 | |
int nextc = c + acquires; // 每加锁一次则状态值加一 | |
if (nextc < 0) //加到int溢出了 | |
throw new Error("Maximum lock count exceeded"); | |
setState(nextc); | |
return true; | |
} | |
return false; // 任何其它情况返回false,加锁失败 | |
} | |
} |
加锁过程算是讲完了,接下来看看它的解锁过程。
看ReentrantLock
的unlock
方法。原来调用的是release
方法,状态传参1,因为释放锁的次数为1.
public void unlock() { | |
sync.release(1); | |
} |
看看AQS
的release
方法。
public final boolean release(int arg) { | |
if (tryRelease(arg)) { // 尝试解锁 | |
Node h = head; | |
if (h != null && h.waitStatus != 0) //头节点不为空且waitStatus状态不为0(初始状态为0,当被设置成为signal后为-1) | |
unparkSuccessor(h); // 唤醒下一个后继节点 | |
return true; | |
} | |
return false; | |
} |
接下来看看unparkSuccessor
.看看它到底是怎么唤醒下一个节点的。
private void unparkSuccessor(Node node) { | |
int ws = node.waitStatus; | |
if (ws < 0) // 如果等待状态<0,说明是signal状态,将其设置为0,即恢复状态 | |
compareAndSetWaitStatus(node, ws, 0); | |
// 获取当前节点的后继节点 | |
Node s = node.next; | |
if (s == null || s.waitStatus > 0) { //如果没有下一个节点或者状态>0(已经取消),遍历节点找其它符合unpark要求的节点 | |
s = null; | |
for (Node t = tail; t != null && t != node; t = t.prev) //从队尾往队前找 | |
if (t.waitStatus <= 0) | |
s = t; | |
} | |
if (s != null) //要是没有找到,就算了,找到了就unpark | |
LockSupport.unpark(s.thread); | |
} |
再看看release中的tryRelease
。
protected final boolean tryRelease(int releases) { | |
int c = getState() - releases; //当前状态值减去要释放锁的次数(之前传的是1) | |
if (Thread.currentThread() != getExclusiveOwnerThread()) // 独占锁,如果不是当前线程持有锁,抛出异常 | |
throw new IllegalMonitorStateException(); | |
boolean free = false; | |
if (c == 0) { //解锁后状态值为0,则完全释放这把锁 | |
free = true; | |
setExclusiveOwnerThread(null); | |
} | |
setState(c); //状态值 | |
return free; // 是否完全释放 | |
} |
下面通过一张图对锁的加锁、释放机制做一个总结。
非公平锁的情况大致进行介绍如下。压根没啥等待队列,上来直接哐哐CAS.
final void lock() { | |
if (compareAndSetState(0, 1)) | |
setExclusiveOwnerThread(Thread.currentThread()); | |
else | |
acquire(1); | |
} | |
final boolean nonfairTryAcquire(int acquires) { | |
final Thread current = Thread.currentThread(); | |
int c = getState(); | |
if (c == 0) { | |
if (compareAndSetState(0, acquires)) { | |
setExclusiveOwnerThread(current); | |
return true; | |
} | |
} | |
else if (current == getExclusiveOwnerThread()) { | |
int nextc = c + acquires; | |
if (nextc < 0) // overflow | |
throw new Error("Maximum lock count exceeded"); | |
setState(nextc); | |
return true; | |
} | |
return false; | |
} |