深入理解Java并发框架AQS系列:条件队列(Condition)

Java
316
0
0
2023-09-17

一、前言

AQS中的条件队列相比较前文中的“独占锁”、“共享锁”等比较独立,即便没有条件队列也丝毫不影响诸如 ReentrantLock Semaphore 类的实现,那如此说来条件队列是否就是一个可有可无的产物?答案是否定的,我们来看下直接或间接用到条件队列的 jdk 并发类:

  • ReentrantLock 独占锁经典类
  • ReentrantReadWriteLock 读写锁
  • ArrayBlockingQueue 基于数组的阻塞队列
  • CyclicBarrier 循环栅栏,解决线程同步问题
  • DelayQueue 延时队列
  • LinkedB lock ingDeque 双向阻塞队列
  • PriorityBlockingQueue 支持优先级的无界阻塞队列
  • ThreadPoolExecutor 线程池构造器
  • ScheduledThreadPoolExecutor 可基于时间调度的 线程池 构造器
  • StampedLock 邮戳锁,1.8后引入,更高效地读写锁

如此豪华的阵容,可见 Condition 的地位不可小觑

我们简单描述下条件队列实现的功能:有3个 线程 A、B、C,分别调用 wait/await 方法后,线程进入阻塞,在没有其他线程去唤醒的情况下,3个线程将永远处于阻塞状态。此时如果有另外线程调用 notify/signal ,那么A、B、C线程中的某一个将被激活(根据其进入条件队列的顺序而定),从而执行后续的逻辑;如果调用 notifyAll/signalAll 的话,那么3个线程都将被激活,这可能是我们对条件队列的简单认识。这样的描述是否准确呢?可能不太严谨,我们引入JDK的条件队列来做说明

统一话术:其实语法层面支持的 wait/notify 与AQS都属于JDK的范畴,但为了区分两者,我们定义如下:

  • JDK条件队列 :语法层面提供支持的 wait/notify ,即 Object 类中的 wait()/notify() 方法
  • AQS条件队列 :AQS提供的条件队列,即AQS内部的 ConditionObject

二、JDK中的条件队列(wait/notify)

众所周知,在JDK中, wait/notify/notifyAll 是根对象 Object 中内置的方法,且方法均被定义为 native 本地方法

 // 等待
public final native  void  wait(long timeout) throws Interrupted Exception ;
// 唤醒
public final native void notify();
// 唤醒所有等待线程
public final native void notifyAll();

2.1、wait

 // 步骤
synchronized (obj) {
  // 步骤
  before();
  // 步骤
  obj.wait();
  // 步骤
  after();
}

相信大家对上述代码并不陌生,我们将JDK的条件队列抽象为4步,逐一阐述

  • 步骤1: synchronized (obj) 在jdk中如果想调用 Object.wait() 方法,必须首先获取该对象的 synchronized 锁,当前步骤,如果成功获取到锁,那么将进入“步骤2”,如果存在并发,当前线程将会进入阻塞(线程状态为BLOCKED),知道获取到锁为止
  • 步骤2: before() 我们知道 synchronized 是独占锁,所以在执行步骤2代码时,程序是不存在并发的,即同一时刻,只有一个线程正在执行,此处也相对好理解
  • 步骤3: obj.wait() 此步骤是将当前线程放入条件队列,同时释放 obj 的同步锁。此处跟我们对 synchronized 的认知有悖,我们一般认为 synchronized (obj) {……} 在大括号中的代码会一直持有锁,而事实情况却是,当程序执行 wait() 方法时,会释放 obj 的同步锁
  • 步骤4: after() 此步骤是并发执行还是串行执行?假设我们现在有3个线程A、B、C都已经执行完毕 wait() 方法,并进入了条件队列,等待其他线程唤醒;此时另外一个线程执行了 notifyAll() 时,后续的激活流程是怎么样的?错误观点:有很多同学直观感受是,线程A、B、C同时被激活,所以步骤4是并发执行的;就像是百米赛跑,所有同学都准备就绪(wait),一声枪响后(notifyAll),所有人开始赛跑,并跑到终点(步骤4)正确观点:其实“步骤4”是串行执行的,大家再检查下代码后便可发现,“步骤4”处于 synchronized 的大括号之间;还是拿上述赛跑举例,如果认为从听到枪响至跑到终点是“步骤4”的话,那真实的场景应该是这样的:一声枪响后,A起跑,B、C原地不动;A跑到终点后,B开始起跑,C原地不动;最后是C跑到终点

