Juc并发编程06——深入剖析队列同步器AQS源码

Java
309
0
0
2022-12-05
标签   Java并发

我们看下Reentrantock的源码。

public void lock() {
    sync.lock();
}

public void unlock() {
    sync.release(1);
}

原来lock,unlock等核心方法都是通过sync来实现的。而sync其实是它的一个内部类。

abstract static class Sync extends AbstractQueuedSynchronizer {...}

这个内部类继承了AbstractQueuedSynchronizer,也就是我们今天要重点介绍的队列同步器AQS。它其实就是我们锁机制的基础,它封装了包括锁的获取、释放以及等待队列。

线程的调度其关键就在于等待队列,其数据结构就是双向链表,可以参考下图。

img

我们先来了解以下每个Node有什么内容,点开AQS的源码,它定义了内部类Node

static final class Node {
    // 每个节点分为独占模式和共享模式、分别适用于独占锁和共享锁     
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    // 定义等待状态 
    // CANCELLED:唯一一个数值大与0的状态,说明此节点已经被取消 
    static final int CANCELLED =  1;
    // 此节点后面的节点被挂起,进入等待状态 
    static final int SIGNAL    = -1;
    // 在条件队列中的状态 
    static final int CONDITION = -2;
    // 传播,一般用于共享锁 
    static final int PROPAGATE = -3;

    volatile int waitStatus; //等待状态值 
    volatile Node prev; //双向链表基本操作 
    volatile Node next;
    volatile Thread thread; //每一个线程可以封装到一个节点进入等待队列

    Node nextWaiter; //在等待队列中表示模式,在条件队列中表示下一个节点

   // 判断是否为共享节点 
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

   // 返回前驱节点 
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else 
            return p;
    }

    Node() {   // 初始化建立节点或共享标记(Used to establish initial head or SHARED marker)
    }

    Node(Thread thread, Node mode) {     // 等待队列使用 
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // 条件对立使用 
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

再跳出NodeAQS,它定义了三个属性,head,tail默认为null,state默认为0,并且AQS的构造器并没有给它们赋值。

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // 当前锁的状态

实际上,双向链表初始化是在实际使用时完成的,后面将演示使用。看看其中一个置状态的操作。

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

原来是通过unsafecompareAndSwapInt()实现的。这个是CAS算法。不妨看看unsafe,它也是内部的属性。

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

	// 找到各个属性锁在的内存地址(相对于unsafe类的偏移地址)
static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

/**
 * CAS 操作头节点
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS 操作尾节点
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
 * CAS 操作WaitStatus属性
 */
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

/**
 * CAS 操作next属性 
 */
private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

其实,Unsafe里面调用的都是native方法,读者可以自己点进去看看。它会直接找到属性的内存地址,操作内存中的数据,效率比较高。在AQS中静态块计算了各个属性的相对于类的偏移地址,并且在调用Unsafe中的方法时会将偏移地址传过去哦。

并且我们回过头看看,这里unsafe操作的属性在被定义时都是定义为volatile修饰,这是因为他们在被修改时都是使用的CAS算法,我们要使用vilotile修饰保证其可见性。

    private volatile int state; // 当前锁的状态

现在我们已经大致了解了AQS的底层机制,接着来看看它到底时如何被使用的。先看看它可以被重写的五个方法吧。

// 独占式获取同步状态,查看同步状态是否与参数一致,如果没有问题则通过CAS设置同步状态并返回true
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

 // 独占式释放同步状态
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

 // 共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

 // 共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

 // 是否在独占模式下被当前线程占有(是否当前线程持有锁)
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

现在我们以ReentantLock的公平锁为例,看看它怎么被重写的。

 static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }
    ...
}

先看看lock方法。调AQSacquire()方法。里面会调用AQS中定义的tryAquire方法。而在ReentrantLocktryAuire方法公平锁与非公平锁的实现不同,其具体内容我们暂时略过。这里使用短路&&运算,如果拿到锁了,就不会走后面的逻辑。否则会调用acquireQueued,其内部调用了addWaiter。这就是说如果其它线程持有锁,就会把当前节点加入等待队列中。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式,EXCLUSIVE 
        selfInterrupt();
}

跟到addWaiter中看看。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试用CAS直接入队,如果CAS入队成功,则return 
    Node pred = tail;
    if (pred != null) { //初始状态tail尾节点未赋值会指null,如果不为空说明有其它节点插入了
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // CAS失败(其它线程也在获取锁或者tail节点为空无法cas)
    enq(node);
    return node;
}

