前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:Semaphore,Exchanger,Fork/Join框架,讲解其使用与原理
文章目录
- 1.Semaphore
- 2.数据交换Exchanger
- 3.Fork/Join框架
1.Semaphore
还记得我们在操作系统中学习的信号量吗?它在解决进程间的同步问题起着非常大的作用。
java中的信号量也有很大的作用,它可以限制一个代码块可以同时被访问的线程数量(加排他锁锁可以限制只被一个线程访问),相当于流量控制。简单来说,它就是一个可以被N个线程同时占用的排它锁(因此也支持公平锁和非公平锁)。在初始时,可以指定Semaphore的许可证个数,一个线程可以获取一个或者多个许可证,当许可证不足以供其它线程获取时,想要竞争同步资源的其它线程将会被阻塞。
public static void main(String[] args) { | |
Semaphore semaphore = new Semaphore(2); | |
for (int i = 0; i < 3; i++) { | |
new Thread(() -> { | |
try { | |
semaphore.acquire(); //可以设定参数指定许可证数量 | |
System.out.println("get a license..."); | |
Thread.sleep(100); | |
semaphore.release(); //可以设定参数指定许可证数量 | |
System.out.println("release a license..."); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} | |
} |
输出结果如下。
其它的一些api真的也特别简单,这里写个demo演示下。
public class Demo36 { | |
public static void main(String[] args) throws InterruptedException { | |
Semaphore semaphore = new Semaphore(2); | |
for (int i = 0; i < 5; i++) { | |
new Thread(()->{ | |
try { | |
semaphore.acquire(); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} | |
Thread.sleep(10); | |
// 获取剩余许可证数量 | |
System.out.println(semaphore.availablePermits()); | |
// 是否有等待线程 | |
System.out.println(semaphore.hasQueuedThreads()); | |
// 等待线程数量 | |
System.out.println(semaphore.getQueueLength()); | |
} | |
} |
结果如下。
许可证还可以被回收。
public static void main(String[] args) throws InterruptedException { | |
Semaphore semaphore = new Semaphore(3); | |
new Thread(() -> { | |
try { | |
semaphore.acquire(); | |
System.out.println("acquire permit"); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
TimeUnit.SECONDS.sleep(1); | |
System.out.println("drain permit number" + semaphore.drainPermits()); | |
new Thread(() -> { | |
try { | |
semaphore.acquire(); | |
System.out.println("acquire permit"); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
} |
结果如下。
2.数据交换Exchanger
Exchanger可以让两个线程在同一个时间点发生数据交换。
public static void main(String[] args) throws InterruptedException { | |
Exchanger<Object> exchanger = new Exchanger<>(); | |
new Thread(() -> { | |
try { | |
System.out.println(exchanger.exchange("AAA...")); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} | |
}).start(); | |
exchanger.exchange("BBB"); | |
} |
结果如下。
过于简单,不再赘述。
3.Fork/Join框架
在jdk7中,出现了一个新的框架用于并行执行任务,它可以把大任务拆分为多个小任务并行执行,最大程度的利用多核cpu的优势,最后汇总执行的结果。很强大很高效吧。
比如计算:18x7 + 36 x 8 + 9 x 77 + 8 x 53,可以进行如下图的拆分汇总操作。
而且它不仅仅是拆分任务,使用多个线程并行执行任务,还可以工作窃取算法,提高线程的利用率。其原理是,把每个线程的任务进一步拆分为若干子任务,并且每个线程创建一个队列来存放自己的子任务,当某个线程的子任务全部完成,可以从其它线程的队列中获取任务执行。可以参考下图进行理解。
下面结合实例使用下。比如我们需要计算1-1000的和。
public static void main(String[] args) { | |
ForkJoinPool pool = new ForkJoinPool(); | |
try { | |
System.out.println(pool.submit(new SubTask(1, 1000)).get()); | |
} catch (InterruptedException exception) { | |
exception.printStackTrace(); | |
} catch (ExecutionException e) { | |
e.printStackTrace(); | |
} | |
} | |
static class SubTask extends RecursiveTask<Integer>{ | |
private int start; | |
private int end; | |
public SubTask(int start, int end) { | |
this.start = start; | |
this.end = end; | |
} | |
@Override | |
protected Integer compute() { | |
if((end - start) > 125) { | |
SubTask subTask1 = new SubTask(start, (start + end) / 2); | |
subTask1.fork(); | |
SubTask subTask2 = new SubTask((start + end) / 2 + 1, end); | |
subTask2.fork(); | |
return subTask1.join() + subTask2.join(); | |
} else { | |
System.out.println(Thread.currentThread().getName() + "start add from " + start +"end" +end); | |
int result = 0; | |
for (int i = start; i <= end; i++) { | |
result += i; | |
} | |
return result; | |
} | |
} | |
} |
其结果如下。
其实,arrays中的parallelSort
就是使用的Fork/Join
框架
public static void parallelSort(byte[] a) { | |
int n = a.length, p, g; | |
if (n <= MIN_ARRAY_SORT_GRAN || | |
(p = ForkJoinPool.getCommonPoolParallelism()) == 1) | |
DualPivotQuicksort.sort(a, 0, n - 1); | |
else | |
new ArraysParallelSortHelpers.FJByte.Sorter | |
(null, a, new byte[n], 0, n, 0, | |
((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ? | |
MIN_ARRAY_SORT_GRAN : g).invoke(); | |
} |
在多核环境下,Fork/Join框架效率会随着运算规模增大而对效能的提升效果更显著,大规模运算推荐使用哦。