Juc并发编程14——线程计数器CountdownLatch源码剖析

Java
345
0
0
2022-12-06
标签   Java多线程
前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:CountdownLatch,将深入剖析源码,讲解其使用与原理

线程计数器CountdownLatch源码剖析

文章目录

  • 线程计数器CountdownLatch源码剖析
  • 1 使用计数器锁实现任务计数
  • 2 await的源码剖析
  • 3 countdown源码剖析

1 使用计数器锁实现任务计数

多任务同步神器,它允许一个或多个线程,等待其它线程完成工作,比如我们现在有一个需求:

  • 有20个任务,需要将每个任务的执行结果算出来,但是每个任务执行的时间未知。
  • 当所有的任务执行结束后,立即整合统计所有的执行结果。

我们并不知道任务可以在什么时间完成,因此执行统计的时间不好设置,设置短了则还有任务没有完成,设置长了则统计延迟。

CountdownLatch可以做到,它是一个实现子任务同步的工具。Demo如下。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(20);
    for (int i = 0; i < 20; i++) {
        final int  finalI = i;
        new Thread(() ->{
            try {
                Thread.sleep((long)(2000 + new Random().nextDouble()));
                System.out.println("thread " + finalI + " finished");
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
            latch.countDown(); // 相当于计数器,每次减少1
        }).start();
    }

    latch.await(); //可以多个线程同时等待,这里仅演示了一个线程进行等待
    System.out.println("all sub task finished!");
}

执行结果如下。

img

其实它就是一个线程计数器,注意CountDownLatch是一次性的,不能重复使用。比如下面再多调用一次latch.await,程序还是正常结束的(毕竟计数不可逆,已经是0了,而且无法将计数器重置).

 public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(20);
    for (int i = 0; i < 20; i++) {
        final int  finalI = i;
        new Thread(() ->{
            try {
                Thread.sleep((long)(2000 + new Random().nextDouble()));
                System.out.println("thread " + finalI + " finished");
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
            latch.countDown();
        }).start();
    }

    latch.await();
    
    latch.await();
    System.out.println("all sub task finished!");
}

2 await的源码剖析

上面已经演示了使用,下面来看看它的原理吧。

public class CountDownLatch {
 	// 使用了AQS,不过是基于共享锁实现的  
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
			//这里实际上使用count作为了共享锁的state值
			// count数与共享锁的数量相同
			// 每调用一次countdown就是解一层锁
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
			// 重写了共享锁的实现
			// 获取共享锁其实就是等待其它线程把state减到0 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

			// 解锁的过程 
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero 
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

  
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count); //构造sync方法
    }

	//通过acquireSharedInterruptibly获取共享锁,
	// 但是如果state不为0,将会被持续阻塞,后文详解
	public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	
	// 同上,但是会被阻塞
  	public boolean await(long timeout, TimeUnit unit) 
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
	//countDown其实就是解锁一次 
  public void countDown() {
        sync.releaseShared(1);
    }

// 获取当前的计数,也就是AQS的state值
	public long getCount() {
        return sync.getCount();
    }
    
	public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

我们终于知道CountDownLatch它的原理了,原来它就是利用共享锁来实现计数的,锁的数量就是计数数量,countdown的过程就是解锁的过程。

我们在该系列博客已经介绍过独占锁,但是没有深入剖析过共享锁,这里我们来深入共享锁的源码进行剖析,以便大家对CountDownLatch具有更为深入的理解。

点进AbstractQueuedSynchronizer类中。先看看acquireShared获取共享锁,这个就是CountDownLatchawait方法调用的底层方法(实际上是acquireSharedInterruptibly,不过原理是一样的)。

public final void acquireShared(int arg) {
	// 尝试获取共享锁,小于0则失败 
    if (tryAcquireShared(arg) < 0)
    	// 获取共享锁失败,进入阻塞 
        doAcquireShared(arg);
}