由此我们断定, obj.wait() 虽然是native方法,但其内部经历了释放锁、重新抢锁的两个大环节

2.2、notify

 synchronized (obj) {
  obj.notify();
  // obj.notifyAll();
}

所有因 obj.wait() 阻塞的线程,都要通过 notify 来唤醒

  • notify() 唤醒条件队列中,队首节点
  • notifyAll() 唤醒条件队列中所有节点

三、AQS中的条件队列(await/signal)

我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能

JDK


AQS


wait


await


notify


singal


notifyAll


singalAll


用法上也极其相似:

await

 // 初始化
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
  lock.lock();
  condition.await();
} catch (InterruptedException e) {
  e.printStackTrace();
} finally {
  lock.unlock();
}

singal

 Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
  lock.lock();
  condition.signal();
} finally {
  lock.unlock();
}

3.1、条件队列

我们知道在AQS内部维护了一个阻塞队列,数据结构如下:

上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用

那AQS内部的另一个条件队列又是什么样的数据结构呢?

可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。上图描述的是一个长度为 5 的条件队列,即有5个线程执行了 await() 方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的 waitStatus 为 -2 。当有新节点加入时,将会追加至队列尾部

3.2、唤醒

当我们调用 signal () 方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列

signalAll() 执行时,具体执行流程与 signal() 类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的

四、JDK vs AQS

经过上文的介绍,似乎AQS做了与 wait/notify 相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比

4.1、对比

我们模拟这样的一个场景:启动10个线程,分别调用 wait() 方法,当所有线程都进入阻塞后,调用 notifyAll() ,10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时

