知道CountDownLatch是做什么的,那你知道它的底层是如何实现的吗?

Java
214
0
0
2024-01-13

一、概述

CountDownLatch是一个多线程控制工具,用来控制线程的等待。设置需要countDown的数量num,然后每一个线程执行完毕后,调用countDown()方法,而主线程调用await()方法执行等待,直到num个子线程执行了countDown()方法 ,则主线程解除阻塞,开始继续执行。

其具体操作流程类似火箭发射,我们通过倒数三二一(3个子线程分别调用countDown()),那么火箭就发射升空了(主线程await()方法处就释放了阻塞,可以继续向下执行):

代码上的使用方法如下所示:

首先创建CountDownLatch实例对象,并传入需要倒数的count值; 【其次】在主线程处通过调用await()方法进行阻塞操作; 【最后】当子线程执行完某个任务之后,调用countDown()方法执行倒计时减1操作;当倒计时为0的时候,主线程解除阻塞,继续执行await()方法下面的代码逻辑;

我们以实例CountDownLatchDemo为例,看一下具体的代码实现:

二、构造函数解析

在CountDownLatch的构造函数中,我们通过指定入参count的值,来设置需要调用多少次countDown()方法才会释放对当前线程的阻塞。构造方法逻辑比较简单,如果我们设置的count值小于0,则说明是一个违规值,会随之抛出IllegalArgumentException异常;代码如下所示:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

如果设置的count值是合法值,那么则通过setState(count)方法,将count赋值给AQS中的state变量。那么这个state值,就可以用来做倒计时的计数用了,如果为0,则表示倒计时结束,否则,则依然无法解除主线程的阻塞状态。

三、await()方法源码解析

从上面的演示示例中,我们已经看到,通过在主线程中调用countDownLatch.await()方法,使得主线程进入阻塞状态,那么其内部是如何实现的呢?我们把视角转移到await()方法中。在其方法内,只有一行代码,即,调用sync的acquireSharedInterruptibly(1)方法,此处需要额外说明一下,这个sync其实是继承了AQS类的实例对象,所以,它同时也具备了AQS的所有功能,那么从这里大家也能得出一个结论,就是CountDownLatch所具备的能力其实底层都是通过AQS实现的。代码如下所示:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly(...)方法中,如果发现发生过interrupt,则抛出InterruptedException异常;如果没发生过interrupt,则通过调用tryAcquireShared(arg)方法来判断是否倒计时已经结束了,如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;如果倒计时没有结束(即:tryAcquireShared(arg)返回-1),则继续执行doAcquireSharedInterruptibly(arg)方法,代码如下所示:

此处展示了tryAcquireShared(arg)方法的内部处理逻辑,即:如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;代码如下所述:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 1表示倒计时结束;-1表示倒计时进行中;
}

如果倒计时没有结束,则会执行doAcquireSharedInterruptibly(-1)方法,在该方法内部主要由四部分逻辑组成,下面我们也会依次针对这些部分进行详细解析:

步骤1】创建AQS队列,将Node节点放到队列中。 【步骤2】如果倒计时未完成,则执行阻塞操作。 【步骤3】如果倒计时完成,解除阻塞操作。 【步骤4】如果存在异常发生,则对失败进行收尾工作。

3.1> 创建AQS队列

因为在上面已经说过——CountDownLatch所具备的能力其实底层都是通过AQS实现的。而AQS底层就是通过维护节点链表实现的抢锁行为,那么对于CountDownLatch我们也需要创建这样一个链表数据结构,这部分逻辑就在addWaiter(Node.SHARED)方法中。此处需要额外说明一下的就是,对于入参值Node.SHARED,仅仅是一个空属性值的Node节点。

addWaiter(...)方法内部,主要针对两种情况进行了处理:

情况1】如果链表已经创建过了,那么直接讲node放置到链表末尾即可,返回node; 【情况2】如果没创建,则创建链表,并将node插入到链表末尾,返回node;

针对enq(node)方法的内部逻辑,下图以节点数据结构进行了进一步的解释,请见下图所示:

3.2> 执行阻塞操作

