【并发编程】锁的分类以及源码解析

Java
106
0
0
2024-09-01

一、锁的分类

1.1 可重入锁、不可重入锁

Java中提供的synchronizedReentrantLockReentrantReadWriteLock都是可重入锁。

重入:当前线程获取到A锁,在获取之后尝试再次获取A锁是可以直接拿到的。

不可重入:当前线程获取到A锁,在获取之后尝试再次获取A锁,无法获取到的,因为A锁被当前线 程占用着,需要等待自己释放锁再获取锁。

1.2 乐观锁、悲观锁

Java中提供的synchronizedReentrantLockReentrantReadWriteLock都是悲观锁。

Java中提供的CAS操作,就是乐观锁的一种实现。

悲观锁:获取不到锁资源时,会将当前线程挂起(进入BLOCKED、WAITING),线程挂起会涉及到用户态和内核态的切换,而这种切换是比较消耗资源的。

  • 用户态:JVM可以自行执行的指令,不需要借助操作系统执行。
  • 内核态:JVM不可以自行执行,需要操作系统才可以执行。

乐观锁:获取不到锁资源,可以再次让CPU调度,重新尝试获取锁资源。

Atomic原子性类中,就是基于CAS乐观锁实现的。

1.3 公平锁、非公平锁

Java中提供的synchronized只能是非公平锁。

Java中提供的ReentrantLockReentrantReadWriteLock可以实现公平锁和非公平锁

公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,锁被A持有,同时线程B在排队。直接排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。

非公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,先尝试竞争一波拿到锁资源:开心,插队成功。没有拿到锁资源:依然要排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。

1.4 互斥锁、共享锁

Java中提供的synchronizedReentrantLock是互斥锁。

Java中提供的ReentrantReadWriteLock,有互斥锁也有共享锁。

互斥锁:同一时间点,只会有一个线程持有者当前互斥锁。 共享锁:同一时间点,当前共享锁可以被多个线程同时持有。

二、深入synchronized

2.1 类锁、对象锁

synchronized的使用一般就是同步方法和同步代码块。

synchronized的锁是基于对象实现的。

如果使用同步方法

  • static:此时使用的是当前类.class作为锁(类锁)
  • 非static:此时使用的是当前对象做为锁(对象锁)
public class MiTest {  
    public static void main(String[] args) {  
        // 锁的是,当前Test.class  
        Test.a();  
        Test test = new Test();  
        // 锁的是new出来的test对象  
        test.b();  
    }  
}  

class Test{  
    public static synchronized void a(){  
        System.out.println("1111");  
    }  
    public synchronized void b(){  
        System.out.println("2222");  
    }  
}
2.2 synchronized的优化

在JDK1.5的时候,Doug Lee推出了ReentrantLock,lock的性能远高于synchronized,所以JDK团队就在JDK1.6中,对synchronized做了大量的优化。

锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,会触发锁消除,你即便 写了synchronized,他也不会触发。

public synchronized void method(){  
    // 没有操作临界资源  
    // 此时这个方法的synchronized你可以认为木有~~  
}

锁膨胀:如果在一个循环中,频繁的获取和释放做资源,这样带来的消耗很大,锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。

public void method(){  
    for(int i = 0;i < 999999;i++){  
        synchronized(对象){
        
        }
    }  

    // 这是上面的代码会触发锁膨胀  
    synchronized(对象){  
        for(int i = 0;i < 999999;i++){  
        }  
    }  
}

锁升级ReentrantLock的实现,是先基于乐观锁的CAS尝试获取锁资源,如果拿不到锁资源,才会挂起线程。synchronized在JDK1.6之前,完全就是获取不到锁,立即挂起当前线程,所以synchronized性能比较差。

synchronized就在JDK1.6做了锁升级的优化:

  • 无锁、匿名偏向:当前对象没有作为锁存在。
  • 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放,那么这个线程过来,只需要判断,当前指向的线程是否是当前线程.
  • 如果是,直接拿着锁资源走。
  • 如果当前线程不是我,基于CAS的方式,尝试将偏向锁指向当前线程。如果获取不到,触发锁升级,升级为轻量级锁。(偏向锁状态出现了锁竞争的情况)
  • 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源(采用的是自适应自旋锁 )
  • 如果成功获取到,拿着锁资源走
  • 如果自旋了一定次数,没拿到锁资源,锁升级。
  • 重量级锁:就是最传统的synchronized方式,拿不到锁资源,就挂起当前线程。(用户态&内核态)