接下来看看doAcquireShared是如何进行阻塞的。

 private void doAcquireShared(int arg) {
 		//向等待队列中添加一个新的共享锁节点 
    final Node node = addWaiter(Node.SHARED); 
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 无限循环 
           // 获取当前新建节点的前驱节点 
            final Node p = node.predecessor(); 
            //如果前驱节点是头节点,说明当前节点是等待队列的队首节点 
            if (p == head) {
            	//再次尝试获取共享锁 
                int r = tryAcquireShared(arg);
                if (r >= 0) { //获取成功 
                   //将当前节点设置为头节点,并且继续唤醒后继节点
                    setHeadAndPropagate(node, r); 
                    p.next = null; // help GC 
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 和独占锁类似,没有获取到共享锁,挂起线程 
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
        	// 如果最后还没有获取到,直接cancel
            cancelAcquire(node);
    }
}

原来阻塞就是挂起线程呀,我们其实早就知道了,只不过验证了下。而且上面的过程和独占锁其实很类似,不过在获取到节点后,不仅将当前线程设置成了头节点,而且还唤醒了后继节点。这就是共享锁的传播性:当前节点被唤醒后,后继节点也会被唤醒。这是因为可能不止一个线程调用了await方法进行等待。

究竟是如何进行的传播与唤醒呢?走进setHeadAndPropagate来一探究竟吧。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; //取出头节点
    setHead(node); //将当前节点设置为头节点 
   //doAcquireShared中传参propagate一定大于0    
   //waitStatus初始为0,SIGNAL=-1;        
  // CONDITION = -2,PROPAGATE = -3; 
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 取出当前节点的后继节点 
        Node s = node.next;
        //共享模式或者s为空,继续doReleaseShared 
        if (s == null || s.isShared())
        	//继续唤醒下一个节点
            doReleaseShared(); 
    }
}

原来就是判断是不是共享模式,是就继续调用doReleaseShared唤醒下一个节点,doReleaseShared后面会讲.

3 countdown源码剖析

了解完await的底层原理,这里我们接下来看下countdown方法的底层原理。

看看它的底层调用方法

public void countDown() {
    sync.releaseShared(1);
}

点进releaseShared

public final boolean releaseShared(int arg) {
	// 尝试释放锁,成功返回true
	// tryReleaseShared在前面讲await源码时讲过
	//只有当锁数量为0时才会释放成功 
    if (tryReleaseShared(arg)) {
    		// 继续唤醒后续节点
        doReleaseShared();
        return true;
    }
    return false;
}

原来countdownawait最后都会调用doReleaseShared唤醒其它节点,前文留下的悬念是时候解开了,那就看看doReleaseShared吧。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 如果头节点不为空,而且头、尾节点不相同 
        //说明等待队列中存在节点 
        if (h != null && h != tail) {
        	// 获取头节点的等待状态 
            int ws = h.waitStatus;
            // 如果是SIGNAL(表示后继节点被挂起), 
            //就将头节点的状态设置为初始值 
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; //失败就重来           
                // 当头节点被唤醒,会唤醒头节点的后继节点
                unparkSuccessor(h);
            }
            // 如果头节点的等待状态是初始状态0 
            // 尝试将其状态设置为PROPAGATE(表示后继节点已经被唤醒) 
            //PROPAGATE状态在setHeadAndPropagate中用到 
            //可以让唤醒操作向后继节点传播 
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;         // loop on failed CAS
        }
        if (h == head)         // loop if head changed 
            break;
    }
}

可能看下来觉得还是比较凌乱,没关系,我们可以回过头对本文章的源码与本专栏的AQS源码多读几遍,这里也梳理下,方便大家理解。

  • 共享锁是线程共享的,同一个时刻可能有多个线程拥有共享锁。
  • 如果一个线程刚获取到了共享锁,那么在其之后等待的线程很有可能也能够获取到共享锁,因此需要传播唤醒后继节点
  • 如果一个线程刚刚释放了线程锁,那么无论是共享锁还是独占锁,都需要传播唤醒后继节点。

这篇文章就更新完了,下篇文章将讲解循环屏障,CyclickBarrier,敬请期待。