当我们执行完上面的addWaiter(Node.SHARED)方法,创建了AQS队列之后,我们就开始了下面的无限for循环逻辑。在for(;;)无限循环中,会尝试获得r值,其含义如下所示:

r==1】表示state等于0,倒计时完毕。 【r==-1】表示state不等于0,倒计时还在进行中。

那么,此处我们的前提条件就是——倒计时还在进行中;所以r等于-1,无法满足下面一行的if(r>=0)的判断条件,所以,不执行该if逻辑。而是直接跳转到“步骤3:执行阻塞操作”这部分红框代码中了,具体请见下图所示:

在“步骤3:执行阻塞操作”这步骤中,主要执行了两个方法:shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt(),下面我们就分别来分析这两个方法的具体执行过程。

shouldParkAfterFailedAcquire(p, node)方法中,会执行如下逻辑:

static final int CANCELLED =  1; /** waitStatus value to indicate thread has cancelled */
static final int SIGNAL    = -1; /** waitStatus value to indicate successor's thread needs unparking */
static final int CONDITION = -2; /** waitStatus value to indicate thread is waiting on condition */
static final int PROPAGATE = -3; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; 
    if (ws == Node.SIGNAL) // SIGNAL值,表示后继线程需要unparking
        return true;
    if (ws > 0) { // ws大于0,说明是CANCELLED节点,清理该节点
        do { 
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 否则,将当前节点赋值为SIGNAL
    }
    return false;
}
如果节点的waitStatus == -1】则直接返回true; 【如果节点的waitStatus > 0】说明是CANCELLED节点,那么清理该节点及所有相邻前置的CANCELLED节点,并返回false; 【如果节点的waitStatus是其他值】通过CAS将节点的waitStatus值变为-1(Node.SIGNAL),并返回false

那么由于head指针指向的Node节点waitStatus等于0,所以,第一次执行shouldParkAfterFailedAcquire(...)方法之后,head节点的waitStatus从0变为-1;那么当再次执行shouldParkAfterFailedAcquire(...)方法的时候,则满足:waitStatus == -1,直接返回true了,请见下图所示:

shouldParkAfterFailedAcquire(p, node)方法在执行第二遍之后返回了true,那么就轮到触发parkAndCheckInterrupt()方法的时刻了,它内部逻辑非常简单,就是执行了两个步骤:步骤1,调用LockSupport.park方法对当前线程进行阻塞;步骤2,解除阻塞后,如果发生了interrupt,则返回true;否则返回false;代码如下所示:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程在此处被阻塞
    return Thread.interrupted(); // 如果发生了interrupt,则返回true;否则返回false
}

3.3> 解除阻塞操作

当“倒计时”结束,即:执行了足够次数的countDown()方法(此步骤会在“四、countDown()方法源码解析”章节进行介绍);则会触发解除阻塞的操作了,即:下图红框内的代码。

那么在上述红框代码中,关键的代码逻辑就是setHeadAndPropagate(node, r),其中:node为存储了当前线程的节点(即:node.thread=主线程),r等于1

setHeadAndPropagate(node, r)方法的作用是用于,请见如下源码所示:

static final Node SHARED = new Node(); // 空值节点

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 暂存旧的头节点
    setHead(node); // 设置新的头节点
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;  // 获取node的下一个节点
        if (s == null || s.isShared()) // 如果node就是尾节点或者s.nextWaiter等于SHARED
            doReleaseShared();
    }
}

/**
 * 设置头节点
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * 如果node在共享模式下等待,返回true。
 */
final boolean isShared() {
    return nextWaiter == SHARED;
}

对于CountDownLatch来说,doReleaseShared()方法其实没有什么作用,因为原本链表就两个节点,一个虚拟头结点(head指针),一个是当下主线程节点(tail指针);当head指针指向下一个节点时,则head==tail,那么就会直接break跳出无限for循环(for(;;)

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 由于此时h等于tail,所以不满足if判断
        if (h != null && h != tail) { 
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                    continue;         
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;               
        }

        // 由于h等于tail(可参照3.2画的链表图),跳出该方法
        if (h == head) break;
    }
}

