目录
- 一.什么是AQS
- 1.定义
- 2.特性
- 3.属性
- 4.资源共享方式
- 5.两种队列
- 6.队列节点状态
- 7.实现方法
- 二.等待队列
- 1.同步等待队列
- 2.条件等待队列
- 三.condition接口
- 四.ReentrantLock
- 五.源码解析
一.什么是AQS
1.定义
java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的。
- 一般都是通过一个内部类sync继承AQS
- 将同步器所有调用都同步到Sync对应的方法
2.特性
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
3.属性
内部维护属性volatile int state,表示资源的可用状态
- getState()
- setState()
- compareAndSetState()
4.资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
5.两种队列
- 同步等待队列: 主要用于维护获取锁失败时入队的线程
- 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁
6.队列节点状态
- 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
7.实现方法
自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
二.等待队列
1.同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:
- 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
- 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
- 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
2.条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:
- 调用await方法阻塞线程;
- 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
三.condition接口
- 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
- 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
public static void main(String[] args) { | |
Lock lock = new ReentrantLock(); | |
Condition condition = lock.newCondition(); | |
new Thread(() -> { | |
lock.lock(); | |
try { | |
log.debug(Thread.currentThread().getName() + " 开始处理任务"); | |
//会释放当前持有的锁,然后阻塞当前线程 | |
condition.await(); | |
log.debug(Thread.currentThread().getName() + " 结束处理任务"); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
lock.unlock(); | |
} | |
}).start(); | |
new Thread(() -> { | |
lock.lock(); | |
try { | |
log.debug(Thread.currentThread().getName() + " 开始处理任务"); | |
Thread.sleep(); | |
//唤醒因调用Condition#await方法而阻塞的线程 | |
condition.signal(); | |
log.debug(Thread.currentThread().getName() + " 结束处理任务"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
lock.unlock(); | |
} | |
}).start(); | |
} |
Thread-0 开始处理任务
Thread-1 开始处理任务
Thread-1 结束处理任务
Thread-0 结束处理任务
四.ReentrantLock
1.ReentrantLock是什么
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
2.特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
3. ReentrantLock和synchronized的区别
- synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
- synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
- synchronized是非公平锁,ReentrantLock可以是公平也可以是非公平的,默认是非公平的;
- synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
- 在发生异常时synchronized会自动释放锁,而ReentrantLock需要开发者在finally块中显示释放锁;
- ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;
- synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(回顾一下sychronized的唤醒策略),而ReentrantLock对于已经在等待的线程是先来的线程先获得锁;
4. ReentrantLock的使用
伪代码:
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁 | |
ReentrantLock lock = new ReentrantLock(true); //公平锁 | |
//加锁 | |
lock.lock(); | |
try { | |
//临界区 | |
} finally { | |
// 解锁 | |
lock.unlock(); |
例子:基本使用
private static int sum =; | |
private static Lock lock = new ReentrantLock(); | |
public static void main(String[] args) throws InterruptedException { | |
for (int i =; i < 3; i++) { | |
Thread thread = new Thread(()->{ | |
//加锁 一般写在try前面 | |
lock.lock(); | |
try { | |
// 临界区代码 业务逻辑 | |
for (int j =; j < 10000; j++) { | |
sum++; | |
} | |
} finally { | |
// 解锁 | |
lock.unlock(); | |
} | |
}); | |
thread.start(); | |
} | |
Thread.sleep(); | |
System.out.println(sum); | |
} |
30000
可重入
public static ReentrantLock lock = new ReentrantLock(); | |
public static void main(String[] args) { | |
method(); | |
} | |
public static void method() { | |
lock.lock(); | |
try { | |
log.debug("execute method"); | |
method(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public static void method() { | |
lock.lock(); | |
try { | |
log.debug("execute method"); | |
method(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public static void method() { | |
lock.lock(); | |
try { | |
log.debug("execute method"); | |
} finally { | |
lock.unlock(); | |
} | |
} |
execute method1
execute method2
execute method3
可中断
public static void main(String[] args) throws InterruptedException { | |
ReentrantLock lock = new ReentrantLock(); | |
Thread t = new Thread(() -> { | |
log.debug("t启动..."); | |
try { | |
lock.lockInterruptibly(); | |
try { | |
log.debug("t获得了锁"); | |
} finally { | |
lock.unlock(); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
log.debug("t等锁的过程中被中断"); | |
} | |
}, "t"); | |
lock.lock(); | |
try { | |
log.debug("main线程获得了锁"); | |
t.start(); | |
//先让线程t执行 | |
Thread.sleep(); | |
t.interrupt(); | |
log.debug("线程t执行中断"); | |
} finally { | |
lock.unlock(); | |
} | |
} |
main线程获得了锁
t1启动…
线程t1执行中断
t1等锁的过程中被中断
锁超时
public static void main(String[] args) throws InterruptedException { | |
ReentrantLock lock = new ReentrantLock(); | |
Thread t = new Thread(() -> { | |
log.debug("t启动..."); | |
try { | |
//if (!lock.tryLock()) { | |
// log.debug("t获取锁失败,立即返回false"); | |
// return; | |
//} | |
if (!lock.tryLock(, TimeUnit.SECONDS)) { | |
log.debug("等待s 后获取锁失败,返回"); | |
return; | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
return; | |
} | |
try { | |
log.debug("t获得了锁"); | |
} finally { | |
lock.unlock(); | |
} | |
}, "t"); | |
lock.lock(); | |
try { | |
log.debug("main线程获得了锁"); | |
t.start(); | |
//先让线程t执行 | |
Thread.sleep(); | |
} finally { | |
lock.unlock(); | |
} | |
} |
main线程获得了锁
t1启动…
等待 1s 后获取锁失败,返回
公平锁和非公平锁
public static void main(String[] args) throws InterruptedException { | |
// ReentrantLock lock = new ReentrantLock(true); //公平锁 | |
ReentrantLock lock = new ReentrantLock(); //非公平锁 | |
for (int i =; i < 500; i++) { | |
new Thread(() -> { | |
lock.lock(); | |
try { | |
try { | |
Thread.sleep(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
log.debug(Thread.currentThread().getName() + " running..."); | |
} finally { | |
lock.unlock(); | |
} | |
}, "t" + i).start(); | |
} | |
//s 之后去争抢锁 | |
Thread.sleep(); | |
for (int i =; i < 500; i++) { | |
new Thread(() -> { | |
lock.lock(); | |
try { | |
log.debug(Thread.currentThread().getName() + " running..."); | |
} finally { | |
lock.unlock(); | |
} | |
}, "强行插入" + i).start(); | |
} | |
} |
条件变量
private static ReentrantLock lock = new ReentrantLock(); | |
private static Condition cigCon = lock.newCondition(); | |
private static Condition takeCon = lock.newCondition(); | |
private static boolean hashcig = false; | |
private static boolean hastakeout = false; | |
//送烟 | |
public void cigratee(){ | |
lock.lock(); | |
try { | |
while(!hashcig){ | |
try { | |
log.debug("没有烟,歇一会"); | |
cigCon.await(); | |
}catch (Exception e){ | |
e.printStackTrace(); | |
} | |
} | |
log.debug("有烟了,干活"); | |
}finally { | |
lock.unlock(); | |
} | |
} | |
//送外卖 | |
public void takeout(){ | |
lock.lock(); | |
try { | |
while(!hastakeout){ | |
try { | |
log.debug("没有饭,歇一会"); | |
takeCon.await(); | |
}catch (Exception e){ | |
e.printStackTrace(); | |
} | |
} | |
log.debug("有饭了,干活"); | |
}finally { | |
lock.unlock(); | |
} | |
} | |
public static void main(String[] args) { | |
ReentrantLockDemo test = new ReentrantLockDemo6(); | |
new Thread(() ->{ | |
test.cigratee(); | |
}).start(); | |
new Thread(() -> { | |
test.takeout(); | |
}).start(); | |
new Thread(() ->{ | |
lock.lock(); | |
try { | |
hashcig = true; | |
log.debug("唤醒送烟的等待线程"); | |
cigCon.signal(); | |
}finally { | |
lock.unlock(); | |
} | |
},"t").start(); | |
new Thread(() ->{ | |
lock.lock(); | |
try { | |
hastakeout = true; | |
log.debug("唤醒送饭的等待线程"); | |
takeCon.signal(); | |
}finally { | |
lock.unlock(); | |
} | |
},"t").start(); | |
} |
没有烟,歇一会
没有饭,歇一会
唤醒送烟的等待线程
唤醒送饭的等待线程
有烟了,干活
有饭了,干活
五.源码解析
首先会调用lock方法
public void lock() { | |
sync.lock(); | |
} |
lock会调用公平方法或者非公平的方法,默认是非公平锁方法,非公平锁则会cas尝试加锁,state是不是0,是0的话就把它改为1,并设置当前线程为独占线程,加锁成功,待下个线程进来时已经变成1,则失败阻塞。
加锁
final void lock() { | |
// 看状态是不是,如果是0 则改为1,加锁成功 | |
if (compareAndSetState(, 1)) | |
// 并设置当前线程为独占线程 | |
setExclusiveOwnerThread(Thread.currentThread()); | |
else | |
//不是则失败阻塞 | |
acquire(); | |
} | |
protected final void setExclusiveOwnerThread(Thread thread) { | |
exclusiveOwnerThread = thread; | |
} |
加锁失败(入队 阻塞)
public final void acquire(int arg) { | |
if (!tryAcquire(arg) && | |
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) | |
//恢复中断标识位 | |
selfInterrupt(); | |
} |
首先tryAcquire 又进行了一次判断,看是否能获取锁,
final boolean nonfairTryAcquire(int acquires) { | |
final Thread current = Thread.currentThread(); | |
int c = getState(); | |
//其他线程进来,状态值是 | |
if (c ==) { | |
if (compareAndSetState(, acquires)) { | |
setExclusiveOwnerThread(current); | |
return true; | |
} | |
} | |
else if (current == getExclusiveOwnerThread()) { | |
// 重入,将状态值+ | |
int nextc = c + acquires; | |
if (nextc <) // overflow | |
throw new Error("Maximum lock count exceeded"); | |
setState(nextc); | |
return true; | |
} | |
return false; | |
} |
添加进队列
private Node addWaiter(Node mode) { | |
Node node = new Node(Thread.currentThread(), mode); | |
// Try the fast path of enq; backup to full enq on failure | |
Node pred = tail; | |
//第一次tail为空 | |
if (pred != null) { | |
//尾插法 | |
node.prev = pred; | |
if (compareAndSetTail(pred, node)) { | |
pred.next = node; | |
return node; | |
} | |
} | |
//tail为空则在这里创建队列 | |
enq(node); | |
return node; | |
} |
创建队列并且入队
private Node enq(final Node node) { | |
for (;;) { | |
Node t = tail; | |
if (t == null) { // Must initialize | |
//创建队列 | |
if (compareAndSetHead(new Node())) | |
// 将头节点指向前一节点的尾节点,这时候tail不为空了 | |
tail = head; | |
} else { | |
//双向接口,前一节点的尾节点也指向当前节点的头节点 | |
node.prev = t; | |
if (compareAndSetTail(t, node)) { | |
t.next = node; | |
return t; | |
} | |
} | |
} | |
} |
阻塞
final boolean acquireQueued(final Node node, int arg) { | |
boolean failed = true; | |
try { | |
boolean interrupted = false; | |
for (;;) { //保证一定获取锁 | |
//获取head节点 | |
final Node p = node.predecessor(); | |
//是头节点则尝试获取锁 | |
if (p == head && tryAcquire(arg)) { | |
//设置头节点 | |
setHead(node); | |
p.next = null; // help GC | |
failed = false; | |
return interrupted; | |
} | |
//获取锁失败的情况,阻塞,在for循环里,第一次shouldParkAfterFailedAcquire为false,会将其设置为-,第二次就可以阻塞 | |
if (shouldParkAfterFailedAcquire(p, node) && | |
parkAndCheckInterrupt()) | |
interrupted = true; | |
} | |
} finally { | |
if (failed) | |
cancelAcquire(node); | |
} | |
} |
是否需要阻塞,把状态设置为SIGNAL,可以被唤醒了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { | |
int ws = pred.waitStatus; | |
//是-了就可以去阻塞 | |
if (ws == Node.SIGNAL) | |
/* | |
* This node has already set status asking a release | |
* to signal it, so it can safely park. | |
*/ | |
return true; | |
if (ws >) { | |
/* | |
* Predecessor was cancelled. Skip over predecessors and | |
* indicate retry. | |
*/ | |
do { //把节点去掉 | |
node.prev = pred = pred.prev; | |
} while (pred.waitStatus >); | |
pred.next = node; | |
} else { | |
/* | |
* waitStatus must be or PROPAGATE. Indicate that we | |
* need a signal, but don't park yet. Caller will need to | |
* retry to make sure it cannot acquire before parking. | |
*/ | |
//把状态设置为SIGNAL,可以被唤醒了 | |
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); | |
} | |
return false; | |
} |
真正的阻塞方法
private final boolean parkAndCheckInterrupt() { | |
//阻塞 | |
LockSupport.park(this); | |
//清除中断标识位,在加锁失败方法的后面恢复中断标识位,可能其他地方还用到这个锁标识位 | |
return Thread.interrupted(); | |
} |
唤醒 unlock()
public void unlock() { | |
sync.release(); | |
} | |
public final boolean release(int arg) { | |
// 尝试唤醒 | |
if (tryRelease(arg)) { | |
Node h = head; | |
if (h != null && h.waitStatus !=) | |
//唤醒阻塞的线程 | |
unparkSuccessor(h); | |
return true; | |
} | |
return false; | |
} | |
protected final boolean tryRelease(int releases) { | |
//当前状态- | |
int c = getState() - releases; | |
if (Thread.currentThread() != getExclusiveOwnerThread()) | |
throw new IllegalMonitorStateException(); | |
boolean free = false; | |
if (c ==) { | |
free = true; | |
setExclusiveOwnerThread(null); | |
} | |
//设置状态 | |
setState(c); | |
return free; | |
} |
在这里唤醒
private void unparkSuccessor(Node node) { | |
/* | |
* If status is negative (i.e., possibly needing signal) try | |
* to clear in anticipation of signalling. It is OK if this | |
* fails or if status is changed by waiting thread. | |
*/ | |
int ws = node.waitStatus; | |
if (ws <) | |
compareAndSetWaitStatus(node, ws,); | |
/* | |
* Thread to unpark is held in successor, which is normally | |
* just the next node. But if cancelled or apparently null, | |
* traverse backwards from tail to find the actual | |
* non-cancelled successor. | |
*/ | |
Node s = node.next; | |
if (s == null || s.waitStatus >) { | |
s = null; | |
for (Node t = tail; t != null && t != node; t = t.prev) | |
if (t.waitStatus <=) | |
s = t; | |
} | |
//后面一个节点不为空 则直接唤醒当前线程 | |
if (s != null) | |
LockSupport.unpark(s.thread); | |
} |
线程取消获取锁
private void cancelAcquire(Node node) { | |
// Ignore if node doesn't exist | |
if (node == null) | |
return; | |
node.thread = null; | |
// Skip cancelled predecessors | |
Node pred = node.prev; | |
while (pred.waitStatus >) | |
//将前一个节点干掉 | |
node.prev = pred = pred.prev; | |
// predNext is the apparent node to unsplice. CASes below will | |
// fail if not, in which case, we lost race vs another cancel | |
// or signal, so no further action is necessary. | |
Node predNext = pred.next; | |
// Can use unconditional write instead of CAS here. | |
// After this atomic step, other Nodes can skip past us. | |
// Before, we are free of interference from other threads. | |
node.waitStatus = Node.CANCELLED; | |
// If we are the tail, remove ourselves. | |
if (node == tail && compareAndSetTail(node, pred)) { | |
compareAndSetNext(pred, predNext, null); | |
} else { | |
// If successor needs signal, try to set pred's next-link | |
// so it will get one. Otherwise wake it up to propagate. | |
int ws; | |
if (pred != head && | |
((ws = pred.waitStatus) == Node.SIGNAL || | |
(ws <= && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && | |
pred.thread != null) { | |
Node next = node.next; | |
if (next != null && next.waitStatus <=) | |
compareAndSetNext(pred, predNext, next); | |
} else { | |
unparkSuccessor(node); | |
} | |
node.next = node; // help GC | |
} | |
} |
至此加锁、解锁、阻塞、唤醒的底层源码都梳理完了。