目录
- 一:简述
- 二:ReentrantLock类图
- 三:流程简图
- 四:源码分析
- lock()源码分析:
- 非公平实现:
- 公平锁实现:
- tryAcquire()方法
- 公平锁实现:
- 非公平锁实现:
- addWaiter()
- acquireQueued()
- shouldParkAfterFailedAcquire()
- parkAndCheckInterrupt()
- unlock()方法源码分析:
- tryRelease()
- unparkSuccessor()
- 五:总结
一:简述
ReentrantLock是java.util.concurrent包中提供的一种锁机制,它是一种可重入,互斥的锁,ReentrantLock还同时支持公平和非公平两种实现。本篇文章基于java8,对并发工具ReentrantLock的实现原理进行分析。
二:ReentrantLock类图
三:流程简图
四:源码分析
我们以lock()方法和unlock()方法为入口对ReentrantLock的源码进行分析。
lock()源码分析:
ReentrantLock在构造构造方法创建对象的时候会根据构造函数传递的fair参数 创建不同的Sync对象(默认是非公平锁的实现),reentrantLock.lock()会调用sync.lock()。在Sync类中的lock()方法是一个抽象方法,具体实现分别在FairSync(公平)和NonfairSync(非公平)中。
public ReentrantLock() {
//默认是非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
非公平实现:
final void lock() {
// state表示锁重入次数 0代表无锁状态
//尝试抢占锁一次 利用cas替换state标志 替换成功代表抢占锁成功
if (compareAndSetState(0, 1))
//抢占成功将抢占到锁的线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
公平锁实现:
final void lock() {
acquire(1);
}
可以看出非公平锁在lock()方法开始会尝试cas抢占一次锁,也就是在这里会插队一次。然后都会调用acquire()方法。acquire()方法中调用了三个方法,tryAcquire(),addWaiter(),acquireQueued()三个方法,而这三个方法正是ReentrantLock加锁的核心方法,接下来我们会对这三个方法进行重点的分析。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//这里acquireQueued()是一步步将线程是否中断的标志传回来的 如果为true 那么代表线程被中断过 需要设置中断标志 交给客户端处理
// 因为LockSupport.park()之后不会对中断进行响应 所以需要一步一步将中断标记传回来
selfInterrupt();
}
tryAcquire()方法
流程图:
tryAcquire()方法是抢占锁的方法,返回ture表示线程抢占到锁了,如果抢占到锁就什么都不做,直接执行同步代码,如果没有抢占到锁就需要将线程的信息保存起来,并且阻塞线程,也就是调用addWaiter()方法和acquireQueued()方法。
tryAcquire()也有公平和非公平锁两种实现。
公平锁实现:
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state的值
int c = getState();
if (c == 0) {
//如果state为0 代表现在是无锁状态 那么可以抢占锁
//公平锁这里多了一个判断 只有在AQS链表中不存在元素 才去尝试抢占锁 否则就去链表中排队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
//判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁实现:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果state为0 代表现在是无锁状态 非公平锁这里就直接尝试抢占锁
// 公平锁这里多了一个判断 先去看AQS链表中有没有已经在等待的线程 有的话就不会尝试去抢占锁了,非公平锁这里没有这个判断,也就是这里允许插队(不公平)
if (compareAndSetState(0, acquires)) {
//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
//判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//重新设置state的值
setState(nextc);
return true;
}
return false;
}
接下来分析addWaiter()方法
addWaiter()
addWaiter()方法的作用就是将没有获取到锁的线程封装成一个Node对象,然后存储在AbstractQueuedSynchronizer这个队列同步器中(为了偷懒简称AQS)。我们先看一下Node对象的结构。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
通过Node对象的结构我们可以看出这是一个双向链表的结构,保存了prev和next引用,thread成员变量用来保存阻塞的线程引用,并且node是有状态的,分别是CANCELLED,SIGNAL,CONDITION,PROPAGATE,其中ReentrantLock涉及到的状态就是SIGNAL(等待被唤醒),CANCELLED(取消)(由于相关性不强,其他的状态在后续文章用到的时候在讲吧)。而AQS又保存了链表的头结点head和尾结点tail,所以实际上AQS存储阻塞线程的数据结构是一 个Node双向链表。addWaiter()方法的作用就是将阻塞线程封装成Node并且将其保存在AQS的链表结构中。
流程图:
源码分析:
private Node addWaiter(Node mode) {
//将没有获得锁的线程封装成一个node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果AQS尾结点不为null 代表AQS链表已经初始化 尝试将构建好的节点添加到链表的尾部
if (pred != null) {
node.prev = pred;
//cas替换AQS的尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//没有初始化调用enq()方法
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
//尾结点为空 说明AQS链表还没有初始化 那么进行初始化
if (t == null) { // Must initialize
//cas 将AQS的head节点 初始化 成功初始化head之后,将尾结点也初始化
//注意 这里我们可以看到head节点是不存储线程信息的 也就是说head节点相当于是一个虚拟节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾结点不为空 那么直接添加到链表的尾部即可
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
接下来分析acquireQueued()
acquireQueued()
acquireQueued()方法的作用就是利用Locksupport.park()方法将AQS链表中存储的线程阻塞起来。
流程图:
源码分析:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//进入自旋
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是head 而且再次尝试获取锁成功,将节点从AQS队列中去除 并替换head 同时返回中断标志
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//注意 只有抢占到锁才会跳出这个for循环
return interrupted;
}
//去除状态为CANCELLED的节点 并且阻塞线程 线程被阻塞在这
//注意 线程被唤醒之后继续执行这个for循环 尝试抢占锁 没有抢占到的话又会阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果失败 将node状态修改为CANCELLED
cancelAcquire(node);
}
}
在acquireQueued()方法中有两个方法比较重要shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法。
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果节点是SIGNAL状态 不需要处理 直接返回
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//如果节点状态>0 说明节点是取消状态 这种状态的节点需要被清除 用do while循环顺便清除一下前面的连续的、状态为取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//正常的情况下 利用cas将前一个节点的状态替换为 SIGNAL状态 也就是-1
//注意 这样队列中节点的状态 除了最后一个都是-1 包括head节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//挂起当前线程 并且返回中断标志 LockSupport.park(thread) 会调用UNSAFE.park()方法将线程阻塞起来(是一个native方法)
LockSupport.park(this);
return Thread.interrupted();
}
到这里lock()方法也就分析完了 接下来我们分析unlock()方法
unlock()方法源码分析:
流程图:
reentrantLock的unlock()方法会调用sync的release()方法。
public void unlock() {
//每次调用unlock 将state减1
sync.release(1);
}
release()方法有两个方法比较重要,tryRelease()方法和unparkSuccessor(),tryRelease()方法计算state的值,看线程是否已经彻底释放锁(这是因为ReentrantLock是支持重入的),如果已经彻底释放锁那么需要调用unparkSuccessor()方法来唤醒线程,否则不需要唤醒线程。
public final boolean release(int arg) {
//只有tryRelease返回true 说明已经释放锁 需要将阻塞的线程唤醒 否则不需要唤醒别的线程
if (tryRelease(arg)) {
Node h = head;
//如果头结点不为空 而且状态不为0 代表同步队列已经初始化 且存在需要唤醒的node
//注意 同步队列的头结点相当于是一个虚拟节点 这一点我们可以在构建节点的代码中很清楚的知道
//并且在shouldParkAfterFailedAcquire方法中 会把head节点的状态修改为-1
//如果head的状态为0 那么代表队列中没有需要被唤醒的元素 直接返回true
if (h != null && h.waitStatus != 0)
//唤醒头结点的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
protected final boolean tryRelease(int releases) {
//减少重入次数
int c = getState() - releases;
//如果获取锁的线程不是当前线程 抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state为0 说明已经彻底释放了锁 返回true
if (c == 0) {
free = true;
//将获取锁的线程设置为null
setExclusiveOwnerThread(null);
}
//重置state的值
setState(c);
//如果释放了锁 就返回true 否则返回false
return free;
}
unparkSuccessor()
private void unparkSuccessor(Node node) {
//获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了
int ws = node.waitStatus;
if (ws < 0)
//将头结点状态设置为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的)
Node s = node.next;
//当前节点是null 或者是cancelled状态
if (s == null || s.waitStatus > 0) {
s = null;
//从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s 这里为什么从尾结点开始遍历而是头结点 应该是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//调用LockSupport.unpark()唤醒指定的线程
LockSupport.unpark(s.thread);
}
五:总结
最后总结一下,有以下几点需要注意:
1.ReentrantLock有公平锁和非公平锁两种实现,其实这两种实现的差别只有两点,第一是在lock()方法开始的时候非公平锁会尝试cas抢占锁插一次队, 第二是在tryAcquire()方法发现state为0的时候,非公平锁会抢占一次锁,而公平锁会判断AQS链表中是否存在等待的线程,没有等待的线程才会去抢占锁。
2.AQS存储阻塞线程的数据结构是一个双向链表的结构,而且它是遵循先进先出的,因为它是从头结点的下一个节点开始唤醒,而添加新的节点的时候是添加到链表的尾部,所以AQS同时也是一个队列的数据结构。
3.线程被唤醒之后会继续执行acquireQueued()方法,因为它是阻塞在acquireQueued()方法的for循环中的,唤醒后尝试去获取锁,获取成功就会将节点从AQS中删除并跳出for循环,否则又会继续阻塞。(获取锁失败的原因就是因为有人插队啊。。也就是非公平锁导致的)。