执行完上面所说的setHeadAndPropagate(node, r)方法之后,基本就可以结束await()方法的逻辑,继续执行主线程剩下的逻辑代码了。

3.4> 针对执行失败后的收尾工作

如果顺利的解除阻塞的话,failed变量会被赋值为false,那么在finally中的cancelAcquire(node)方法则不会被调用。反之,如果failed等于true,则说明阻塞并未按照正常的unpark方式解除阻塞,即,通过异常的方式解除的阻塞,那么我们就需要执行cancelAcquire(node)方法进行失败后的收尾工作了,具体代码如下所示:

cancelAcquire方法中,尝试在AQS的队列链表中断开node节点的,即,剔除掉node节点。由于此处并非主流程,所以具体的代码和注释如下所示,就不再赘述了。

private void cancelAcquire(Node node) {
    if (node == null) // node是保存了主线程的节点,不为空
        return;

    node.thread = null; // 将node保存的线程置为空,即,丢弃之前保存的主线程
    Node pred = node.prev;
    while (pred.waitStatus > 0) // pred的waitStatus等于-1,不满足
        node.prev = pred = pred.prev;

    Node predNext = pred.next; // predNext等于null
    node.waitStatus = Node.CANCELLED;
    // node等于tail,将tail指针指向pred节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); // 将p
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

四、countDown()方法源码解析

子线程通过调用countDown()方法来实现“倒计时”操作,所以,下面我们就来着重分析一下这个方法的具体执行过程,代码如下所示:

public void countDown() {
    sync.releaseShared(1);
}

releaseShared(1)方法中,首先通过tryReleaseShared(arg)进行判断,只有倒计时最后一次countDown调用才会返回true,其他情况都会返回false;而如果返回的是true,才会继续执行if方法内的逻辑,即:doReleaseShared()方法。

4.1> tryReleaseShared(arg)

在该方法内部,首先开启了无限for循环,那么首先获取了当前的倒计时总数state的值,如果等于0,则说明在本次调用countDown()方法之前,倒计时就已经结束了,则此时直接返回false

如果倒计是没有结束,则继续往下执行,先将倒计时总数减1,如果等于0,则说明本次调用countDown()方法是倒计时的最后一次,那么应该可以触发后续的解除主线程阻塞的操作了,那么此时直接返回true;但是,如果不等于0,则表示倒计时仍在继续中,则此时直接返回false;具体代码如下所示:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState(); // 获得倒计时总数state
        if (c == 0) return false; // 如果等于0,则表示倒计时结束,返回false
        int nextc = c - 1; // 否则,倒计时总数减1
        if (compareAndSetState(c, nextc)) // 然后将最新的倒计时数,更新到state的值
            return nextc == 0; // 如果等于0,返回true;否则,返回false
    }
}

4.2> doReleaseShared()

doReleaseShared()方法中,我们要开始真正的执行解除阻塞的操作了。方法首先开启了无限for循环,然后进行了一系列的判断,对于当前AQS队列的情况,上面已经通过图的方式表现了,为了便于大家回忆,我又把它粘贴到了doReleaseShared()方法源码的下面,此时h不等于null,并且h不等于tail,并且h的waitStatus等于-1(Node.SIGNAL),所以是可以顺利执行unparkSuccessor(h)这行代码的;当解除阻塞后,此时head指针向后移动一个节点,那么在第二次循环时,由于无法满足h!=tail,则执行第14行——if(h==head) break;跳出无限循环,结束本方法了。具体代码如下所示:

unparkSuccessor(h)方法中,我们获得了head头节点的下一个节点s,即:也就是保存主线程的节点,然后针对s节点存储的thread(即:主线程)执行unpark操作。因此,主线程的阻塞被解除了。具体代码如下所示:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // 此时ws等于0
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // s就是head节点的next节点,也就是保存主线程的节点

    // s不等于null,并且s.waitStatus等于0
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 针对s中存储的thread(即:主线程)执行unpark操作
    if (s != null) LockSupport.unpark(s.thread);
}