2.3 synchronized实现原理

synchronized是基于对象实现的。

先要对Java中对象在堆内存的存储有一个了解。

展开MarkWord

MarkWord中标记着四种锁的信息:无锁、偏向锁、轻量级锁、重量级锁

2.4 synchronized的锁升级

为了可以在Java中看到对象头的MarkWord信息,需要导入依赖

<dependency>  
    <groupId>org.openjdk.jol</groupId>  
    <artifactId>jol-core</artifactId>  
    <version>0.9</version>  
</dependency>

锁默认情况下,开启了偏向锁延迟。

偏向锁在升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点(STW),才可以做偏向锁撤销,在明知道有并发情况,就可以选择不开启偏向锁,或者是设置偏向锁延迟开启. 因为JVM在启动时,需要加载大量的.class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有一个延迟4s开启偏向锁的操作. 如果正常开启偏向锁了,那么不会出现无锁状态,对象会直接变为匿名偏向
public static void main(String[] args) throws InterruptedException {  
    Thread.sleep(5000);  
    Object o = new Object();  
    System.out.println(ClassLayout.parseInstance(o).toPrintable());  
    new Thread(() -> {  
        synchronized (o){  
        //t1 - 偏向锁  
        System.out.println("t1:" + ClassLayout.parseInstance(o).toPrintable());  
        }  
    }).start();  
    //main - 偏向锁 - 轻量级锁CAS - 重量级锁  
    synchronized (o){  
        System.out.println("main:" + ClassLayout.parseInstance(o).toPrintable());  
    }  
}

整个锁升级状态的转变:

Lock Record以及ObjectMonitor存储的内容:

2.5 重量锁底层ObjectMonitor

需要去找到openjdk,在百度中直接搜索openjdk,第一个链接就是

找到ObjectMonitor的两个文件,hpp,cpp

先查看核心属性:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/69087d08d473/src/share/vm/runtime/objectMonitor.hpp

ObjectMonitor() {  
    _header = NULL; // header存储着MarkWord  
    _count = 0; // 竞争锁的线程个数  
    _waiters = 0, // wait的线程个数  
    _recursions = 0; // 标识当前synchronized锁重入的次数  
    _object = NULL;  
    _owner = NULL; // 持有锁的线程  
    _WaitSet = NULL; // 保存wait的线程信息,双向链表  
    _WaitSetLock = 0 ;  
    _Responsible = NULL ;  
    _succ = NULL ;  
    _cxq = NULL ; // 获取锁资源失败后,线程要放到当前的单向链表中  
    FreeNext = NULL ;  
    _EntryList = NULL ; // _cxq以及被唤醒的WaitSet中的线程,在一定机制下,会放到EntryList中  
    _SpinFreq = 0 ;  
    _SpinClock = 0 ;  
    OwnerIsThread = 0 ;  
    _previous_owner_tid = 0;  
}  

适当的查看几个C++中实现的加锁流程

http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/69087d088d473/src/share/vm/runtime/objectMonitor.cpp

TryLock

int ObjectMonitor::TryLock (Thread * Self) {  
    for (;;) {  
        // 拿到持有锁的线程  
        void * own = _owner ;  
        // 如果有线程持有锁,告辞  
        if (own != NULL) return 0 ;  
        // 说明没有线程持有锁,own是null,cmpxchg指令就是底层的CAS实现。  
        if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {  
            // 成功获取锁资源  
            return 1 ;  
        }  
        // 这里其实重试操作没什么意义,直接返回-1  
        if (true) return -1 ;  
    }  
}  

try_entry

bool ObjectMonitor::try_enter(Thread* THREAD) {  
    // 在判断_owner是不是当前线程  
    if (THREAD != _owner) {  
        // 判断当前持有锁的线程是否是当前线程,说明轻量级锁刚刚升级过来的情况  
        if (THREAD->is_lock_owned ((address)_owner)) {  
            _owner = THREAD ;  
            _recursions = 1 ;  
            OwnerIsThread = 1 ;  
            return true;  
        }  
        // CAS操作,尝试获取锁资源  
        if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {  
            // 没拿到锁资源,告辞  
            return false;  
        }  
        // 拿到锁资源  
        return true;  
    } else {  
        // 将_recursions + 1,代表锁重入操作。  
        _recursions++;  
        return true;  
    }  
}

