Juc并发编程08——Condition实现源码分析

Java
271
0
0
2022-12-06
标签   Java并发

看看ReentrantLock中的newCondition方法

final ConditionObject newCondition() {
      return new ConditionObject();
}

再点进去发现原来就是AQS中的内部类。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    .....
}

发现没有,又有链表操作的定义,其实现的原理就是条件队列哦。其结构可以参考下图哦。

img

我们知道,当一个线程调用await方法时,会进入等待状态,直到被其它线程使用signal方法唤醒。这里的等待队列就是用来存储处于await等待状态的线程的。

我们先来看看最关键的await方法是如何实现的。这里我们先来阐述几个前提。

  • 只有已经持有锁的线程才可以调用此方法。
  • 当调用此方法时,会直接释放锁,无论加了多少次锁。
  • 只有其它线程调用signal方法或者是中断时才会唤醒等待中的线程。
  • 被唤醒后要等其它线程释放锁,拿到锁后才能够继续执行,并且会恢复之前的状态(await之前加了几层锁之后依旧是几层锁)。

来看下await方法的源码吧。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException(); //如果在调用await之前就已经中断的话,直接抛出中断异常。 
    Node node = addConditionWaiter(); // 为当前线程创建一个新的节点,将其加入条件队列中 
    int savedState = fullyRelease(node); // 完全释放当前线程的锁,并且保存当前线程的状态,因为还得恢复 
    int interruptMode = 0; // 中断状态 
    while (!isOnSyncQueue(node)) { //循环判断是否存在于AQS的同步队列中(如果处于等待状态的线程被其它唤醒,会进入AQS的同步队列中,后面我们会讲)
        LockSupport.park(this); // 如果还处于等待状态,将线程挂起 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查是否在等待过程中被中断了 
            break;
    }
    // 跳出循环表示线程肯定已经被唤醒了,这个时候只差拿到锁就可以运行了 
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 直接去尝试拿锁,这里和一个线程去抢锁的过程基本上是一样的
        interruptMode = REINTERRUPT; 
     // 拿到锁基本就可以继续运行了,这里再进行一些基本的清理工作 
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 依旧是响应中断
        reportInterruptAfterWait(interruptMode);
}

上面的过程很中规中矩,无非就是响应中断、线程挂起、强锁、清理工作等基本的逻辑,再来看看signal方法是怎么实现的。在阅读源码之前,我们先明确几点:

  • 只有持有锁的线程才能调用siganl方法唤醒其它线程。
  • 优先唤醒条件队列中的第一个,如果在唤醒过程中出现问题,则接着往下找,直到找到第一个可以被唤醒的线程。
  • 唤醒结果本质上来说就是将条件队列的节点直接丢进等待队列中,让其参与锁资源的竞争。
  • 拿到锁之后,线程才能继续执行。

其过程可以参考下图哟。

img

上源码。

public final void signal() {
    if (!isHeldExclusively()) // 查看当前线程是不是持有锁的状态 
        throw new IllegalMonitorStateException(); // 不持有锁不能唤醒其它线程 
    Node first = firstWaiter; // 获取条件队列的第一个节点 
    if (first != null) //条件队列不为空
        doSignal(first); // 唤醒
}

doSignal

 private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null) // 如果当前节点没有后继节点了,条件队列直接为空了
            lastWaiter = null; // 这里相当于是清空条件队列了
        first.nextWaiter = null; // 将当前节点的nextWaiter置为空,这是因为当前节点出条件队列了哦
    } while (!transferForSignal(first) && // 将第一个节点唤醒,没有唤醒(被取消)而且条件队列不为空就一直循环
             (first = firstWaiter) != null);
}

transferForSignal

final boolean transferForSignal(Node node) {
    // 如果这里CAS失败,可能是因为线程被取消了 
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
        
    // CAS成功,直接将节点丢入AQS的等待队列中 
    Node p = enq(node); // enq返回的是传入节点的前驱节点,不记得可以往下看 
    // 核心逻辑结束,后面代码是做了一层优化,可以提前跳出之前的while循环 
    int ws = p.waitStatus; // 保存前驱节点的等待状态 
    // 如果前驱节点的状态为取消,或者CAS将其置为signal失败(可能是在ws>0后的瞬间取消了) 
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 
        LockSupport.unpark(node.thread); // 直接唤醒线程 
    return true;
}

不知道入队方法大家还记得不。这里不记得可以看看代码。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

下图总结了await,signal的过错。

img