彻底理解Java并发:AQS抽象队列同步器

Java
351
0
0
2022-12-20
标签   Java并发
本篇内容包括:抽象队列同步器(抽象队列同步器、同步状态的处理)以及FIFO队列的设计——AQS灵魂(Node 节点的设计、共享资源的竞争、共享资源的释放)等内容。

一、抽象队列同步器

1、抽象队列同步器

AQS,即 AbstractQueuedSynchronizer,抽象队列同步器,它是是一个集同步状态管理、线程阻塞、线程释放及队列管理功能与一身的同步框架。其核心思想是当多个线程竞争资源时会将未成功竞争到资源的线程构造为 Node 节点放置到一个双向 FIFO 队列中。被放入到该队列中的线程会保持阻塞直至被前驱节点唤醒。值得注意的是该队列中只有队首节点有资格被唤醒竞争锁

AQS就是JDK中为“线程同步”提供的一套基础工具类(网上其他帖子叫做“框架”我觉得这个就说的太大了,其实就是一个类而已(不算它衍生出来的子类)

2、同步状态的处理

成员变量 state。用于表示锁现在的状态,用 volatile 修饰,保证内存一致性及其在多线程之间的可见性。同时所用对 state 的操作都是使用 CAS 进行的。state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。多次持有释放则多次加减。

private volatile int state;

以ReentrantLock 举例,请求锁时有三种可能:

  1. 如果没有线程持有锁,则请求成功,当前线程直接获取到锁。
  2. 如果当前线程已经持有锁,则使用 CAS 将 state 值加1,表示自己再次申请了锁,释放锁时减1。这就是可重入性的实现
  3. 如果由其他线程持有锁,那么将自己添加进等待队列
final void lock() {
    if (compareAndSetState(0, 1)) 
        setExclusiveOwnerThread(Thread.currentThread()); //没有线程持有锁时,直接获取锁,对应情况1
    else
        acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) && //在此方法中会判断当前持有线程是否等于自己,对应情况2 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //将自己加入队列中,对应情况3 
        selfInterrupt();
}

此外,AQS 还为 state提供了相关方法:getState ,setState 以及保证原子性 compareAndSetState

二、FIFO队列的设计——AQS灵魂

对于整个 AQS 框架来说,队列的设计可以说重中之重。那么为什么 AQS 需要一个队列呢?

对于一个资源同步竞争框架来说,如何处理没有获取到锁的线程是非常重要的,比方说现在有 ABCD 四个线程同时竞争锁,其中线程 A 竞争成功了。那么剩下的线程 BCD 该咋办呢?

我们可以尝试试想下自己会如何解决:

  1. 类似于sync的轻量级锁实现:CAS使线程自旋等待,不断重新尝试获取锁。这样虽然可以满足需求,但是众多线程同时自旋等待实际上是对 CPU 资源的一种浪费,这么做不太合适。
  2. 类似于sync的重量级锁实现:将线程挂起阻塞,等待锁释放时唤醒,再竞争获取。如果等待的线程比较多,同时被唤醒可能会发生“惊群”问题,也会产生大量的资源浪费。

面两种方法的可行性其实都不太高,对于一个同步框架来说,当有多个线程尝试竞争资源时,我们并不希望所有的线程同时来竞争锁。而且更重要的是,能够有效的监控当前处于等待过程中的线程也十分必要。

这个时候借助 FIFO 队列(线程先入先出)管理线程,既可以有效的帮助开发者监控线程,同时也可以在一定程度上减少饥饿问题出现的概率。

1、Node 节点的设计

如果没竞争到锁,这时候就要进入等待队列。线程的2种等待模式:

  • SHARED:表示线程以共享的模式等待锁(如ReadLock)
  • EXCLUSIVE:表示线程以互斥的模式等待锁(如ReentrantLock),互斥就是一把锁只能由一个线程持有,不能同时存在多个线程使用同一个锁

队列是默认有一个 head 节点的,并且不包含线程信息。

将线程添加进等待队列的情况下,addWaiter 会创建一个 Node,并添加到链表的末尾,Node 中持有当前线程的引用。同时还有一个成员变量 waitStatus,表示线程的等待状态,初始值为0。线程在队列中的状态枚举:

  • CANCELLED:值为1,表示线程的获锁请求已经“取消”
  • SIGNAL:值为-1,表示该线程一切都准备好了,就等待锁空闲出来给我
  • CONDITION:值为-2,表示线程等待某一个条件(Condition)被满足
  • PROPAGATE:值为-3,当线程处在“SHARED”模式时,该字段才会被使用上

同时,加到链表末尾的操作使用了 CAS+死循环的模式,很有代表性,拿出来看一看:

Node node = new Node(mode);
for (;;) {
    Node oldTail = tail;
    if (oldTail != null) {
        U.putObject(node, Node.PREV, oldTail);
        if (compareAndSetTail(oldTail, node)) {
            oldTail.next = node;
            return node;
        }
    } else {
        initializeSyncQueue();
    }
}

可以看到,在死循环里调用了 CAS 的方法。如果多个线程同时调用该方法,那么每次循环都只有一个线程执行成功,其他线程进入下一次循环,重新调用。N个线程就会循环N次。这样就在无锁的模式下实现了并发模型。

2、共享资源的竞争

如果此节点的上一个节点是头部节点,则再次尝试获取锁,获取到了就移除并返回。获取不到就进入下一步;

判断前一个节点的 waitStatus,

如果是 SINGAL,则返回 true,并调用 LockSupport.park() 将线程挂起;

如果是 CANCELLED,则将前一个节点移除;

如果是其他值,则将前一个节点的 waitStatus 标记为 SINGAL,进入下一次循环。

可以看到,一个线程最多有两次机会,还竞争不到就去挂起等待。

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC 
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

3、共享资源的释放

调用 tryRelease,此方法由子类实现。实现非常简单,如果当前线程是持有锁的线程,就将 state 减1。减完后如果 state 大于0,表示当前线程仍然持有锁,返回 false。如果等于0,表示已经没有线程持有锁,返回 true,进入下一步; 如果头部节点的 waitStatus 不等于0,则调用 LockSupport.unpark() 唤醒其下一个节点。头部节点的下一个节点就是等待队列中的第一个线程,这反映了 AQS 先进先出的特点。另外,即使是非公平锁,进入队列之后,还是得按顺序来。

public final boolean release(int arg) {
    if (tryRelease(arg)) { //将 state 减1 
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null) //唤醒第一个等待的线程
        LockSupport.unpark(s.thread);
}