enter(想方设法拿到锁资源,如果没拿到,挂起扔到_cxq单向链表中)

void ATTR ObjectMonitor::enter(TRAPS) {  
    // 拿到当前线程  
    Thread * const Self = THREAD ;  
    void * cur ;  
    // CAS走你,  
    cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;  
    if (cur == NULL) {  
        // 拿锁成功  
        return ;  
    }  
    // 锁重入操作  
    if (cur == Self) {  
        // TODO-FIXME: check for integer overflow! BUGID 6557169.  
        _recursions ++ ;  
        return ;  
    }  
    //轻量级锁过来的。  
    if (Self->is_lock_owned ((address)cur)) {  
        _recursions = 1 ;  
        _owner = Self ;  
        OwnerIsThread = 1 ;  
        return ;  
    }  
    // 走到这了,没拿到锁资源,count++  
    Atomic::inc_ptr(&_count);  
    for (;;) {  
        jt->set_suspend_equivalent();  
        // 入队操作,进到cxq中  
        EnterI (THREAD) ;  
        if (!ExitSuspendEquivalent(jt)) break ;  
            _recursions = 0 ;  
            _succ = NULL ;  
            exit (false, Self) ;  
            jt->java_suspend_self();  
        }  
    }  
    // count--  
    Atomic::dec_ptr(&_count);
}

EnterI

for (;;) {  
    // 入队  
    node._next = nxt = _cxq ;  
    // CAS的方式入队。  
    if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;  
    // 重新尝试获取锁资源  
    if (TryLock (Self) > 0) {  
        assert (_succ != Self , "invariant") ;  
        assert (_owner == Self , "invariant") ;  
        assert (_Responsible != Self , "invariant") ;  
        return ;  
    }  
}

三、深入ReentrantLock

3.1 ReentrantLock和synchronized的区别

废话区别:单词不一样。。。

核心区别:

  • ReentrantLock是个类,synchronized是关键字,当然都是在JVM层面实现互斥锁的方式

效率区别:

  • 如果竞争比较激烈,推荐ReentrantLock去实现,不存在锁升级概念。而synchronized是存在锁升级概念的,如果升级到重量级锁,是不存在锁降级的。

底层实现区别:

  • 实现原理是不一样,ReentrantLock基于AQS实现的,synchronized是基于ObjectMonitor

功能向的区别:

  • ReentrantLock的功能比synchronized更全面。
  • ReentrantLock支持公平锁和非公平锁
  • ReentrantLock可以指定等待锁资源的时间。

选择哪个:如果你对并发编程特别熟练,推荐使用ReentrantLock,功能更丰富。如果掌握的一般般,使用synchronized会更好

3.2 AQS概述

AQS就是AbstractQueuedSynchronizer抽象类,AQS其实就是JUC包下的一个基类,JUC下的很多内容都是基于AQS实现了部分功能,比如ReentrantLockThreadPoolExecutor,阻塞队列,CountDownLatchSemaphoreCyclicBarrier等等都是基于AQS实现。

首先AQS中提供了一个由volatile修饰,并且采用CAS方式修改的int类型的state变量。

其次AQS中维护了一个双向链表,有head,有tail,并且每个节点都是Node对象

static final class Node {  
    static final Node SHARED = new Node();  
    static final Node EXCLUSIVE = null;  
    static final int CANCELLED = 1;  
    static final int SIGNAL = -1;  
    static final int CONDITION = -2;  
    static final int PROPAGATE = -3;  
    volatile int waitStatus;  
    volatile Node prev;  
    volatile Node next;  
    volatile Thread thread;  
}  

AQS内部结构和属性

3.3 加锁流程源码剖析
3.3.1 加锁流程概述

这个是非公平锁的流程

3.3.2 三种加锁源码分析
3.3.2.1 lock方法
  1. 执行lock方法后,公平锁和非公平锁的执行套路不一样
