前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:CountdownLatch,将深入剖析源码,讲解其使用与原理
线程计数器CountdownLatch源码剖析
文章目录
- 线程计数器CountdownLatch源码剖析
- 1 使用计数器锁实现任务计数
- 2 await的源码剖析
- 3 countdown源码剖析
1 使用计数器锁实现任务计数
多任务同步神器,它允许一个或多个线程,等待其它线程完成工作,比如我们现在有一个需求:
- 有20个任务,需要将每个任务的执行结果算出来,但是每个任务执行的时间未知。
- 当所有的任务执行结束后,立即整合统计所有的执行结果。
我们并不知道任务可以在什么时间完成,因此执行统计的时间不好设置,设置短了则还有任务没有完成,设置长了则统计延迟。
CountdownLatch
可以做到,它是一个实现子任务同步的工具。Demo如下。
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
final int finalI = i;
new Thread(() ->{
try {
Thread.sleep((long)(2000 + new Random().nextDouble()));
System.out.println("thread " + finalI + " finished");
} catch (InterruptedException exception) {
exception.printStackTrace();
}
latch.countDown(); // 相当于计数器,每次减少1
}).start();
}
latch.await(); //可以多个线程同时等待,这里仅演示了一个线程进行等待
System.out.println("all sub task finished!");
}
执行结果如下。
其实它就是一个线程计数器,注意CountDownLatch
是一次性的,不能重复使用。比如下面再多调用一次latch.await
,程序还是正常结束的(毕竟计数不可逆,已经是0了,而且无法将计数器重置).
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
final int finalI = i;
new Thread(() ->{
try {
Thread.sleep((long)(2000 + new Random().nextDouble()));
System.out.println("thread " + finalI + " finished");
} catch (InterruptedException exception) {
exception.printStackTrace();
}
latch.countDown();
}).start();
}
latch.await();
latch.await();
System.out.println("all sub task finished!");
}
2 await的源码剖析
上面已经演示了使用,下面来看看它的原理吧。
public class CountDownLatch {
// 使用了AQS,不过是基于共享锁实现的
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//这里实际上使用count作为了共享锁的state值
// count数与共享锁的数量相同
// 每调用一次countdown就是解一层锁
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 重写了共享锁的实现
// 获取共享锁其实就是等待其它线程把state减到0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 解锁的过程
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); //构造sync方法
}
//通过acquireSharedInterruptibly获取共享锁,
// 但是如果state不为0,将会被持续阻塞,后文详解
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 同上,但是会被阻塞
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//countDown其实就是解锁一次
public void countDown() {
sync.releaseShared(1);
}
// 获取当前的计数,也就是AQS的state值
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
我们终于知道CountDownLatch
它的原理了,原来它就是利用共享锁来实现计数的,锁的数量就是计数数量,countdown的过程就是解锁的过程。
我们在该系列博客已经介绍过独占锁,但是没有深入剖析过共享锁,这里我们来深入共享锁的源码进行剖析,以便大家对CountDownLatch
具有更为深入的理解。
点进AbstractQueuedSynchronizer
类中。先看看acquireShared
获取共享锁,这个就是CountDownLatch
中await
方法调用的底层方法(实际上是acquireSharedInterruptibly
,不过原理是一样的)。
public final void acquireShared(int arg) {
// 尝试获取共享锁,小于0则失败
if (tryAcquireShared(arg) < 0)
// 获取共享锁失败,进入阻塞
doAcquireShared(arg);
}
接下来看看doAcquireShared
是如何进行阻塞的。
private void doAcquireShared(int arg) {
//向等待队列中添加一个新的共享锁节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 无限循环
// 获取当前新建节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点,说明当前节点是等待队列的队首节点
if (p == head) {
//再次尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) { //获取成功
//将当前节点设置为头节点,并且继续唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 和独占锁类似,没有获取到共享锁,挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果最后还没有获取到,直接cancel
cancelAcquire(node);
}
}
原来阻塞就是挂起线程呀,我们其实早就知道了,只不过验证了下。而且上面的过程和独占锁其实很类似,不过在获取到节点后,不仅将当前线程设置成了头节点,而且还唤醒了后继节点。这就是共享锁的传播性:当前节点被唤醒后,后继节点也会被唤醒。这是因为可能不止一个线程调用了await
方法进行等待。
究竟是如何进行的传播与唤醒呢?走进setHeadAndPropagate
来一探究竟吧。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //取出头节点
setHead(node); //将当前节点设置为头节点
//doAcquireShared中传参propagate一定大于0
//waitStatus初始为0,SIGNAL=-1;
// CONDITION = -2,PROPAGATE = -3;
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 取出当前节点的后继节点
Node s = node.next;
//共享模式或者s为空,继续doReleaseShared
if (s == null || s.isShared())
//继续唤醒下一个节点
doReleaseShared();
}
}
原来就是判断是不是共享模式,是就继续调用doReleaseShared
唤醒下一个节点,doReleaseShared
后面会讲.
3 countdown源码剖析
了解完await
的底层原理,这里我们接下来看下countdown
方法的底层原理。
看看它的底层调用方法
public void countDown() {
sync.releaseShared(1);
}
点进releaseShared
public final boolean releaseShared(int arg) {
// 尝试释放锁,成功返回true
// tryReleaseShared在前面讲await源码时讲过
//只有当锁数量为0时才会释放成功
if (tryReleaseShared(arg)) {
// 继续唤醒后续节点
doReleaseShared();
return true;
}
return false;
}
原来countdown
和await
最后都会调用doReleaseShared
唤醒其它节点,前文留下的悬念是时候解开了,那就看看doReleaseShared
吧。
private void doReleaseShared() {
for (;;) {
Node h = head;
// 如果头节点不为空,而且头、尾节点不相同
//说明等待队列中存在节点
if (h != null && h != tail) {
// 获取头节点的等待状态
int ws = h.waitStatus;
// 如果是SIGNAL(表示后继节点被挂起),
//就将头节点的状态设置为初始值
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; //失败就重来
// 当头节点被唤醒,会唤醒头节点的后继节点
unparkSuccessor(h);
}
// 如果头节点的等待状态是初始状态0
// 尝试将其状态设置为PROPAGATE(表示后继节点已经被唤醒)
//PROPAGATE状态在setHeadAndPropagate中用到
//可以让唤醒操作向后继节点传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可能看下来觉得还是比较凌乱,没关系,我们可以回过头对本文章的源码与本专栏的AQS源码多读几遍,这里也梳理下,方便大家理解。
- 共享锁是线程共享的,同一个时刻可能有多个线程拥有共享锁。
- 如果一个线程刚获取到了共享锁,那么在其之后等待的线程很有可能也能够获取到共享锁,因此需要传播唤醒后继节点
- 如果一个线程刚刚释放了线程锁,那么无论是共享锁还是独占锁,都需要传播唤醒后继节点。
这篇文章就更新完了,下篇文章将讲解循环屏障,CyclickBarrier,敬请期待。