JDK测试代码:

 public class ConditionCompareTest {

  @Test
  public void runTest() throws InterruptedException {
    long begin = System.currentTimeMillis();
    for (int i =; i < 10000; i++) {
      if (i % == 0) {
        System.out.println(i);
      }
      jdkTest();
    }
    long cost = System.currentTimeMillis() - begin;
    System.out.println("耗时: " + cost);
  }
  
  public void jdkTest() throws InterruptedException {
    Object lock = new Object();
    List<Thread> list = Lists.newArrayList();
    // 步骤一:启动个线程,并进入wait等待
    for (int i =; i < 10; i++) {
      Thread thread = new Thread(() -> {
        try {
          synchronized (lock) {
            lock.wait();
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.start();
      list.add(thread);
    }

    // 步骤二:等待个线程全部进入wait方法
    while (true) {
      boolean allWaiting = true;
      for (Thread thread : list) {
        if (thread.getState() != Thread.State.WAITING) {
          allWaiting = false;
          break;
        }
      }
      if (allWaiting) {
        break;
      }
    }

    // 步骤三:唤醒个线程
    synchronized (lock) {
      lock.notifyAll();
    }

    // 步骤四:等待个线程全部执行完毕
    for (Thread thread : list) {
      thread.join();
    }
  }
}

AQS测试代码:

 public class ConditionCompareTest {
   private  ReentrantLock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();

  @Test
  public void runTest() throws InterruptedException {
    long begin = System.currentTimeMillis();
    for (int i =; i < 10000; i++) {
      if (i % == 0) {
        System.out.println(i);
      }
      aqsTest();
    }
    long cost = System.currentTimeMillis() - begin;
    System.out.println("耗时: " + cost);
  }

  @Test
  public void aqsTest() throws InterruptedException {
    AtomicInteger lockedNum = new AtomicInteger();
    List<Thread> list = Lists.newArrayList();
    // 步骤一:启动个线程,并进入wait等待
    for (int i =; i < 10; i++) {
      Thread thread = new Thread(() -> {
        try {
          lock.lock();
          lockedNum.incrementAndGet();
          condition.await();
          lock.unlock();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.start();
      list.add(thread);
    }

    // 步骤二:等待个线程全部进入wait方法
    while (true) {
      if (lockedNum.get() !=) {
        continue;
      }
      boolean allWaiting = true;
      for (Thread thread : list) {
        if (thread.getState() != Thread.State.WAITING) {
          allWaiting = false;
          break;
        }
      }
      if (allWaiting) {
        break;
      }
    }

    // 步骤三:唤醒个线程
    lock.lock();
    condition.signalAll();
    lock.unlock();

    // 步骤四:等待个线程全部执行完毕
    for (Thread thread : list) {
      thread.join();
    }
  }
}

条件队列


耗时1


耗时2


耗时3


耗时4


耗时5


平均耗时(ms)


JDK


5000


5076


5054


5089


4942


5032


AQS


5358


5440


5444


5473


5472


5437


4.2、基准测试Q&A

基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来

  • Q:AQS测试中的“步骤二”,为什么在判断“等待10个线程全部进入wait方法”时,要引入 lockedNum.get() != 10 的判断?直接通过判断所有线程是否均为 waiting 方法不可以吗?
  • A:如果真的删除 lockedNum.get() != 10 的判断,在多次并发测试时,会有较小的概率出现程序 死锁 的情况(作者电脑的环境是平均5万次调用会出现一次),为什么会出现死锁呢?我们对AQS源码就会发现,不管是调用 lock() 还是 await ,挂起线程使用的方法均为 LockSupport.park() 方法,此方法会将线程置为 WAITING 状态,也就是线程状态是 WAITING 状态时,有可能线程刚进入 lock() 方法,从而导致 await thread.join() 的死锁
  • Q:既然是这样,为什么JDK的测试没有出现死锁?
  • A:我们看到JDK的加锁是通过 synchronized 关键字完成的,而当线程因为等待 synchronized 资源而阻塞时,线程状态将变为 BLOCKED ,而进入 wait() 方法后,状态才会变为 WAITING
  • Q:那看来只有通过引入 AtomicInteger lockedNum 变量才能解决死锁问题了
  • A:其实解决问题的方式有很多种,我们甚至可以简单将 ReentrantLock lock 置为公平锁,也能解决上述死锁问题;因为当前场景发生死锁的情况是, singalAll() 先于 await() 发生,而当所有线程都变成 WAITING 状态后,公平锁则确保了 singalAll() 一定是在所有线程都调用了 await() 。但因为 synchronized 本身是非公平锁,故如果AQS使用公平锁的话,性能偏差较大
  • Q:那这样看来,AQS中的阻塞队列相对比JDK的没有优势可言啊,用法上没有JDK简洁,性能上还没人家快
  • A:的确,如果真是只是单纯地使用阻塞、唤醒功能的话,还是建议使用JDK内置的方式;但AQS的优势并不在此

五、再说AQS条件队列

AQS的优势在于,其提供了丰富的api可以查询条件队列的状态;例如当我们想看一下在条件队列中等待节点的个数时,使用JDK的 wait/notify 时,是无法做的;AQS提供的api如下:

  • boolean hasWaiters() 阻塞队列中是否有等待节点
  • int getWaitQueueLength() 获取阻塞队列长度
  • Collection<Thread> getWaitingThreads() 获取阻塞队列中线程对象

这些api为程序提供了更灵活的控制,条件队列对于javaer已不是黑盒;当然使用AQS的条件队列必然要引入独占锁,例如 ReentrantLock ,自然地我们还可以通过它查看条件队列外围的一些指标,例如:

  • Interrupted 响应中断,借助独占锁,提供响应中断能力; wait/notify 不提供,因为虽然 wait 方法响应中断,但是 synchronized 关键字是会一直阻塞的
  • boolean tryLock() 尝试获取锁; wait/notify 不提供
  • int getHoldCount() 获取阻塞线程的数量
  • boolean isLocked() 是否持有锁
  • fair/nonFair 提供公平/非公平锁

可见整个AQS体系相比较 Object wait/notify 方法是相当灵活的,提供了很多监控条件队列、阻塞队列的指标