// 非公平锁
final void lock() {  
    // 上来就先基于CAS的方式,尝试将state从0改为1  
    if (compareAndSetState(0, 1))  
        // 获取锁资源成功,会将当前线程设置到exclusiveOwnerThread属性,代表是当前线程持有着锁资源  
        setExclusiveOwnerThread(Thread.currentThread());  
    else  
        // 执行acquire,尝试获取锁资源  
        acquire(1);  
}  

// 公平锁  
final void lock() {  
    // 执行acquire,尝试获取锁资源  
    acquire(1);  
}
  1. acquire方法,是公平锁和非公平锁的逻辑一样
public final void acquire(int arg) {  
    // tryAcquire:再次查看,当前线程是否可以尝试获取锁资源  
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        // 没有拿到锁资源  
        // addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾  
        // acquireQueued:查看我是否是第一个排队的节点,如果是可以再次尝试获取锁资源,如果长时间拿不到,挂起线  
        // 如果不是第一个排队的额节点,就尝试挂起线程即可  
        
        // 中断线程的操作  
        selfInterrupt();  
}
  1. tryAcquire方法竞争锁资源的逻辑,分为公平锁和非公平锁
// 非公平锁实现  
final boolean nonfairTryAcquire(int acquires) {  
    // 获取当前线程  
    final Thread current = Thread.currentThread();  
    // 获取了state熟属性  
    int c = getState();  
    // 判断state当前是否为0,之前持有锁的线程释放了锁资源  
    if (c == 0) {  
        // 再次抢一波锁资源  
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);  
            // 拿锁成功返回true  
            return true;  
        }  
    }  
    // 不是0,有线程持有着锁资源,如果是,证明是锁重入操作  
    else if (current == getExclusiveOwnerThread()) {  
        // 将state + 1  
        int nextc = c + acquires;  
        if (nextc < 0) // 说明对重入次数+1后,超过了int正数的取值范围  
            // 01111111 11111111 11111111 11111111  
            // 10000000 00000000 00000000 00000000  
            // 说明重入的次数超过界限了。  
            throw new Error("Maximum lock count exceeded");  
            
        // 正常的将计算结果,复制给state  
        setState(nextc);  
        // 锁重入成功  
        return true;  
    }  
    // 返回false  
    return false;  
}  

