前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:循环屏障CyclickBarrier,将深入剖析源码,讲解其使用与原理
1.循环屏障的使用
如果打一场游戏,必须等待游戏的玩家足够以后才开始,并且为了公平,所有玩家必须同时进入游戏。循环屏障CyclickBarrier
就是为了解决这种场景而设计的.
假如现在游戏需要10人才能开始,并且所有玩家需要同时进入游戏。我们可以这样实现。
public static void main(String[] args) { | |
CyclicBarrier barrier = new CyclicBarrier(10, () -> { | |
System.out.println("Ready to start,please prepare..."); | |
}); | |
for (int i = 0; i < 10; i++) { | |
final int finalI = i; | |
new Thread(()->{ | |
try { | |
Thread.sleep((long) (2000 * new Random().nextDouble())); | |
System.out.println("Player:" + finalI + " prepared,"+ barrier.getNumberWaiting()+ "/10"); | |
barrier.await(); | |
System.out.println("Player:" + finalI + " Join Game..."); | |
} catch (InterruptedException | BrokenBarrierException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} | |
} |
输出结果如下。
可以看到循环屏障会不断的阻挡线程,知道线程数量足够多时,再一起冲破线程屏障。并且在冲破屏障后,可以执行屏障创建时指定的任务。
屏障是可以循环使用的,在被冲破后,会重新开始计数,继续阻挡后续的线程。比如我们将屏障的初始容量设置为5,看看执行结果。
public static void main(String[] args) { | |
CyclicBarrier barrier = new CyclicBarrier(5, () -> { | |
System.out.println("Ready to start,please prepare..."); | |
}); | |
for (int i = 0; i < 10; i++) { | |
final int finalI = i; | |
new Thread(()->{ | |
try { | |
Thread.sleep((long) (2000 * new Random().nextDouble())); | |
System.out.println("Player:" + finalI + " prepared,"+ barrier.getNumberWaiting()+ "/5"); | |
barrier.await(); | |
System.out.println("Player:" + finalI + " Join Game..."); | |
} catch (InterruptedException | BrokenBarrierException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} | |
} |
执行结果如下。
当然,除了自动清零,我们也可以将循环屏障手动置零。
public static void main(String[] args) throws InterruptedException { | |
CyclicBarrier barrier = new CyclicBarrier(5, () -> { | |
System.out.println("Ready to start,please prepare..."); | |
}); | |
for (int i = 0; i < 3; i++) { | |
new Thread(()->{ | |
try { | |
barrier.await(); | |
} catch (InterruptedException | BrokenBarrierException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} | |
Thread.sleep(500); | |
System.out.println(barrier.getNumberWaiting()); | |
barrier.reset(); | |
System.out.println(barrier.getNumberWaiting()); | |
} |
执行结果如下。报了BrokenBarrierException
,这是因为在循环屏障数达到3以后,还没有冲破屏障,我们就将循环屏障的计数清零了,之前处于等待状态的线程全部被中断,屏障被破坏了。
要是处于等待状态的线程被中断了呢?循环屏障的计数会不会自动减少?
public static void main(String[] args) { | |
CyclicBarrier barrier = new CyclicBarrier(5, () -> { | |
System.out.println("Ready to start,please prepare..."); | |
}); | |
Runnable r = ()-> | |
{ | |
try { | |
barrier.await(); | |
} catch (InterruptedException | BrokenBarrierException exception) { | |
exception.printStackTrace(); | |
} | |
}; | |
Thread t = new Thread(r); | |
t.start(); | |
t.interrupt(); | |
new Thread(r).start(); | |
} |
其结果如下。
第一个异常那个信息很好理解,是异常中断。第二个异常信息是因为屏障里的线程被取消,导致本轮屏障被破坏了。可以这么理解,约了三个朋友一起打麻将,结果有一个坑爹的队友临时爽约了,那他一个人就导致这局麻将组不成了。当然,我们还可以重新组局,我们也可以使用reset
对屏障重新计数。
public static void main(String[] args) throws InterruptedException { | |
CyclicBarrier barrier = new CyclicBarrier(5, () -> { | |
System.out.println("Ready to start,please prepare..."); | |
}); | |
Runnable r = ()-> | |
{ | |
try { | |
barrier.await(); | |
} catch (InterruptedException | BrokenBarrierException exception) { | |
exception.printStackTrace(); | |
} | |
}; | |
Thread t = new Thread(r); | |
t.start(); | |
t.interrupt(); | |
Thread.sleep(500); // 等待中断结束 | |
barrier.reset(); | |
new Thread(r).start(); | |
} |
执行结果如下。
大家是不是有种感觉,CountdownLatch
和CyclickBarrier
还挺相似的。我们来总结下他们的区别。
- CountdownLatch 一次性的,仅仅可以使用一次 多个线程等待指定数量的其它线程完成任务的同步工具
- CyclickBarrier 可以重复使用 多个线程在同一个时间开始执行的工具
2.循环屏障的源码剖析
public class CyclicBarrier { | |
// 每一代都会生成新的Generation | |
private static class Generation { | |
// broken标记,用来存放屏障是否被损坏 | |
// 被损坏的屏障是不能被使用的 | |
boolean broken = false; | |
} | |
/** 内部维护一个可重入锁 */ | |
private final ReentrantLock lock = new ReentrantLock(); | |
/** 内部维护一个Condition */ | |
private final Condition trip = lock.newCondition(); | |
/** 屏障的最大容量 */ | |
private final int parties; | |
/* 冲破屏障后被执行的任务 */ | |
private final Runnable barrierCommand; | |
/** 生成当前轮的Generation */ | |
private Generation generation = new Generation(); | |
// 默认为最大的阻挡容量,每加入一个线程减1 | |
// 与CountDownLatch类似 | |
// 当屏障被冲破或重置,会将count重置为最大的阻挡容量 | |
private int count; | |
// 当屏障被冲破后,将调用该方法开启下一轮 | |
private void nextGeneration() { | |
// 唤醒所有等待中的线程 | |
trip.signalAll(); | |
// 重置count | |
count = parties; | |
//创建新的Generation对象 | |
generation = new Generation(); | |
} | |
// 破坏当前的屏障,破坏后当前轮次的屏障就不能够再使用了 | |
// 除非重置生成新代 | |
private void breakBarrier() { | |
generation.broken = true; | |
count = parties; | |
trip.signalAll(); | |
} | |
// 开始等待 | |
public int await() throws InterruptedException, BrokenBarrierException { | |
try { | |
return dowait(false, 0L); | |
} catch (TimeoutException toe) { | |
// 由于这里没有使用时间策略,因此如果出现超时,就是异常状况 | |
throw new Error(toe); | |
} | |
} | |
// 可超时的等待 | |
public int await(long timeout, TimeUnit unit) | |
throws InterruptedException, | |
BrokenBarrierException, | |
TimeoutException { | |
return dowait(true, unit.toNanos(timeout)); | |
} | |
// 真正的等待流程 | |
private int dowait(boolean timed, long nanos) | |
throws InterruptedException, BrokenBarrierException, | |
TimeoutException { | |
final ReentrantLock lock = this.lock; | |
//加锁 因为会有多个线程同时调用await方法, | |
// 需要保证每次只有一个线程能进入 | |
lock.lock(); | |
try { | |
final Generation g = generation; | |
// 确定屏障未被破坏 | |
if (g.broken) | |
throw new BrokenBarrierException(); | |
// 需要破坏屏障的第一种情况:线程中断 | |
if (Thread.interrupted()) { | |
breakBarrier(); | |
throw new InterruptedException(); | |
} | |
int index = --count; | |
// 可以冲破屏障了 | |
if (index == 0) { | |
boolean ranAction = false; | |
try { | |
// 执行冲破屏障后的任务 | |
final Runnable command = barrierCommand; | |
if (command != null) | |
command.run(); | |
ranAction = true; | |
// 更新代数 | |
nextGeneration(); | |
return 0; | |
} finally { | |
// 损坏屏障的第二种情况:执行任务异常 | |
if (!ranAction) | |
breakBarrier(); | |
} | |
} | |
// 走到这说明加入的线程数量不够冲破屏障 | |
for (;;) { // 无限循环,直到冲破屏障,超时或者出现异常 | |
try { | |
// 看看是否是限时的 | |
if (!timed) | |
// breakBarrier|nextGeneration会唤醒trip | |
trip.await(); //非定时永久等待 | |
else if (nanos > 0L) //定时等待指定时间 | |
nanos = trip.awaitNanos(nanos); | |
} catch (InterruptedException ie) { | |
// 破坏屏障的第一种情况:中断 | |
if (g == generation && ! g.broken) { | |
breakBarrier(); | |
throw ie; | |
} else { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
//走到这说明trip被唤醒 | |
// 即使被唤醒,但是屏障被损坏的情况还是需要抛异常 | |
if (g.broken) | |
throw new BrokenBarrierException(); | |
// 代数有更新,说明进行了换代 | |
// 返回,并带返回参数:当前是第几个等待的线程 | |
if (g != generation) | |
return index; | |
// 等待超时,破坏屏障的第三种情况 | |
if (timed && nanos <= 0L) { | |
breakBarrier(); | |
throw new TimeoutException(); | |
} | |
} | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public CyclicBarrier(int parties, Runnable barrierAction) { | |
if (parties <= 0) throw new IllegalArgumentException(); | |
this.parties = parties; | |
this.count = parties; | |
this.barrierCommand = barrierAction; | |
} | |
public CyclicBarrier(int parties) { | |
this(parties, null); | |
} | |
public int getParties() { | |
return parties; | |
} | |
public boolean isBroken() { | |
final ReentrantLock lock = this.lock; | |
lock.lock(); | |
try { | |
return generation.broken; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public void reset() { | |
final ReentrantLock lock = this.lock; | |
lock.lock(); | |
try { | |
breakBarrier(); // break the current generation | |
nextGeneration(); // start a new generation | |
} finally { | |
lock.unlock(); | |
} | |
} | |
public int getNumberWaiting() { | |
final ReentrantLock lock = this.lock; | |
lock.lock(); | |
try { | |
return parties - count; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
上面是通过内部类generation来实现屏障的更新迭代的,这种处理方式值得关注学习。
除此以外,上面的源码部分应该很好理解,这里就介绍到这里,下一篇文章将介绍并发工具类Semaphore和Exchanger,以及Fork/Join框架,这也会是并发编程基础篇的最后一篇,后面笔者还可能输出一些高阶内容,敬请期待。