上面的注释写的很清楚,我们接着看看enq怎么实现的,它其实是AQS的一个自旋机制。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 说明头尾节点没有初始化 
            if (compareAndSetHead(new Node())) //新建空节点置为头节点
                tail = head; //头尾节点指向同一个节点
        } else {
            node.prev = t; //队列插入节点操作,把当前节点的prev指向尾节点 
            if (compareAndSetTail(t, node)) { //设置队列的尾节点为刚插入的当前节点
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter终于看完了,再退回去看下,它的结果会作为参数传给acquireQueued,我们接着来看下acquireQueued。它在得到返回的节点时也会进入自旋状态(入等待队列成功,准备排队获取锁了)。

其过程可以结合下图理解。

img

具体代码如下。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 成功与否标记,初始置为true 
    try {
        boolean interrupted = false; //中断标记 
        for (;;) {
            final Node p = node.predecessor(); //获取当前已经插入节点的前驱节点 
            if (p == head && tryAcquire(arg)) { //如果前驱节点是头节点,说明当前节点位于队首(上图节点1),会调用tryAcquire抢锁
                setHead(node); //抢锁成功,节点出队
                p.next = null; // 建议 GC
                failed = false;
                return interrupted; //正常返回,未在等待队列中被中断
            }
            // 当前节点不是队首节点,将当前节点的前驱节点的等待状态设置为signal(siganl含义:siganl状态的节点下一个节点处于等锁状态)。如果设置失败进行下一轮循环,否则往下执行 
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 通过Unsafe类操作底层挂起线程(直接进入阻塞状态,也就是等待锁的状态) 
    return Thread.interrupted();
}

再来看看shouldParkAfterFailedAcquire的具体逻辑。

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 如果前驱节点已经是signal,则直接返回true 
        return true;
    if (ws > 0) { // ws>0表示前驱节点已经被取消,不能是被取消的节点,向前遍历直到找到第一个未被取消的节点 
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; //把被取消的节点全部抛弃
    } else {
    	   //前驱节点不是signal, 使用CAS设置为signal
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false; // 返回false,直接进入下一轮,判断CAS是否成功将前驱节点状态设置为signal
}

在上面的代码分析过程中,我们频繁看到park,unpark方法,他们的作用是将线程挂起,和解除线程的挂起状态。看下列示例代码。

public class Demo22 {
    public static void main(String[] args) {
        Thread t = Thread.currentThread();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("unpark thread t");
                LockSupport.unpark(t);
//                thread.interrupt();
           } catch (InterruptedException e) {

            }
        }).start();
        System.out.println("Thread t to be park...");
        LockSupport.park();
        System.out.println("Thread t unpark successfully");
    }
}

其运行结果是。

Thread t to be park...
unpark thread t
Thread t unpark successfully

到此为止,ReentrantLock公平锁的Lock方法已经讲解完毕了。继续深入。我们接着来看它的tryAcquire方法。也就是看看它究竟是如何去抢锁的。

// 可重入独占锁的公平实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // 获取当前AQS的状态,独占模式下如果为0说明未占用,如果大于0说明已经被占用 
    if (c == 0) {
        if (!hasQueuedPredecessors() && //判断是否等待队列不为空且当前线程没有获得锁,其实就是当前线程是否需要排队
            compareAndSetState(0, acquires)) { // CAS设置状态,如果成功则说明成功的获取到了这把锁
            setExclusiveOwnerThread(current); // 将独占锁的线程拥有者设置为当前线程 
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 如果AQS状态不为0,说明锁被占用,判断占用者是否是当前线程 
        int nextc = c + acquires; // 每加锁一次则状态值加一 
        if (nextc < 0) //加到int溢出了 
            throw new Error("Maximum lock count exceeded");
        setState(nextc); 
        return true;
    }
    return false;  // 任何其它情况返回false,加锁失败
}
}

加锁过程算是讲完了,接下来看看它的解锁过程。

ReentrantLockunlock方法。原来调用的是release方法,状态传参1,因为释放锁的次数为1.

public void unlock() {
    sync.release(1);
}

看看AQSrelease方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 尝试解锁 
        Node h = head; 
        if (h != null && h.waitStatus != 0) //头节点不为空且waitStatus状态不为0(初始状态为0,当被设置成为signal后为-1)
            unparkSuccessor(h); // 唤醒下一个后继节点 
        return true;
    }
    return false;
}

接下来看看unparkSuccessor.看看它到底是怎么唤醒下一个节点的。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) // 如果等待状态<0,说明是signal状态,将其设置为0,即恢复状态
        compareAndSetWaitStatus(node, ws, 0); 
    // 获取当前节点的后继节点 
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { //如果没有下一个节点或者状态>0(已经取消),遍历节点找其它符合unpark要求的节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) //从队尾往队前找 
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) //要是没有找到,就算了,找到了就unpark
        LockSupport.unpark(s.thread);
}

再看看release中的tryRelease

protected final boolean tryRelease(int releases) {
      int c = getState() - releases; //当前状态值减去要释放锁的次数(之前传的是1) 
      if (Thread.currentThread() != getExclusiveOwnerThread()) // 独占锁,如果不是当前线程持有锁,抛出异常 
          throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) { //解锁后状态值为0,则完全释放这把锁
          free = true;
          setExclusiveOwnerThread(null);
      }
      setState(c); //状态值 
      return free; // 是否完全释放
  }

下面通过一张图对锁的加锁、释放机制做一个总结。

img

非公平锁的情况大致进行介绍如下。压根没啥等待队列,上来直接哐哐CAS.

final void lock() {
      if (compareAndSetState(0, 1))
          setExclusiveOwnerThread(Thread.currentThread());
      else
          acquire(1);
  }

  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow 
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
  }