// 公平锁实现  
protected final boolean tryAcquire(int acquires) {  
    // 获取当前线程  
    final Thread current = Thread.currentThread();  
    // ....  
    int c = getState();  
    if (c == 0) {  
        // 查看AQS中是否有排队的Node  
        // 没人排队抢一手 。有人排队,如果我是第一个,也抢一手  
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {  
            // 抢一手~  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    // 锁重入~~~  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0)  
            throw new Error("Maximum lock count exceeded");  
        
        setState(nextc);  
        return true;
    }  
    return false;  
}  

// 查看是否有线程在AQS的双向队列中排队  
// 返回false,代表没人排队  
public final boolean hasQueuedPredecessors() {  
    // 头尾节点  
    Node t = tail;  
    Node h = head;  
    // s为头结点的next节点  
    Node s; 
    
    // 如果头尾节点相等,证明没有线程排队,直接去抢占锁资源  
    // s节点不为null,并且s节点的线程为当前线程(排在第一名的是不是我)  
    return h != t &&  (s == null || s.thread != Thread.currentThread());  
}
  1. addWaite方法,将没有拿到锁资源的线程扔到AQS队列中去排队
// 没有拿到锁资源,过来排队, mode:代表互斥锁  
private Node addWaiter(Node mode) {  
    // 将当前线程封装为Node,  
    Node node = new Node(Thread.currentThread(), mode);  
    // 拿到尾结点  
    Node pred = tail;  
    // 如果尾结点不为null  
    if (pred != null) {  
        // 当前节点的prev指向尾结点  
        node.prev = pred;  
        // 以CAS的方式,将当前线程设置为tail节点  
        if (compareAndSetTail(pred, node)) {  
            // 将之前的尾结点的next指向当前节点  
            pred.next = node;  
            return node;  
        }  
    }  
    // 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾  
    enq(node);  
    return node;  
}  

private Node enq(final Node node) {  
    for (;;) {  
        // 拿到尾结点  
        Node t = tail;  
        // 如果尾结点为空,AQS中一个节点都没有,构建一个伪节点,作为head和tail
        if (t == null) {  
            if (compareAndSetHead(new Node()))  
            tail = head;  
        } else {  
            // 比较熟悉了,以CAS的方式,在AQS中有节点后,插入到AQS队列的末尾  
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}
  1. acquireQueued方法,判断当前线程是否还能再次尝试获取锁资源,如果不能再次获取锁资源,或者又没获取到,尝试将当前线程挂起
// 当前没有拿到锁资源后,并且到AQS排队了之后触发的方法。 中断操作这里不用考虑  
final boolean acquireQueued(final Node node, int arg) {  
    // 不考虑中断  
    // failed:获取锁资源是否失败(这里简单掌握落地,真正触发的,还是tryLock和lockInterruptibly)  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        // 死循环............  
        for (;;) {  
            // 拿到当前节点的前继节点  
            final Node p = node.predecessor();  
            // 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源。  
            if (p == head && tryAcquire(arg)) {  
                // 获取锁资源成功  
                // 设置头结点为当前获取锁资源成功Node,并且取消thread信息  
                setHead(node);  
                // help GC  
                p.next = null;  
                // 获取锁失败标识为false  
                failed = false;  
                return interrupted;  
            }  
            // 没拿到锁资源......  
            // shouldParkAfterFailedAcquire:基于上一个节点转改来判断当前节点是否能够挂起线程,如果可以返回true,  
            // 如果不能,就返回false,继续下次循环  
            // 这里基于Unsafe类的park方法,将当前线程挂起  
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  
                // 这里基于Unsafe类的park方法,将当前线程挂起                  
                interrupted = true;  
        }
   } finally {  
        if (failed)  
            // 在lock方法中,基本不会执行。  
            cancelAcquire(node);  
   }  
}  
    
// 获取锁资源成功后,先执行setHead  
private void setHead(Node node) {  
    // 当前节点作为头结点 伪  
    head = node;  
    // 头结点不需要线程信息  
    node.thread = null;  
    node.prev = null;  
}  

// 当前Node没有拿到锁资源,或者没有资格竞争锁资源,看一下能否挂起当前线程  
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    // -1,SIGNAL状态:代表当前节点的后继节点,可以挂起线程,后续我会唤醒我的后继节点  
    // 1,CANCELLED状态:代表当前节点以及取消了  
    int ws = pred.waitStatus;  
    if (ws == Node.SIGNAL)  
        // 上一个节点为-1之后,当前节点才可以安心的挂起线程  
        return true;  
    if (ws > 0) {  
        // 如果当前节点的上一个节点是取消状态,我需要往前找到一个状态不为1的Node,作为他的next节点  
        // 找到状态不为1的节点后,设置一下next和prev  
        do {  
           node.prev = pred = pred.prev;  
        } while (pred.waitStatus > 0);  
                
        pred.next = node;  
    } else {  
        // 上一个节点的状态不是1或者-1,那就代表节点状态正常,将上一个节点的状态改为-1 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
    }  
    return false;  
}  
3.3.2.2 tryLock方法
  • tryLock()
// tryLock方法,无论公平锁还有非公平锁。都会走非公平锁抢占锁资源的操作
// 就是拿到state的值, 如果是0,直接CAS浅尝一下
// state 不是0,那就看下是不是锁重入操作
// 如果没抢到,或者不是锁重入操作,告辞,返回false
public boolean tryLock() {
    // 非公平锁的竞争锁操作
    return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {    
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        
        setState(nextc);
        return true;
    }
    return false;
}
  • tryLock(time, unit);
  • 第一波分析,类似的代码:
// tryLock(time,unit)执行的方法  
   public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {  
   // 线程的中断标记位,是不是从false,别改为了true,如果是,直接抛异常  
   if (Thread.interrupted())  
       throw new InterruptedException();  

   // tryAcquire分为公平和非公平锁两种执行方式,如果拿锁成功, 直接告辞,  
   return tryAcquire(arg) ||  
   // 如果拿锁失败,在这要等待指定时间  
   doAcquireNanos(arg, nanosTimeout);  
}  

private boolean doAcquireNanos(int arg, long nanosTimeout)  
throws InterruptedException {  
   // 如果等待时间是0秒,直接告辞,拿锁失败  
   if (nanosTimeout <= 0L)  
       return false;  

   // 设置结束时间。  
   final long deadline = System.nanoTime() + nanosTimeout;  
   // 先扔到AQS队列  
   final Node node = addWaiter(Node.EXCLUSIVE);  
   // 拿锁失败,默认true  
   boolean failed = true;

   try {  
       for (;;) {  
           // 如果在AQS中,当前node是head的next,直接抢锁  
           final Node p = node.predecessor();  
           if (p == head && tryAcquire(arg)) {  
           setHead(node);  
           p.next = null; // help GC  
           failed = false;  
           return true;  
       }  

   // 结算剩余的可用时间  
   nanosTimeout = deadline - System.nanoTime();  
   // 判断是否是否用尽的位置  
   if (nanosTimeout <= 0L)  
       return false;  
   
   // shouldParkAfterFailedAcquire:根据上一个节点来确定现在是否可以挂起线程
   // 避免剩余时间太少,如果剩余时间少就不用挂起线程 
   if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)  
       // 如果剩余时间足够,将线程挂起剩余时间  
       LockSupport.parkNanos(this, nanosTimeout);  
       // 如果线程醒了,查看是中断唤醒的,还是时间到了唤醒的。  
       if (Thread.interrupted())  
           // 是中断唤醒的!  
           throw new InterruptedException();  
       }  
   } finally {  
       if (failed)  
           cancelAcquire(node);  
   }  
}
  • 取消节点分析:

// 取消在AQS中排队的Node  
private void cancelAcquire(Node node) {  
    // 如果当前节点为null,直接忽略。  
    if (node == null)  
        return;  
    
    //1. 线程设置为null  
    node.thread = null;  
    //2. 往前跳过被取消的节点,找到一个有效节点  
    Node pred = node.prev;  
    while (pred.waitStatus > 0)  
        node.prev = pred = pred.prev;  
       
    //3. 拿到了上一个节点之前的next  
    Node predNext = pred.next;  
    //4. 当前节点状态设置为1,代表节点取消  
    node.waitStatus = Node.CANCELLED;  
    // 脱离AQS队列的操作  
    // 当前Node是尾结点,将tail从当前节点替换为上一个节点  
    if (node == tail && compareAndSetTail(node, pred)) {  
       compareAndSetNext(pred, predNext, null);  
    } else {  
       // 到这,上面的操作CAS操作失败  
       int ws = pred.waitStatus;  
       // 不是head的后继节点  
       if (pred != head 
           // 拿到上一个节点的状态,只要上一个节点的状态不是取消状态,就改为-1  
          && (ws == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {  
           // 上面的判断都是为了避免后面节点无法被唤醒。  
           // 前继节点是有效节点,可以唤醒后面的节点  
           Node next = node.next;  
           if (next != null && next.waitStatus <= 0)  
               compareAndSetNext(pred, predNext, next);  
           
       } else {  
         // 当前节点是head的后继节点  
         unparkSuccessor(node);  
       }  
       node.next = node; // help GC  
    }  
}
3.3.2.3 lockInterruptibly方法


// 这个是lockInterruptibly和tryLock(time,unit)唯一的区别  
// lockInterruptibly,拿不到锁资源,就死等,等到锁资源释放后,被唤醒,或者是被中断唤醒  
private void doAcquireInterruptibly(int arg) throws InterruptedException {  
    final Node node = addWaiter(Node.EXCLUSIVE);  
    boolean failed = true;  
    try {  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                failed = false;  
                return;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  
                // 中断唤醒抛异常!  
                throw new InterruptedException();  
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}  

private final boolean parkAndCheckInterrupt() {  
    LockSupport.park(this);  
    // 这个方法可以确认,当前挂起的线程,是被中断唤醒的,还是被正常唤醒的。  
    // 中断唤醒,返回true,如果是正常唤醒,返回false  
    return Thread.interrupted();  
}
3.4 释放锁流程源码剖析
3.4.1 释放锁流程概述

3.4.2 释放锁源码分析
public void unlock() {
 	// 释放锁资源不分为公平锁和非公平锁,都是一个sync对象
 	sync.release(1);
}
// 释放锁的核心流程
public final boolean release(int arg) {
	 // 核心释放锁资源的操作之一
	 if (tryRelease(arg)) {
 		// 如果锁已经释放掉了,走这个逻辑
		Node h = head;
		// h不为null,说明有排队的(录课时估计脑袋蒙圈圈。)
 		// 如果h的状态不为0(为-1),说明后面有排队的Node,并且线程已经挂起了。
 		if (h != null && h.waitStatus != 0)
 			// 唤醒排队的线程
 			unparkSuccessor(h);
 		return true;
	 }
 	return false;
}

// ReentrantLock释放锁资源操作
protected final boolean tryRelease(int releases) {
 	// 拿到state - 1(并没有赋值给state)
 	int c = getState() - releases;
 	// 判断当前持有锁的线程是否是当前线程,如果不是,直接抛出异常
	if (Thread.currentThread() != getExclusiveOwnerThread())
		 throw new IllegalMonitorStateException();
	
	 // free,代表当前锁资源是否释放干净了。
 	boolean free = false;
 	if (c == 0) {
	 	// 如果state - 1后的值为0,代表释放干净了。
 		free = true;
 		// 将持有锁的线程置位null
 		setExclusiveOwnerThread(null);
 	}
	// 将c设置给state
 	setState(c);
 	// 锁资源释放干净返回true,否则返回false
 	return free;
}

// 唤醒后面排队的Node
private void unparkSuccessor(Node node) {
 	// 拿到头节点状态
 	int ws = node.waitStatus;
 	if (ws < 0)
 	// 先基于CAS,将节点状态从-1,改为0
 	compareAndSetWaitStatus(node, ws, 0);
 	// 拿到头节点的后续节点。
 	Node s = node.next;
 	// 如果后续节点为null或者,后续节点的状态为1,代表节点取消了。
 	if (s == null || s.waitStatus > 0) {
 		s = null;
 		// 如果后续节点为null,或者后续节点状态为取消状态,从后往前找到一个有效节点环境
 		for (Node t = tail; t != null && t != node; t = t.prev)
 		// 从后往前找到状态小于等于0的节点
 		// 找到离head最新的有效节点,并赋值给s
 		if (t.waitStatus <= 0)
 			s = t;
 	}
 	// 只要找到了这个需要被唤醒的节点,执行unpark唤醒
 	if (s != null)
 		LockSupport.unpark(s.thread);
}
3.5 AQS常见的问题
3.5.1 AQS中为什么要有一个虚拟的head节点

因为AQS提供了ReentrantLock的基本实现,而在ReentrantLock释放锁资源时,需要去考虑是否需要执行unparkSuccessor方法,去唤醒后继节点。

因为Node中存在waitStatus的状态,默认情况下状态为0,如果当前节点的后继节点线程挂起了,那么就将当前节点的状态设置为-1。这个-1状态的出现是为了避免重复唤醒或者释放资源的问题。

因为AQS中排队的Node中的线程如果挂起了,是无法自动唤醒的。需要释放锁或者释放资源后,再被释放的线程去唤醒挂起的线程。 因为唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。而这个找离head最近的Node可能需要遍历整个双向链表。如果AQS中,没有挂起的线程,代表不需要去遍历AQS双向链表去找离head最近的有效节点。

为了避免出现不必要的循环链表操作,提供了一个-1的状态。如果只有一个Node进入到AQS中排队,所以发现如果是第一个Node进来,他必须先初始化一个虚拟的head节点作为头,来监控后继节点中是否有挂起的线程。

3.5.2 AQS中为什么选择使用双向链表,而不是单向链表

首先AQS中一般是存放没有获取到资源的Node,而在竞争锁资源时,ReentrantLock提供了一个方法,lockInterruptibly方法,也就是线程在竞争锁资源的排队途中,允许中断。中断后会执行cancelAcquire方法,从而将当前节点状态置位1,并且从AQS队列中移除掉。如果采用单向链表,当前节点只能按到后继或者前继节点,这样是无法将前继节点指向后继节点的,需要遍历整个AQS从头或者从尾去找。单向链表在移除AQS中排队的Node时,成本很高。

当前在唤醒后继节点时,如果是单向链表也会出问题,因为节点插入方式的问题,导致只能单向的去找有效节点去唤醒,从而造成很多次无效的遍历操作,如果是双向链表就可以解决这个问题。