第十四章 java中的daemon thread
java中有两种类型的 thread ,user threads 和 daemon threads。
User threads是高优先级的thread,JVM将会等待所有的User Threads运行完毕之后才会结束运行。
daemon threads是低优先级的 Thread ,它的作用是为User Thread提供服务。 因为daemon threads的低优先级,并且仅为user thread提供服务,所以当所有的user thread都结束之后,JVM会自动退出,不管是否还有daemon threads在运行中。
因为这个特性,所以我们通常在daemon threads中处理无限循环的操作,因为这样不会影响user threads的运行。
daemon threads并不推荐使用在I/O操作中。
但是有些不当的操作也可能导致daemon threads阻塞JVM关闭,比如在daemon thread中调用join()方法。
我们看下怎么创建daemon thread:
public class DaemonThread extends Thread{
public void run(){
while(true){
log.info("Thread A run");
try {
log.info("Thread A is daemon {}" ,Thread.currentThread().isDaemon());
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
DaemonThread daemonThread = new DaemonThread();
daemonThread.setDaemon(true);
daemonThread.start();
}
}
创建 daemon thread很简单,只需要在创建之后,设置其daemon属性为true即可。
上面的例子将会立刻退出。
如果我们将daemonThread.setDaemon(true);去掉,则因为user thread一直执行,JVM将会一直运行下去,不会退出。
这是在main中运行的情况,如果我们在一个@Test中运行,会发生什么现象呢?
public class DaemonThread extends Thread{
public void run(){
while(true){
log.info("Thread A run");
try {
log.info("Thread A is daemon {}" ,Thread.currentThread().isDaemon());
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
public void testDaemon() throws InterruptedException {
DaemonThread daemonThread = new DaemonThread();
daemonThread.start();
}
}
我们将main方法改成了@Test执行。执行之后我们会发现,不管是不是daemon thread, Test都会立刻结束。
再看一个daemon线程中启动一个user thread的情况:
public class DaemonBThread extends Thread{
Thread worker = new Thread(()->{
while(true){
log.info("Thread B run");
log.info("Thread B is daemon {}",Thread.currentThread().isDaemon());
}
});
public void run(){
log.info("Thread A run");
worker.start();
}
public static void main(String[] args) {
DaemonBThread daemonThread = new DaemonBThread();
daemonThread.setDaemon(true);
daemonThread.start();
}
}
这个例子中,daemonThread启动了一个user thread,运行之后我们会发现,即使有user thread正在运行,JVM也会立刻结束执行。
第十五章 java中ThreadPool的介绍和使用
Thread Pool简介
在Java中,threads是和系统的threads相对应的,用来处理一系列的系统资源。不管在windows和linux下面,能开启的线程个数都是有限的,如果你在java程序中无限制的创建thread,那么将会遇到无线程可创建的情况。
CPU 的核数是有限的,如果同时有多个 线程 正在运行中,那么CPU将会根据线程的优先级进行轮循,给每个线程分配特定的CPU时间。所以线程也不是越多越好。
在java中,代表管理ThreadPool的接口有两个:ExecutorService和Executor。
我们运行线程的步骤一般是这样的:1. 创建一个ExecutorService。 2.将任务提交给ExecutorService。3.ExecutorService调度线程来运行任务。
画个图来表示:
下面我讲一下,怎么在java中使用ThreadPool。
Executors, Executor 和 ExecutorService
Executors 提供了一系列简便的方法,来帮助我们创建ThreadPool。
Executor接口定义了一个方法:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/ void execute(Runnable command);
}
ExecutorService继承了Executor,提供了更多的 线程池 的操作。是对Executor的补充。
根据接口实现分离的原则,我们通常在java代码中使用ExecutorService或者Executor,而不是具体的实现类。
我们看下怎么通过Executors来创建一个Executor和ExecutorService:
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> log.info("in Executor"));
ExecutorService executorService= Executors.newCachedThreadPool();
executorService.submit(()->log.info("in ExecutorService"));
executorService.shutdown();
关于ExecutorService的细节,我们这里就多讲了,感兴趣的朋友可以参考之前我写的ExecutorService的详细文章。
ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService接口的一个实现,它可以为线程池添加更加精细的配置,具体而言它可以控制这三个参数:corePoolSize, maximumPoolSize, 和 keepAliveTime。
PoolSize就是线程池里面的线程个数,corePoolSize表示的是线程池里面初始化和保持的最小的线程个数。
如果当前等待线程太多,可以设置maximumPoolSize来提供最大的线程池个数,从而线程池会创建更多的线程以供任务执行。
keepAliveTime是多余的线程未分配任务将会等待的时间。超出该时间,线程将会被线程池回收。
我们看下怎么创建一个ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.submit(()->log.info("submit through threadPoolExecutor"));
threadPoolExecutor.shutdown();
上面的例子中我们通过ThreadPoolExecutor的构造函数来创建ThreadPoolExecutor。
通常来说Executors已经内置了ThreadPoolExecutor的很多实现,我们来看下面的例子:
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool();
executor.submit(() -> {
Thread.sleep();
return null;
});
executor.submit(() -> {
Thread.sleep();
return null;
});
executor.submit(() -> {
Thread.sleep();
return null;
});
log.info("executor poolsize {}",executor1.getPoolSize());
log.info("executor queuesize {}", executor1.getQueue().size());
executor.shutdown();
上的例子中我们Executors.newFixedThreadPool(2)来创建一个ThreadPoolExecutor。
上面的例子中我们提交了3个task。但是我们pool size只有2。所以还有一个1个不能立刻被执行,需要在queue中等待。
我们再看一个例子:
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
Thread.sleep();
return null;
});
executor.submit(() -> {
Thread.sleep();
return null;
});
executor.submit(() -> {
Thread.sleep();
return null;
});
log.info("executor poolsize {}", executor2.getPoolSize());
log.info("executor queue size {}", executor2.getQueue().size());
executor.shutdown();
上面的例子中我们使用Executors.newCachedThreadPool()来创建一个ThreadPoolExecutor。 运行之后我们可以看到poolsize是3,而queue size是0。这表明newCachedThreadPool会自动增加pool size。
如果thread在60秒钟之类没有被激活,则会被收回。
这里的Queue是一个SynchronousQueue,因为插入和取出基本上是同时进行的,所以这里的queue size基本都是0.
ScheduledThreadPoolExecutor
还有个很常用的ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor, 并且实现了ScheduledExecutorService接口。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
我们看下怎么使用:
ScheduledExecutorService executor = Executors.newScheduledThreadPool();
executor.schedule(() -> {
log.info("Hello World");
},, TimeUnit.MILLISECONDS);
上面的例子中,我们定义了一个定时任务将会在500毫秒之后执行。
之前我们也讲到了ScheduledExecutorService还有两个非常常用的方法:
- scheduleAtFixedRate – 以开始时间为间隔。
- scheduleWithFixedDelay – 以结束时间为间隔。
CountDownLatch lock = new CountDownLatch();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
log.info("in ScheduledFuture");
lock.countDown();
},, 100, TimeUnit.MILLISECONDS);
lock.await(, TimeUnit.MILLISECONDS);
future.cancel(true);
ForkJoinPool
ForkJoinPool是在java 7 中引入的新框架,我们将会在后面的文章中详细讲解。 这里做个简单的介绍。
ForkJoinPool主要用来生成大量的任务来做算法运算。如果用线程来做的话,会消耗大量的线程。但是在fork/join框架中就不会出现这个问题。
在fork/join中,任何task都可以生成大量的子task,然后通过使用join()等待子task结束。
这里我们举一个例子:
static class TreeNode {
int value;
Set<TreeNode> children;
TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.new Hash Set(children);
}
}
定义一个TreeNode,然后遍历所有的value,将其加起来:
public class CountingTask extends RecursiveTask<Integer> {
private final TreeNode node;
public CountingTask(TreeNode node) {
this.node = node;
}
@ Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork()).mapToInt(ForkJoinTask::join).sum();
}
}
下面是调用的代码:
public static void main(String[] args) {
TreeNode tree = new TreeNode(,
new TreeNode(), new TreeNode(2,
new TreeNode(), new TreeNode(8)));
ForkJoinPool fork JoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
}
第十六章 java 中的fork join框架
fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。
fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。
还记得之前的文章我们讲到了thread pool的基本结构吗?
- ExecutorService – ForkJoinPool 用来调用任务执行。
- workerThread – ForkJoinWorkerThread 工作线程,用来执行具体的任务。
- task – ForkJoinTask 用来定义要执行的任务。
下面我们从这三个方面来详细讲解fork join框架。
ForkJoinPool
ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。
public class ForkJoinPool extends AbstractExecutorService
一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。
什么是work-stealing呢?
默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。
下面看下怎么创建一个ForkJoinPool。
最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
另外一种方式是使用 构造函数 :
ForkJoinPool forkJoinPool = new ForkJoinPool();
这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。
ForkJoinWorkerThread
ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。
public class ForkJoinWorkerThread extends Thread
}
和一般的线程不一样的是它定义了两个变量:
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。
再看一下它的run方法:
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
简单点讲就是从Queue中取出任务执行。
ForkJoinTask
ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveAction和RecursiveTask。
他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask可以有返回值。
既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD =;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(, workload.length() / 2);
String partTwo = workload.substring(workload.length() /, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
上面的例子使用了二分法来打印字符串。
我们再看一个RecursiveTask的例子:
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD =;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr,, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length /, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > && a < 27)
.map(a -> a *)
.sum();
}
}
和上面的例子很像,不过这里我们需要有返回值。
在ForkJoinPool中提交Task
有了上面的两个任务,我们就可以在ForkJoinPool中提交了:
int[] intArray= {,12,13,14,15};
CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);
int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);
上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。
如果不使用invoke,我们也可以将其替换成fork()和join():
customRecursiveTask.fork();
int result= customRecursiveTask.join();
System.out.println(result);
fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。
第十七章 java并发中CountDownLatch的使用
在java并发中,控制共享变量的访问非常重要,有时候我们也想控制并发线程的执行顺序,比如:等待所有线程都执行完毕之后再执行另外的线程,或者等所有线程都准备好了才开始所有线程的执行等。
这个时候我们就可以使用到CountDownLatch。
简单点讲,CountDownLatch存有一个放在QueuedSynchronizer中的计数器。当调用countdown() 方法时,该计数器将会减一。然后再调用await()来等待计数器归零。
private static final class Sync extends AbstractQueuedSynchronizer {
...
}
private final Sync sync;
public void countDown() {
sync.releaseShared();
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
}
下面我们举两个使用的例子:
主线程等待子线程全都结束之后再开始运行
这里我们定义子线程类,在子线程类里面,我们传入一个CountDownLatch用来计数,然后在子线程结束之前,调用该CountDownLatch的countDown方法。最后在主线程中调用await()方法来等待子线程结束执行。
@Slfj
public class MainThreadWaitUsage implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;
public MainThreadWaitUsage(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}
看下怎么调用:
@Test
public void testCountDownLatch()
throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch();
List<Thread> workers = Stream
.generate(() -> new Thread(new MainThreadWaitUsage(outputScraper, countDownLatch)))
.limit()
.collect(toList());
workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");
log.info(outputScraper.toString());
}
执行结果如下:
:37:27.388 [main] INFO MainThreadWaitUsageTest - [Counted down, Counted down, Counted down, Counted down, Counted down, Latch released]
等待所有线程都准备好再一起执行
上面的例子中,我们是主线程等待子线程,那么在这个例子中,我们将会看看怎么子线程一起等待到准备好的状态,再一起执行。
思路也很简单,在子线程开始之后,将等待的子线程计数器减一,在主线程中await该计数器,等计数器归零之后,主线程再通知子线程运行。
public class ThreadWaitThreadUsage implements Runnable {
private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;
public ThreadWaitThreadUsage(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {
this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}
@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}
看下怎么调用:
@Test
public void testCountDownLatch()
throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch();
CountDownLatch callingThreadBlocker = new CountDownLatch();
CountDownLatch completedThreadCounter = new CountDownLatch();
List<Thread> workers = Stream
.generate(() -> new Thread(new ThreadWaitThreadUsage(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit()
.collect(toList());
workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");
log.info(outputScraper.toString());
}
输出结果如下:
:41:47.861 [main] INFO ThreadWaitThreadUsageTest - [Workers ready, Counted down, Counted down, Counted down, Counted down, Counted down, Workers complete]
停止CountdownLatch的await
如果我们调用await()方法,该方法将会等待一直到count=0才结束。但是如果在线程执行过程中出现了异常,可能导致countdown方法执行不了。那么await()方法可能会出现无限等待的情况。
这个时候我们可以使用:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
}
第十八章 java中CyclicBarrier的使用
CyclicBarrier是java 5中引入的线程安全的组件。它有一个barrier的概念,主要用来等待所有的线程都执行完毕,然后再去执行特定的操作。
假如我们有很多个线程,每个线程都计算出了一些数据,然后我们需要等待所有的线程都执行完毕,再把各个线程计算出来的数据加起来,的到最终的结果,那么我们就可以使用CyclicBarrier。
CyclicBarrier的方法
我们先看下CyclicBarrier的构造函数:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <=) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier有两个构造函数,第一个只接受一个参数,表示需要统一行动的线程个数。第二个参数叫做barrierAction,表示出发barrier是需要执行的方法。
其中barrierAction是一个Runnable,我们可以在其中定义最后需要执行的工作。
再看下重要await方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false,L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
await也有两个方法,一个是带时间参数的,一个是不带时间参数的。
await本质上调用了lock.newCondition().await()方法。
因为有多个parties,下面我们考虑两种情况。
- 该线程不是最后一个调用await的线程
在这种情况下,该线程将会进入等待状态,直到下面的情况发送:
- 最后一个线程调用await()
- 其他线程中断了当前线程
- 其他线程中断了其他正在等待的线程
- 其他线程在等待barrier的时候超时
- 其他线程在该barrier上调用的reset()方法
如果该线程在调用await()的时候已经设置了interrupted的状态,或者在等待的时候被interrupted,那么将会抛出InterruptedException异常,并清除中断状态。(这里和Thread的interrupt()方法保持一致)
如果任何线程正在等待状态中,这时候barrier被重置。或者在线程调用await方法或者正在等待中,barrier被broken,那么将会抛出BrokenBarrierException。
如果任何线程在等待的时候被中断,那么所有其他等待的线程将会抛出BrokenBarrierException,barrier将会被置为broken状态。
- 如果该线程是最后一个调用await方法的
在这种情况,如果barrierAction不为空,那么该线程将会在其他线程继续执行前调用这个barrierAction。
如果该操作抛出异常,那么barrier的状态将会被置为broken状态。
再看看这个reset() 方法:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
该方法将会将barrier置为broken状态,并且开启一个新的generation,来进行下一轮的操作。
CyclicBarrier的使用
我们在子线程中生成一个随机的整数队列,当所有的线程都生成完毕之后,我们再将生成的整数全都加起来。看下怎么实现。
定义生成整数队列的子线程:
public class CyclicBarrierUsage implements Runnable {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults;
private Random random = new Random();
public CyclicBarrierUsage(CyclicBarrier cyclicBarrier,List<List<Integer>> partialResults){
this.cyclicBarrier=cyclicBarrier;
this.partialResults=partialResults;
}
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// Crunch some numbers and store the partial result
for (int i =; i < 10; i++) {
Integer num = random.nextInt();
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}
上面的子线程接收外部传入的cyclicBarrier和保存数据的partialResults,并在运行完毕调用cyclicBarrier.await()来等待其他线程执行完毕。
看下CyclicBarrier的构建:
CyclicBarrier cyclicBarrier=new CyclicBarrier(,()->{
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of workers, having 10 results each.");
int sum =;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
});
在CyclicBarrier中,我们定义了一个BarrierAction来做最后数据的汇总处理。
运行:
for (int i =; i < 5; i++) {
Thread worker = new Thread(new CyclicBarrierUsage(cyclicBarrier,partialResults));
worker.setName("Thread " + i);
worker.start();
}
输出结果如下:
Spawning worker threads to compute 10 partial results each
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 3
Thread: Crunching some numbers! Final result - 1
Thread: Crunching some numbers! Final result - 7
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 4
Thread: Crunching some numbers! Final result - 6
Thread: Crunching some numbers! Final result - 9
Thread: Crunching some numbers! Final result - 3
Thread: Crunching some numbers! Final result - 1
Thread: Crunching some numbers! Final result - 0
Thread: Crunching some numbers! Final result - 9
Thread: Crunching some numbers! Final result - 3
Thread: Crunching some numbers! Final result - 7
Thread: Crunching some numbers! Final result - 2
Thread: Crunching some numbers! Final result - 6
Thread: Crunching some numbers! Final result - 6
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 0
Thread: Crunching some numbers! Final result - 1
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 1
Thread: Crunching some numbers! Final result - 7
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 2
Thread: Crunching some numbers! Final result - 4
Thread waiting for others to reach barrier.
Thread: Crunching some numbers! Final result - 0
Thread waiting for others to reach barrier.
Thread: Crunching some numbers! Final result - 7
Thread: Crunching some numbers! Final result - 6
Thread: Crunching some numbers! Final result - 9
Thread waiting for others to reach barrier.
Thread: Crunching some numbers! Final result - 9
Thread: Crunching some numbers! Final result - 3
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 1
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 0
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 9
Thread: Crunching some numbers! Final result - 1
Thread waiting for others to reach barrier.
Thread: Crunching some numbers! Final result - 2
Thread: Crunching some numbers! Final result - 2
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 5
Thread: Crunching some numbers! Final result - 3
Thread: Crunching some numbers! Final result - 7
Thread: Crunching some numbers! Final result - 4
Thread: Crunching some numbers! Final result - 8
Thread: Crunching some numbers! Final result - 4
Thread: Crunching some numbers! Final result - 3
Thread waiting for others to reach barrier.
Thread: Computing sum of 5 workers, having 10 results each.
Adding 3 7 4 6 9 0 2 0 7
Adding 9 7 6 5 1 1 8 4 0
Adding 8 3 3 6 5 2 7 6 9
Adding 3 8 8 1 8 0 5 9 1
Adding 2 5 5 3 7 4 8 4 3
Thread: Final result = 230
Process finished with exit code
第十九章 在java中使用JMH(Java Microbenchmark Harness)做性能测试
JMH的全称是Java Microbenchmark Harness,是一个open JDK中用来做性能测试的套件。该套件已经被包含在了JDK 12中。
本文将会讲解如何使用JMH来在java中做性能测试。
如果你使用的不是JDK 12,那么需要添加如下依赖:
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>.19</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>.19</version>
</dependency>
使用JMH做性能测试
如果我们想测试某个方法的性能,一般来说就是重复执行某个方法n次,求出总的执行时间,然后求平均值。
但是这样通常会有一些问题,比如程序的头几次执行通常会比较慢,因为JVM会对多次执行的代码进行优化。另外得出的统计结果也不够直观,需要我们自行解析。
如果使用JMH可以轻松解决这些问题。
在JMH中,将要测试的方法添加@Benchmark注解即可:
@Benchmark
public void measureThroughput() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep();
}
看下怎么调用:
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(BenchMarkUsage.class.getSimpleName())
// .include(BenchMarkUsage.class.getSimpleName()+".*measureThroughput*")
// 预热轮
.warmupIterations()
// 度量轮
.measurementIterations()
.forks()
.build();
new Runner(opt).run();
}
上面的例子,我们通过OptionsBuilder的include方法添加了需要进行测试的类。
默认情况下,该类的所有@Benchmark方法都将会被测试,如果我们只想测试其中的某个方法,我们可以在类后面加上方法的名字:
.include(BenchMarkUsage.class.getSimpleName()+".*measureAll*")
上面的代码支持通配符。
warmupIterations(3)意思是在真正的执行前,先热身三次。
measurementIterations(5)表示我们将方法运行5次来测试性能。
forks(1)表示启动一个进程来执行这个任务。
上面是最基本的运行,我们看下运行结果:
# JMH version:.19
# VM version: JDK.8.0_171, VM 25.171-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk.8.0_171.jdk/Contents/Home/jre/bin/java
# VM options: -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=55941:/Applications/IntelliJ IDEA 2.app/Contents/bin -Dfile.encoding=UTF-8
# Warmup: iterations, 1 s each
# Measurement: iterations, 1 s each
# Timeout: min per iteration
# Threads: thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.flydean.BenchMarkUsage.measureThroughput
# Run progress:.66% complete, ETA 00:01:42
# Fork: of 1
# Warmup Iteration: 9.727 ops/s
# Warmup Iteration: 9.684 ops/s
# Warmup Iteration: 9.678 ops/s
Iteration: 9.652 ops/s
Iteration: 9.678 ops/s
Iteration: 9.733 ops/s
Iteration: 9.651 ops/s
Iteration: 9.678 ops/s
Result "com.flydean.BenchMarkUsage.measureThroughput":.678 ±(99.9%) 0.129 ops/s [Average]
(min, avg, max) = (.651, 9.678, 9.733), stdev = 0.034
CI (.9%): [9.549, 9.808] (assumes normal distribution)
ops/s 是每秒的OPS次数。程序会给出运行的最小值,平均值和最大值。同时给出标准差stdev和置信区间CI。
BenchmarkMode
上面的例子中, 我们只用了最简单的@Benchmark。如果想实现更加复杂和自定义的BenchMark,我们可以使用@BenchmarkMode。
先举个例子:
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void measureThroughput() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep();
}
上面的例子中,我们指定了@BenchmarkMode(Mode.Throughput),Throughput的意思是整体吞吐量,表示给定的时间内执行的次数。
这里我们通过 @OutputTimeUnit(TimeUnit.SECONDS)来指定时间单位。
Mode除了Throughput还有如下几种模式:
- AverageTime – 调用的平均时间
- SampleTime – 随机取样,最后输出取样结果的分布
- SingleShotTime – 只会执行一次,通常用来测试冷启动时候的性能。
- All – 所有的benchmark modes。
Fork和Warmup
上面的例子中我们通过代码来显式的制定Fork和Warmup,我们也可以使用注解来实现:
@Fork(value =, warmups = 2)
@Warmup(iterations =)
上面的例子中value表示该benchMark执行多少次,warmups表示fork多少个进程来执行。iterations表示warmup的iterations个数。
如果你同时在代码中和注解中都配置了相关的信息,那么注解将会覆盖掉代码中的显示配置。
State和Scope
如果我们在多线程环境中使用beachMark,那么多线程中用到的类变量是共享还是每个线程一个呢?
这个时候我们就要用到@State注解。
@State(Scope.Benchmark)
public class StateUsage {
}
Scope有三种:
- Scope.Thread:默认的State,每个测试线程分配一个实例;
- Scope.Benchmark:所有测试线程共享一个实例,用于测试有状态实例在多线程共享下的性能;
- Scope.Group:每个线程组共享一个实例;
第二十章 java中ThreadLocalRandom的使用
在java中我们通常会需要使用到java.util.Random来便利的生产随机数。但是Random是线程安全的,如果要在线程环境中的话就有可能产生性能瓶颈。
我们以Random中常用的nextInt方法为例来具体看一下:
public int nextInt() {
return next();
}
nextInt方法实际上调用了下面的方法:
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> ( - bits));
}
从代码中我们可以看到,方法内部使用了AtomicLong,并调用了它的compareAndSet方法来保证线程安全性。所以这个是一个线程安全的方法。
其实在多个线程环境中,Random根本就需要共享实例,那么该怎么处理呢?
在JDK 7 中引入了一个ThreadLocalRandom的类。ThreadLocal大家都知道就是线程的本地变量,而ThreadLocalRandom就是线程本地的Random。
我们看下怎么调用:
ThreadLocalRandom.current().nextInt();
我们来为这两个类分别写一个benchMark测试:
public class RandomUsage {
public void testRandom() throws InterruptedException {
ExecutorService executorService=Executors.newFixedThreadPool();
Random random = new Random();
List<Callable<Integer>> callables = new ArrayList<>();
for (int i =; i < 1000; i++) {
callables.add(() -> {
return random.nextInt();
});
}
executorService.invokeAll(callables);
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(RandomUsage.class.getSimpleName())
// 预热轮
.warmupIterations()
// 度量轮
.measurementIterations()
.forks()
.build();
new Runner(opt).run();
}
}
public class ThreadLocalRandomUsage {
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void testThreadLocalRandom() throws InterruptedException {
ExecutorService executorService=Executors.newFixedThreadPool();
List<Callable<Integer>> callables = new ArrayList<>();
for (int i =; i < 1000; i++) {
callables.add(() -> {
return ThreadLocalRandom.current().nextInt();
});
}
executorService.invokeAll(callables);
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(ThreadLocalRandomUsage.class.getSimpleName())
// 预热轮
.warmupIterations()
// 度量轮
.measurementIterations()
.forks()
.build();
new Runner(opt).run();
}
}
分析运行结果,我们可以看出ThreadLocalRandom在多线程环境中会比Random要快。
第二十一章 java中FutureTask的使用
FutureTask简介
FutureTask是java 5引入的一个类,从名字可以看出来FutureTask既是一个Future,又是一个Task。
我们看下FutureTask的定义:
public class FutureTask<V> implements RunnableFuture<V> {
...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/ void run();
}
FutureTask实现了RunnableFuture接口,RunnableFuture接口是Runnable和Future的综合体。
作为一个Future,FutureTask可以执行异步计算,可以查看异步程序是否执行完毕,并且可以开始和取消程序,并取得程序最终的执行结果。
除此之外,FutureTask还提供了一个runAndReset()的方法, 该方法可以运行task并且重置Future的状态。
Callable和Runnable的转换
我们知道Callable是有返回值的,而Runnable是没有返回值的。 Executors提供了很多有用的方法,将Runnable转换为Callable:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
FutureTask内部包含一个Callable,并且可以接受Callable和Runnable作为构造函数:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
它的内部就是调用了Executors.callable(runnable, result);方法进行转换的。
以Runnable运行
既然是一个Runnable,那么FutureTask就可以以线程的方式执行,我们来看一个例子:
@Test
public void convertRunnableToCallable() throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("inside callable future task ...");
return;
}
});
Thread thread= new Thread(futureTask);
thread.start();
log.info(futureTask.get().toString());
}
上面例子是以单个线程来执行的,同样我们也可以将FutureTask提交给线程池来执行:
@Test
public void workWithExecutorService() throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("inside futureTask");
return;
}
});
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(futureTask);
executor.shutdown();
log.info(futureTask.get().toString());
}
第二十二章 java中CompletableFuture的使用
之前的文章中,我们讲解了Future, 本文我们将会继续讲解java 8中引入的CompletableFuture的用法。
CompletableFuture首先是一个Future,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。
除此之外,CompletableFuture还是一个CompletionStage。
我们看下CompletableFuture的定义:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
什么是CompletionStage呢?
在异步程序中,如果将每次的异步执行都看成是一个stage的话,我们通常很难控制异步程序的执行顺序,在javascript中,我们需要在回调中执行回调。这就会形成传说中的回调地狱。
好在在ES6中引入了promise的概念,可以将回调中的回调转写为链式调用,从而大大的提升了程序的可读性和可写性。
同样的在java中,我们使用CompletionStage来实现异步调用的链式操作。
CompletionStage定义了一系列的then*** 操作来实现这一功能。
CompletableFuture作为Future使用
调用CompletableFuture.complete方法可以立马返回结果,我们看下怎么使用这个方法来构建一个基本的Future:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep();
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
上面我们通过调动ExecutorService来提交一个任务从而得到一个Future。如果你知道执行的结果,那么可以使用CompletableFuture的completedFuture方法来直接返回一个Future。
public Future<String> useCompletableFuture(){
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
return completableFuture;
}
CompletableFuture还提供了一个cancel方法来立马取消任务的执行:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep();
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
如果这个时候调用Future的get方法,将会报CancellationException异常。
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
异步执行code
CompletableFuture提供了runAsync和supplyAsync的方法,可以以异步的方式执行代码。
我们看一个runAsync的基本应用,接收一个Runnable参数:
public void runAsync(){
CompletableFuture<Void> runAsync= CompletableFuture.runAsync(()->{
log.info("runAsync");
});
}
而supplyAsync接受一个Supplier:
public void supplyAsync(){
CompletableFuture<String> supplyAsync=CompletableFuture.supplyAsync(()->{
return "supplyAsync";
});
}
他们两个的区别是一个没有返回值,一个有返回值。
组合Futures
上面讲到CompletableFuture的一个重大作用就是将回调改为链式调用,从而将Futures组合起来。
而链式调用的返回值还是CompletableFuture,我们看一个thenCompose的例子:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
thenCompose将前一个Future的返回结果作为后一个操作的输入。
如果我们想合并两个CompletableFuture的结果,则可以使用thenCombine:
public void thenCombine(){
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s, s2) -> s1 + s2));
}
如果你不想返回结果,则可以使用thenAcceptBoth:
public void thenAcceptBoth(){
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s, s2) -> System.out.println(s1 + s2));
}
thenApply() 和 thenCompose()的区别
thenApply()和thenCompose()两个方法都可以将CompletableFuture连接起来,但是两个有点不一样。
thenApply()接收的是前一个调用返回的结果,然后对该结果进行处理。
thenCompose()接收的是前一个调用的stage,返回flat之后的的CompletableFuture。
简单点比较,两者就像是map和flatMap的区别。
并行执行任务
当我们需要并行执行任务时,通常我们需要等待所有的任务都执行完毕再去处理其他的任务,那么我们可以用到CompletableFuture.allOf方法:
public void allOf(){
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future, future2, future3);
}
allOf只保证task全都执行,而并没有返回值,如果希望带有返回值,我们可以使用join:
public void join(){
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "World");
String combined = Stream.of(future, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
}
上面的程序将会返回:“Hello Beautiful World”。
异常处理
如果在链式调用的时候抛出异常,则可以在最后使用handle来接收:
public void handleError(){
String name = null;
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
}
这和Promise中的catch方法使用类似。
第二十三章 java中使用Semaphore构建阻塞对象池
Semaphore是java 5中引入的概念,叫做计数信号量。主要用来控制同时访问某个特定资源的访问数量或者执行某个操作的数量。
Semaphore中定义了一组虚拟的permits,通过获取和释放这些permits,Semaphore可以控制资源的个数。
Semaphore的这个特性可以用来构造资源池,比如数据库连接池等。
Semaphore有两个构造函数:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
permits定义了许可资源的个数,而fair则表示是否支持FIFO的顺序。
两个比较常用的方法就是acquire和release了。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
public void release() {
sync.releaseShared();
}
其中acquire用来获取资源,release用来释放资源。
有了这两个特性, 我们看一下怎么使用Semaphore来定义一个一个有界容器。
我们可以将Semaphore初始化为容器池大小,并且在容器池获取资源时调用acquire,将资源返回给容器池之后再调用release。
我们看下面的一个实现:
public class SemaphoreUsage<T> {
private final Set<T> set;
private final Semaphore sem;
public SemaphoreUsage(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem= new Semaphore(bound);
}
public boolean add (T o) throws InterruptedException{
sem.acquire();
boolean wasAdded = false;
try{
wasAdded=set.add(o);
return wasAdded;
}finally {
if(!wasAdded){
sem.release();
}
}
}
public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved){
sem.release();
}
return wasRemoved;
}
}
上面的例子我们定义了一个有界的synchronizedSet。 要注意一点是在add方法中,只有add成功之后才会调用release方法。
第二十四章 在java中构建高效的结果缓存
缓存是现代应用服务器中非常常用的组件。除了第三方缓存以外,我们通常也需要在java中构建内部使用的缓存。那么怎么才能构建一个高效的缓存呢? 本文将会一步步的进行揭秘。
使用HashMap
缓存通常的用法就是构建一个内存中使用的Map,在做一个长时间的操作比如计算之前,先在Map中查询一下计算的结果是否存在,如果不存在的话再执行计算操作。
我们定义了一个代表计算的接口:
public interface Calculator<A, V> {
V calculate(A arg) throws InterruptedException;
}
该接口定义了一个calculate方法,接收一个参数,并且返回计算的结果。
我们要定义的缓存就是这个Calculator具体实现的一个封装。
我们看下用HashMap怎么实现:
public class MemoizedCalculator<A, V> implements Calculator<A, V> {
private final Map<A, V> cache= new HashMap<A, V>();
private final Calculator<A, V> calculator;
public MemoizedCalculator(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public synchronized V calculate(A arg) throws InterruptedException {
V result= cache.get(arg);
if( result ==null ){
result= calculator.calculate(arg);
cache.put(arg, result);
}
return result;
}
}
MemoizedCalculator1封装了Calculator,在调用calculate方法中,实际上调用了封装的Calculator的calculate方法。
因为HashMap不是线程安全的,所以这里我们使用了synchronized关键字,从而保证一次只有一个线程能够访问calculate方法。
虽然这样的设计能够保证程序的正确执行,但是每次只允许一个线程执行calculate操作,其他调用calculate方法的线程将会被阻塞,在多线程的执行环境中这会严重影响速度。从而导致使用缓存可能比不使用缓存需要的时间更长。
使用ConcurrentHashMap
因为HashMap不是线程安全的,那么我们可以尝试使用线程安全的ConcurrentHashMap来替代HashMap。如下所示:
public class MemoizedCalculator<A, V> implements Calculator<A, V> {
private final Map<A, V> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;
public MemoizedCalculator(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
V result= cache.get(arg);
if( result ==null ){
result= calculator.calculate(arg);
cache.put(arg, result);
}
return result;
}
}
上面的例子中虽然解决了之前的线程等待的问题,但是当有两个线程同时在进行同一个计算的时候,仍然不能保证缓存重用,这时候两个线程都会分别调用计算方法,从而导致重复计算。
我们希望的是如果一个线程正在做计算,其他的线程只需要等待这个线程的执行结果即可。很自然的,我们想到了之前讲到的FutureTask。FutureTask表示一个计算过程,我们可以通过调用FutureTask的get方法来获取执行的结果,如果该执行正在进行中,则会等待。
下面我们使用FutureTask来进行改写。
FutureTask
@Slfj
public class MemoizedCalculator<A, V> implements Calculator<A, V> {
private final Map<A, Future<V>> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;
public MemoizedCalculator(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
Future<V> future= cache.get(arg);
V result=null;
if( future ==null ){
Callable<V> callable= new Callable<V>() {
@Override
public V call() throws Exception {
return calculator.calculate(arg);
}
};
FutureTask<V> futureTask= new FutureTask<>(callable);
future= futureTask;
cache.put(arg, futureTask);
futureTask.run();
}
try {
result= future.get();
} catch (ExecutionException e) {
log.error(e.getMessage(),e);
}
return result;
}
}
上面的例子,我们用FutureTask来封装计算,并且将FutureTask作为Map的value。
上面的例子已经体现了很好的并发性能。但是因为if语句是非原子性的,所以对这一种先检查后执行的操作,仍然可能存在同一时间调用的情况。
这个时候,我们可以借助于ConcurrentHashMap的原子性操作putIfAbsent来重写上面的类:
@Slfj
public class MemoizedCalculator<A, V> implements Calculator<A, V> {
private final Map<A, Future<V>> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;
public MemoizedCalculator(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
while (true) {
Future<V> future = cache.get(arg);
V result = null;
if (future == null) {
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return calculator.calculate(arg);
}
};
FutureTask<V> futureTask = new FutureTask<>(callable);
future = cache.putIfAbsent(arg, futureTask);
if (future == null) {
future = futureTask;
futureTask.run();
}
try {
result = future.get();
} catch (CancellationException e) {
log.error(e.getMessage(), e);
cache.remove(arg, future);
} catch (ExecutionException e) {
log.error(e.getMessage(), e);
}
return result;
}
}
}
}
上面使用了一个while循环,来判断从cache中获取的值是否存在,如果不存在则调用计算方法。
上面我们还要考虑一个缓存污染的问题,因为我们修改了缓存的结果,如果在计算的时候,计算被取消或者失败,我们需要从缓存中将FutureTask移除。
第二十五章 java中CompletionService的使用
之前的文章中我们讲到了ExecutorService,通过ExecutorService我们可以提交一个个的task,并且返回Future,然后通过调用Future.get方法来返回任务的执行结果。
这种方式虽然有效,但是需要保存每个返回的Future值,还是比较麻烦的,幸好ExecutorService提供了一个invokeAll的方法,来保存所有的Future值,我们看一个具体的实现:
public void useExecutorService() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool();
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep();
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
List<Future<String>> futures = executor.invokeAll(callableTasks);
executor.shutdown();
}
上面的例子中,我们定义了3个task,通过调用executor.invokeAll(callableTasks)返回了一个 List<Future>,这样我们就可以得到所有的返回值了。
除了上面的invokeAll方法外,我们今天要介绍一个CompletionService接口。
CompletionService实际上是ExecutorService和BlockingQueue的结合体,ExecutorService用来提交任务,而BlockingQueue用来保存封装成Future的执行结果。通过调用take和poll的方法来获取到Future值。
CompletionService是一个接口,我们看下它的一个具体实现ExecutorCompletionService:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
ExecutorCompletionService接收一个Executor作为参数。
我们看下上面的例子如果用ExecutorCompletionService重写是怎么样的:
public void useCompletionService() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool();
CompletionService<String> completionService=new ExecutorCompletionService<String>(executor);
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep();
return "Task's execution";
};
for(int i=; i< 5; i ++){
completionService.submit(callableTask);
}
for(int i=; i<5; i++){
Future<String> result=completionService.take();
System.out.println(result.get());
}
}
上面的例子通过completionService.submit来提交任务,通过completionService.take()来获取结果值。
其实CompletionService还有一个poll的方法,poll和take的区别在于:take如果获取不到值则会等待,而poll则会返回null。
第二十六章 使用ExecutorService来停止线程服务
之前的文章中我们提到了ExecutorService可以使用shutdown和shutdownNow来关闭。
这两种关闭的区别在于各自的安全性和响应性。shutdownNow强行关闭速度更快,但是风险也更大,因为任务可能正在执行的过程中被结束了。而shutdown正常关闭虽然速度比较慢,但是却更安全,因为它一直等到队列中的所有任务都执行完毕之后才关闭。
使用shutdown
我们先看一个使用shutdown的例子:
public void useShutdown() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool();
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.submit(runnableTask);
executor.shutdown();
executor.awaitTermination(, TimeUnit.MILLISECONDS);
}
awaitTermination将会阻塞直到所有正在执行的任务完成,或者达到指定的timeout时间。
使用shutdownNow
当通过shutdownNow来强行关闭ExecutorService是, 它会尝试取消正在执行的任务,并返回所有已经提交但是还没有开始的任务。从而可以将这些任务保存起来,以便以后进行处理。
但是这样我们只知道了还没有开始执行的任务,对于那些已经开始执行但是没有执行完毕却被取消的任务我们无法获取。
我们看下如何获得开始执行但是还没有执行完毕的任务:
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService executorService;
private final Set<Runnable> taskCancelledAtShutdown= Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService executorService){
this.executorService=executorService;
}
@Override
public void shutdown() {
executorService.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}
@Override
public boolean isShutdown() {
return executorService.isShutdown();
}
@Override
public boolean isTerminated() {
return executorService.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout,unit);
}
@Override
public void execute(Runnable command) {
executorService.execute(() -> {
try {
command.run();
}finally {
if(isShutdown() && Thread.currentThread().isInterrupted()){
taskCancelledAtShutdown.add(command);
}
}
});
}
public List<Runnable> getCancelledTask(){
if(! executorService.isTerminated()){
throw new IllegalStateException("executorService is not terminated");
}
return new ArrayList<>(taskCancelledAtShutdown);
}
}
上面的例子中我们构建了一个新的ExecutorService,他传入一个ExecutorService,并对其进行封装。
我们重写了execute方法,在执行完毕判断该任务是否被中断,如果被中断则将其添加到CancelledTask列表中。
并提供一个getCancelledTask方法来返回未执行完毕的任务。
我们看下怎么使用:
public void useShutdownNow() throws InterruptedException {
TrackingExecutor trackingExecutor=new TrackingExecutor(Executors.newCachedThreadPool());
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
trackingExecutor.submit(runnableTask);
List<Runnable> notrunList=trackingExecutor.shutdownNow();
if(trackingExecutor.awaitTermination(, TimeUnit.SECONDS)){
List<Runnable> runButCancelledList= trackingExecutor.getCancelledTask();
}
}
trackingExecutor.shutdownNow()返回的是未执行的任务。而trackingExecutor.getCancelledTask()返回的是被取消的任务。
上面的任务其实还有一个缺点,因为我们在存储被取消的任务列表的额时候taskCancelledAtShutdown.add(command),因为之前的判断不是原子操作,则可能会产生误报。
第二十七章 我们的线程被饿死了
我们在构建线程池的时候可以构建单个线程的线程池和多个线程的线程池。
那么线程池使用不当可不可能产生死锁呢?我们知道死锁是循环争夺资源而产生的。线程池中的线程也是资源的一种,那么如果对线程池中的线程进行争夺的话也是可能产生死锁的。
在单个线程的线程池中,如果一个正在执行的线程中,使用该线程池再去提交第二个任务,因为线程池中的线程只有一个,那么第二个任务将会等待第一个任务的执行完成来释放线程,而第一个任务又在等待第二任务的执行来完成任务。从而产生了线程饥饿死锁(Thread Starvation Deadlock).
线程饥饿死锁并不一定在单个线程的线程池中产生,只要有这种循环使用线程池的情况都可能产生这种问题。
我们看下例子:
public class ThreadPoolDeadlock {
ExecutorService executorService= Executors.newSingleThreadExecutor();
public class RenderPageTask implements Callable<String> {
public String call() throws Exception{
Future<String> header, footer;
header= executorService.submit(()->{
return "header";
});
footer= executorService.submit(()->{
return "footer";
});
return header.get()+ footer.get();
}
}
public void submitTask(){
executorService.submit(new RenderPageTask());
}
}
我们在executorService中提交了一个RenderPageTask,而RenderPageTask又提交了两个task。因为ExecutorService线程池只有一个线程,则会产生死锁。
我们的线程被饿死了!
第二十八章 java中有界队列的饱和策略(reject policy)
我们在使用ExecutorService的时候知道,在ExecutorService中有个一个Queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列(ExecutorService.newCachedThreadPool)和有界的队列(ExecutorService newFixedThreadPool(int nThreads))。
无界队列很好理解,我们可以无限制的向ExecutorService提交任务。那么对于有界队列来说,如果队列满了该怎么处理呢?
今天我们要介绍一下java中ExecutorService的饱和策略(reject policy)。
以ExecutorService的具体实现ThreadPoolExecutor来说,它定义了4种饱和策略。分别是AbortPolicy,DiscardPolicy,DiscardOldestPolicy和CallerRunsPolicy。
如果要在ThreadPoolExecutor中设定饱和策略可以调用setRejectedExecutionHandler方法,如下所示:
ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(20));
threadPoolExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy()
);
上面的例子中我们定义了一个初始5个,最大10个工作线程的Thread Pool,并且定义其中的Queue的容量是20。如果提交的任务超出了容量,则会使用AbortPolicy策略。
AbortPolicy
AbortPolicy意思是如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecutionException异常:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/ public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
上面的代码中,rejectedExecution方法中我们直接抛出了RejectedExecutionException异常。
DiscardPolicy
DiscardPolicy将会悄悄的丢弃提交的任务,而不报任何异常。
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/ public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
DiscardOldestPolicy将会丢弃最老的任务,保存最新插入的任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/ public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
我们看到在rejectedExecution方法中,poll了最老的一个任务,然后使用ThreadPoolExecutor提交了一个最新的任务。
CallerRunsPolicy
CallerRunsPolicy和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度。我们看下是怎么实现的:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/ public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
在rejectedExecution方法中,直接调用了 r.run()方法,这会导致该方法直接在调用者的主线程中执行,而不是在线程池中执行。从而导致主线程在该任务执行结束之前不能提交任何任务。从而有效的阻止了任务的提交。
使用Semaphore
如果我们并没有定义饱和策略,那么有没有什么方法来控制任务的提交速度呢?考虑下之前我们讲到的Semaphore,我们可以指定一定的资源信号量来控制任务的提交,如下所示:
public class SemaphoreUsage {
private final Executor executor;
private final Semaphore semaphore;
public SemaphoreUsage(Executor executor, int count) {
this.executor = executor;
this.semaphore = new Semaphore(count);
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
}
);
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
第二十九章 由于不当的执行顺序导致的死锁
为了保证线程的安全,我们引入了加锁机制,但是如果不加限制的使用加锁,就有可能会导致顺序死锁(Lock-Ordering Deadlock)。上篇文章我们也提到了在线程词中因为资源的不足而导致的资源死锁(Resource Deadlock)。
本文将会讨论一下顺序死锁的问题。
我们来讨论一个经常存在的账户转账的问题。账户A要转账给账户B。为了保证在转账的过程中A和B不被其他的线程意外的操作,我们需要给A和B加锁,然后再进行转账操作, 我们看下转账的代码:
public void transferMoneyDeadLock(Account from,Account to, int amount) throws InsufficientAmountException {
synchronized (from){
synchronized (to){
transfer(from,to,amount);
}
}
}
private void transfer(Account from,Account to, int amount) throws InsufficientAmountException {
if(from.getBalance() < amount){
throw new InsufficientAmountException();
}else{
from.debit(amount);
to.credit(amount);
}
}
看起来上面的程序好像没有问题,因为我们给from和to都加了锁,程序应该可以很完美的按照我们的要求来执行。
那如果我们考虑下面的一个场景:
A:transferMoneyDeadLock(accountA, accountB,) B:transferMoneyDeadLock(accountB, accountA,)
如果A和B同时执行,则可能会产生A获得了accountA的锁,而B获得了accountB的锁。从而后面的代码无法继续执行,从而导致了死锁。
对于这样的情况,我们有没有什么好办法来处理呢?
加入不管参数怎么传递,我们都先lock accountA再lock accountB是不是就不会出现死锁的问题了呢?
我们看下代码实现:
private void transfer(Account from,Account to, int amount) throws InsufficientAmountException {
if(from.getBalance() < amount){
throw new InsufficientAmountException();
}else{
from.debit(amount);
to.credit(amount);
}
}
public void transferMoney(Account from,Account to, int amount) throws InsufficientAmountException {
int fromHash= System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if(fromHash < toHash){
synchronized (from){
synchronized (to){
transfer(from,to, amount);
}
}
}else if(fromHash < toHash){
synchronized (to){
synchronized (from){
transfer(from,to, amount);
}
}
}else{
synchronized (lock){
synchronized (from) {
synchronized (to) {
transfer(from, to, amount);
}
}
}
}
}
上面的例子中,我们使用了System.identityHashCode来获得两个账号的hash值,通过比较hash值的大小来选定lock的顺序。
如果两个账号的hash值恰好相等的情况下,我们引入了一个新的外部lock,从而保证同一时间只有一个线程能够运行内部的方法,从而保证了任务的执行而不产生死锁。
第三十章 非阻塞同步机制和CAS
我们知道在java 5之前同步是通过Synchronized关键字来实现的,在java 5之后,java.util.concurrent包里面添加了很多性能更加强大的同步类。这些强大的类中很多都实现了非阻塞的同步机制从而帮助其提升性能。
什么是非阻塞同步
非阻塞同步的意思是多个线程在竞争相同的数据时候不会发生阻塞,从而能够在更加细粒度的维度上进行协调,从而极大的减少线程调度的开销,从而提升效率。非阻塞算法不存在锁的机制也就不存在死锁的问题。
在基于锁的算法中,如果一个线程持有了锁,那么其他的线程将无法进行下去。使用锁虽然可以保证对资源的一致性访问,但是在挂起和恢复线程的执行过程中存在非常大的开销,如果锁上面存在着大量的竞争,那么有可能调度开销比实际工作开销还要高。
悲观锁和乐观锁
我们知道独占锁是一个悲观锁,悲观锁的意思就是假设最坏的情况,如果你不锁定该资源,那么就有其他的线程会修改该资源。悲观锁虽然可以保证任务的顺利执行,但是效率不高。
乐观锁就是假设其他的线程不会更改要处理的资源,但是我们在更新资源的时候需要判断该资源是否被别的线程所更改。如果被更改那么更新失败,我们可以重试,如果没有被更改,那么更新成功。
使用乐观锁的前提是假设大多数时间系统对资源的更新是不会产生冲突的。
乐观锁的原子性比较和更新操作,一般都是由底层的硬件支持的。
CAS
大多数的处理器都实现了一个CAS指令(compare and swap),通常来说一个CAS接收三个参数,数据的现值V,进行比较的值A,准备写入的值B。只有当V和A相等的时候,才会写入B。无论是否写入成功,都会返回V。翻译过来就是“我认为V现在的值是A,如果是那么将V的值更新为B,否则不修改V的值,并告诉我现在V的值是多少。”
这就是CAS的含义,JDK中的并发类是通过使用Unsafe类来使用CAS的,我们可以自己构建一个并发类,如下所示:
public class CasCounter {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;
static {
try {
valueOffset = unsafe.objectFieldOffset
(CasCounter.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public CasCounter(int initialValue) {
value = initialValue;
}
public CasCounter() {
}
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}
上面的例子中,我们定义了一个原子操作compareAndSet, 它内部调用了unsafe的compareAndSwapInt方法。
看起来上面的CAS使用比直接使用锁复杂,但实际上在JVM中实现锁定时需要遍历JVM中一条非常复杂的代码路径,并可能导致操作系统级的锁定,线程挂机和上下文切换等操作。在最好的情况下,锁定需要执行一次CAS命令。
CAS的主要缺点就是需要调用者自己来处理竞争问题(重试,回退,放弃),而在锁中可以自动处理这些问题。
前面的文章我们也讲到了原子变量,原子变量的底层就是使用CAS。
第三十一章 非阻塞算法(Lock-Free)的实现
上篇文章我们讲到了使用锁会带来的各种缺点,本文将会讲解如何使用非阻塞算法。非阻塞算法一般会使用CAS来协调线程的操作。
虽然非阻塞算法有诸多优点,但是在实现上要比基于锁的算法更加繁琐和负责。
本文将会介绍两个是用非阻塞算法实现的数据结构。
非阻塞的栈
我们先使用CAS来构建几个非阻塞的栈。栈是最简单的链式结构,其本质是一个链表,而链表的根节点就是栈顶。
我们先构建Node数据结构:
public class Node<E> {
public final E item;
public Node<E> next;
public Node(E item){
this.item=item;
}
}
这个Node保存了内存item和它的下一个节点next。
然后我们构建非阻塞的栈,在该栈中我们需要实现pop和push方法,我们使用一个Atomic类来保存top节点的引用,在pop和push之前调用compareAndSet命令来保证命令的原子性。同时,我们需要不断的循环,以保证在线程冲突的时候能够重试更新。
public class ConcurrentStack<E> {
AtomicReference<Node<E>> top= new AtomicReference<>();
public void push(E item){
Node<E> newNode= new Node<>(item);
Node<E> oldNode;
do{
oldNode=top.get();
newNode.next= oldNode;
}while(!top.compareAndSet(oldNode, newNode));
}
public E pop(){
Node<E> oldNode;
Node<E> newNode;
do {
oldNode = top.get();
if(oldNode == null){
return null;
}
newNode=oldNode.next;
}while(!top.compareAndSet(oldNode, newNode));
return oldNode.item;
}
}
非阻塞的链表
构建链表要比构建栈复杂。因为我们要维持头尾两个指针。以put方法来说,我们需要执行两步操作:1. 在尾部插入新的节点。2.将尾部指针指向最新的节点。
我们使用CAS最多只能保证其中的一步是原子执行。那么对于1和2的组合步骤该怎么处理呢?
我们再仔细考虑考虑,其实1和2并不一定要在同一个线程中执行,其他线程在检测到有线程插入了节点,但是没有将tail指向最后的节点时,完全帮忙完成这个操作。
我们看下具体的代码实现:
public class LinkedNode<E> {
public final E item;
public final AtomicReference<LinkedNode<E>> next;
public LinkedNode(E item, LinkedNode<E> next){
this.item=item;
this.next=new AtomicReference<>(next);
}
}
先构建一个LinkedNode类。
public class LinkedQueue<E> {
private final LinkedNode<E> nullNode= new LinkedNode<>(null, null);
private final AtomicReference<LinkedNode<E>> head= new AtomicReference<>(nullNode);
private final AtomicReference<LinkedNode<E>> tail= new AtomicReference<>(nullNode);
public boolean put(E item){
LinkedNode<E> newNode = new LinkedNode<>(item, null);
while (true){
LinkedNode<E> currentTail= tail.get();
LinkedNode<E> tailNext= currentTail.next.get();
if(currentTail == tail.get()){
if (tailNext != null) {
//有其他的线程已经插入了一个节点,但是还没有将tail指向最新的节点
tail.compareAndSet(currentTail, tailNext);
}else{
//没有其他的线程插入节点,那么做两件事情:. 插入新节点,2.将tail指向最新的节点
if(currentTail.next.compareAndSet(null, newNode)){
tail.compareAndSet(currentTail, newNode);
}
}
}
}
}
}
第三十二章 java内存模型(JMM)和happens-before
我们知道java程序是运行在JVM中的,而JVM就是构建在内存上的虚拟机,那么内存模型JMM是做什么用的呢?
我们考虑一个简单的赋值问题:
int a=;
JMM考虑的就是什么情况下读取变量a的线程可以看到值为100。看起来这是一个很简单的问题,赋值之后不就可以读到值了吗?
但是上面的只是我们源码的编写顺序,当把源码编译之后,在编译器中生成的指令的顺序跟源码的顺序并不是完全一致的。处理器可能采用乱序或者并行的方式来执行指令(在JVM中只要程序的最终执行结果和在严格串行环境中执行结果一致,这种重排序是允许的)。并且处理器还有本地缓存,当将结果存储在本地缓存中,其他线程是无法看到结果的。除此之外缓存提交到主内存的顺序也肯能会变化。
上面提到的种种可能都会导致在多线程环境中产生不同的结果。在多线程环境中,大部分时间多线程都是在执行各自的任务,只有在多个线程需要共享数据的时候,才需要协调线程之间的操作。
而JMM就是JVM中必须遵守的一组最小保证,它规定了对于变量的写入操作在什么时候对其他线程是可见的。
重排序
上面讲了JVM中的重排序,这里我们举个例子,以便大家对重排序有一个更深入的理解:
@Slfj
public class Reorder {
int x=, y=0;
int a=, b=0;
private void reorderMethod() throws InterruptedException {
Thread one = new Thread(()->{
a=;
x=b;
});
Thread two = new Thread(()->{
b=;
y=a;
});
one.start();
two.start();
one.join();
two.join();
log.info("{},{}", x, y);
}
public static void main(String[] args) throws InterruptedException {
for (int i=; i< 100; i++){
new Reorder().reorderMethod();
}
}
}
上面的例子是一个很简单的并发程序。由于我们没有使用同步限制,所以线程one和two的执行顺序是不定的。有可能one在two之前执行,也有可能在two之后执行,也可能两者同时执行。不同的执行顺序可能会导致不同的输出结果。
同时虽然我们在代码中指定了先执行a=1, 再执行x=b,但是这两条语句实际上是没有关系的,在JVM中完全可能将两条语句重排序成x=b在前,a=1在后,从而导致输出更多意想不到的结果。
Happens-Before
为了保证java内存模型中的操作顺序,JMM为程序中的所有操作定义了一个顺序关系,这个顺序叫做Happens-Before。要想保证操作B看到操作A的结果,不管A和B是在同一线程还是不同线程,那么A和B必须满足Happens-Before的关系。如果两个操作不满足happens-before的关系,那么JVM可以对他们任意重排序。
我们看一下happens-before的规则:
- 程序顺序规则: 如果在程序中操作A在操作B之前,那么在同一个线程中操作A将会在操作B之前执行。
注意,这里的操作A在操作B之前执行是指在单线程环境中,虽然虚拟机会对相应的指令进行重排序,但是最终的执行结果跟按照代码顺序执行是一样的。虚拟机只会对不存在依赖的代码进行重排序。
- 监视器锁规则: 监视器上的解锁操作必须在同一个监视器上面的加锁操作之前执行。
锁我们大家都很清楚了,这里的顺序必须指的是同一个锁,如果是在不同的锁上面,那么其执行顺序也不能得到保证。
- volatile变量规则: 对volatile变量的写入操作必须在对该变量的读操作之前执行。
原子变量和volatile变量在读写操作上面有着相同的语义。
- 线程启动规则: 线程上对Thread.start的操作必须要在该线程中执行任何操作之前执行。
- 线程结束规则: 线程中的任何操作都必须在其他线程检测到该线程结束之前执行。
- 中断规则: 当一个线程再另一个线程上调用interrupt时,必须在被中断线程检测到interrupt调用之前执行。
- 终结器规则: 对象的构造函数必须在启动该对象的终结器之前执行完毕。
- 传递性: 如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。
上面的规则2很好理解,在加锁的过程中,不允许其他的线程获得该锁,也意味着其他的线程必须等待锁释放之后才能加锁和执行其业务逻辑。
4,5,6,7规则也很好理解,只有开始,才能结束。这符合我们对程序的一般认识。
8的传递性相信学过数学的人应该也不难理解。
接下来我们重点讨论一下规则3和规则1的结合。讨论之前我们再总结一下happens-before到底是做什么的。
因为JVM会对接收到的指令进行重排序,为了保证指令的执行顺序,我们才有了happens-before规则。上面讲到的2,3,4,5,6,7规则可以看做是重排序的节点,这些节点是不允许重排序的,只有在这些节点之间的指令才允许重排序。
结合规则1程序顺序规则,我们得到其真正的含义:代码中写在重排序节点之前的指令,一定会在重排序节点执行之前执行。
重排序节点就是一个分界点,它的位置是不能够移动的。看一下下面的直观例子:
线程1中有两个指令:set i=1, set volatile a=2。 线程2中也有两个指令:get volatile a, get i。
按照上面的理论,set和get volatile是两个重排序节点,set必须排在get之前。而依据规则1,代码中set i=1 在set volatile a=2之前,因为set volatile是重排序节点,所以需要遵守程序顺序执行规则,从而set i=1要在set volatile a=2之前执行。同样的道理get volatile a在get i之前执行。最后导致i=1在get i之前执行。
这个操作叫做借助同步。
安全发布
我们经常会用到单例模式来创建一个单的对象,我们看下下面的方法有什么不妥:
public class Book {
private static Book book;
public static Book getBook(){
if(book==null){
book = new Book();
}
return book;
}
}
上面的类中定义了一个getBook方法来返回一个新的book对象,返回对象之前,我们先判断了book是否为空,如果不为空的话就new一个book对象。
初看起来,好像没什么问题,但是如果仔细考虑JMM的重排规则,就会发现问题所在。 book=new Book()其实一个复杂的命令,并不是原子性操作。它大概可以分解为1.分配内存,2.实例化对象,3.将对象和内存地址建立关联。
其中2和3有可能会被重排序,然后就有可能出现book返回了,但是还没有初始化完毕的情况。从而出现不可以预见的错误。
根据上面我们讲到的happens-before规则, 最简单的办法就是给方法前面加上synchronized关键字:
public class Book {
private static Book book;
public synchronized static Book getBook(){
if(book==null){
book = new Book();
}
return book;
}
}
我们再看下面一种静态域的实现:
public class BookStatic {
private static BookStatic bookStatic= new BookStatic();
public static BookStatic getBookStatic(){
return bookStatic;
}
}
JVM在类被加载之后和被线程使用之前,会进行静态初始化,而在这个初始化阶段将会获得一个锁,从而保证在静态初始化阶段内存写入操作将对所有的线程可见。
上面的例子定义了static变量,在静态初始化阶段将会被实例化。这种方式叫做提前初始化。
下面我们再看一个延迟初始化占位类的模式:
public class BookStaticLazy {
private static class BookStaticHolder{
private static BookStaticLazy bookStatic= new BookStaticLazy();
}
public static BookStaticLazy getBookStatic(){
return BookStaticHolder.bookStatic;
}
}
上面的类中,只有在调用getBookStatic方法的时候才会去初始化类。
接下来我们再介绍一下双重检查加锁。
public class BookDLC {
private volatile static BookDLC bookDLC;
public static BookDLC getBookDLC(){
if(bookDLC == null ){
synchronized (BookDLC.class){
if(bookDLC ==null){
bookDLC=new BookDLC();
}
}
}
return bookDLC;
}
}
上面的类中检测了两次bookDLC的值,只有bookDLC为空的时候才进行加锁操作。看起来一切都很完美,但是我们要注意一点,这里bookDLC一定要是volatile。
因为bookDLC的赋值操作和返回操作并没有happens-before,所以可能会出现获取到一个仅部分构造的实例。这也是为什么我们要加上volatile关键词。
初始化安全性
本文的最后,我们将讨论一下在构造函数中含有final域的对象初始化。
对于正确构造的对象,初始化对象保证了所有的线程都能够正确的看到由构造函数为对象给各个final域设置的正确值,包括final域可以到达的任何变量(比如final数组中的元素,final的hashMap等)。
public class FinalSafe {
private final HashMap<String,String> hashMap;
public FinalSafe(){
hashMap= new HashMap<>();
hashMap.put("key","value1");
}
}
上面的例子中,我们定义了一个final对象,并且在构造函数中初始化了这个对象。那么这个final对象是将不会跟构造函数之后的其他操作重排序。
第三十三章 java多线程之Phaser
前面的文章中我们讲到了CyclicBarrier、CountDownLatch的使用,这里再回顾一下CountDownLatch主要用在一个线程等待多个线程执行完毕的情况,而CyclicBarrier用在多个线程互相等待执行完毕的情况。
Phaser是java 7 引入的新的并发API。他引入了新的Phaser的概念,我们可以将其看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段。所以Phaser特别适合使用在重复执行或者重用的情况。
基本使用
在CyclicBarrier、CountDownLatch中,我们使用计数器来控制程序的顺序执行,同样的在Phaser中也是通过计数器来控制。在Phaser中计数器叫做parties, 我们可以通过Phaser的构造函数或者register()方法来注册。
通过调用register()方法,我们可以动态的控制phaser的个数。如果我们需要取消注册,则可以调用arriveAndDeregister()方法。
我们看下arrive:
public int arrive() {
return doArrive(ONE_ARRIVAL);
}
Phaser中arrive实际上调用了doArrive方法,doArrive接收一个adjust参数,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister。
Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞,但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。
下面看一个基本的使用:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
上面的例子中,我们在执行每个Runnable之前调用register()来注册, 然后调用arriveAndAwaitAdvance()来等待这一个Phaser周期结束。最后我们调用 phaser.arriveAndDeregister();来取消注册主线程。
下面来详细的分析一下运行步骤:
- final Phaser phaser = new Phaser(1);
这一步我们初始化了一个Phaser,并且指定其现在party的个数为1。
- phaser.register();
这一步注册Runnable task到phaser,同时将party+1。
- phaser.arriveAndAwaitAdvance()
这一步将会等待直到所有的party都arrive。这里只会将步骤2中注册的party标记为arrive,而步骤1中初始化的party一直都没有被arrive。
- phaser.arriveAndDeregister();
在主线程中,arrive了步骤1中的party,并且将party的个数减一。
- 步骤3中的phaser.arriveAndAwaitAdvance()将会继续执行,因为最后一个phaser在步骤4中arrive了。
多个Phaser周期
Phaser的值是从0到Integer.MAX_VALUE,每个周期过后该值就会加一,如果到达Integer.MAX_VALUE则会继续从0开始。
如果我们执行多个Phaser周期,则可以重写onAdvance方法:
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties ==;
}
onAdvance将会在最后一个arrive()调用的时候被调用,如果这个时候registeredParties为0的话,该Phaser将会调用isTerminated方法结束该Phaser。
如果要实现多周期的情况,我们可以重写这个方法:
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties ==;
}
上面的例子中,如果phase次数超过了指定的iterations次数则就会自动终止。
我们看下实际的例子:
void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties ==;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}
上面的例子将会执行iterations次。
第三十四章 java中Locks的使用
之前文章中我们讲到,java中实现同步的方式是使用synchronized block。在java 5中,Locks被引入了,来提供更加灵活的同步控制。
本文将会深入的讲解Lock的使用。
Lock和Synchronized Block的区别
我们在之前的Synchronized Block的文章中讲到了使用Synchronized来实现java的同步。既然Synchronized Block那么好用,为什么会引入新的Lock呢?
主要有下面几点区别:
- synchronized block只能写在一个方法里面,而Lock的lock()和unlock()可以分别在不同的方法里面。
- synchronized block 不支持公平锁,一旦锁被释放,任何线程都有机会获取被释放的锁。而使用 Lock APIs则可以支持公平锁。从而让等待时间最长的线程有限执行。
- 使用synchronized block,如果线程拿不到锁,将会被Blocked。 Lock API 提供了一个tryLock() 的方法,可以判断是否可以获得lock,这样可以减少线程被阻塞的时间。
- 当线程在等待synchronized block锁的时候,是不能被中断的。如果使用Lock API,则可以使用 lockInterruptibly()来中断线程。
Lock interface
我们来看下Lock interface的定义, Lock interface定义了下面几个主要使用的方法:
- void lock() – 尝试获取锁,如果获取不到锁,则会进入阻塞状态。
- void lockInterruptibly() – 和lock()很类似,但是它可以将正在阻塞的线程中断,并抛出java.lang.InterruptedException。
- boolean tryLock() – 这是lock()的非阻塞版本,它回尝试获取锁,并立刻返回是否获取成功。
- boolean tryLock(long timeout, TimeUnit timeUnit) – 和tryLock()很像,只是多了一个尝试获取锁的时间。
- void unlock() – unlock实例。
- Condition newCondition() – 生成一个和当前Lock实例绑定的Condition。
在使用Lock的时候,一定要unlocked,以避免死锁。所以,通常我们我们要在try catch中使用:
Lock lock = ...;
lock.lock();
try {
// access to the shared resource
} finally {
lock.unlock();
}
除了Lock接口,还有一个ReadWriteLock接口,在其中定义了两个方法,实现了读锁和写锁分离:
- Lock readLock() – 返回读锁
- Lock writeLock() – 返回写锁
其中读锁可以同时被很多线程获得,只要不进行写操作。写锁同时只能被一个线程获取。
接下来,我们几个Lock的常用是实现类。
ReentrantLock
ReentrantLock是Lock的一个实现,什么是ReentrantLock(可重入锁)呢?
简单点说可重入锁就是当前线程已经获得了该锁,如果该线程的其他方法在调用的时候也需要获取该锁,那么该锁的lock数量+1,并且允许进入该方法。
不可重入锁:只判断这个锁有没有被锁上,只要被锁上申请锁的线程都会被要求等待。实现简单
可重入锁:不仅判断锁有没有被锁上,还会判断锁是谁锁上的,当就是自己锁上的时候,那么他依旧可以再次访问临界资源,并把加锁次数加一。
我们看下怎么使用ReentrantLock:
public void perform() {
lock.lock();
try {
counter++;
} finally {
lock.unlock();
}
}
下面是使用tryLock()的例子:
public void performTryLock() throws InterruptedException {
boolean isLockAcquired = lock.tryLock(, TimeUnit.SECONDS);
if(isLockAcquired) {
try {
counter++;
} finally {
lock.unlock();
}
}
}
ReentrantReadWriteLock
ReentrantReadWriteLock是ReadWriteLock的一个实现。上面也讲到了ReadWriteLock主要有两个方法:
- Read Lock – 如果没有线程获得写锁,那么可以多个线程获得读锁。
- Write Lock – 如果没有其他的线程获得读锁和写锁,那么只有一个线程能够获得写锁。
我们看下怎么使用writeLock:
Map<String,String> syncHashMap = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
Lock writeLock = lock.writeLock();
public void put(String key, String value) {
try {
writeLock.lock();
syncHashMap.put(key, value);
} finally {
writeLock.unlock();
}
}
public String remove(String key){
try {
writeLock.lock();
return syncHashMap.remove(key);
} finally {
writeLock.unlock();
}
}
再看下怎么使用readLock:
Lock readLock = lock.readLock();
public String get(String key){
try {
readLock.lock();
return syncHashMap.get(key);
} finally {
readLock.unlock();
}
}
public boolean containsKey(String key) {
try {
readLock.lock();
return syncHashMap.containsKey(key);
} finally {
readLock.unlock();
}
}
StampedLock
StampedLock也支持读写锁,获取锁的是会返回一个stamp,通过该stamp来进行释放锁操作。
上我们讲到了如果写锁存在的话,读锁是无法被获取的。但有时候我们读操作并不想进行加锁操作,这个时候我们就需要使用乐观读锁。
StampedLock中的stamped类似乐观锁中的版本的概念,当我们在 StampedLock中调用lock方法的时候,就会返回一个stamp,代表锁当时的状态,在乐观读锁的使用过程中,在读取数据之后,我们回去判断该stamp状态是否变化,如果变化了就说明该stamp被另外的write线程修改了,这说明我们之前的读是无效的,这个时候我们就需要将乐观读锁升级为读锁,来重新获取数据。
我们举个例子,先看下write排它锁的情况:
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
再看下乐观读锁的情况:
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
上面使用tryOptimisticRead()来尝试获取乐观读锁,然后通过sl.validate(stamp)来判断该stamp是否被改变,如果改变了,说明之前的read是无效的,那么需要重新来读取。
最后,StampedLock还提供了一个将read锁和乐观读锁升级为write锁的功能:
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x ==.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws !=L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
上面的例子是通过使用tryConvertToWriteLock(stamp)来实现升级的。
Conditions
上面讲Lock接口的时候有提到其中的一个方法:
Condition newCondition();
Condition提供了await和signal方法,类似于Object中的wait和notify。
不同的是Condition提供了更加细粒度的等待集划分。我们举个例子:
public class ConditionUsage {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr =;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count ==)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr =;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
上面的例子实现了一个ArrayBlockingQueue,我们可以看到在同一个Lock实例中,创建了两个Condition,分别代表队列未满,队列未空。通过这种细粒度的划分,我们可以更好的控制业务逻辑。
第三十五章 ABA问题的本质及其解决办法
简介
CAS的全称是compare and swap,它是java同步类的基础,java.util.concurrent中的同步类基本上都是使用CAS来实现其原子性的。
CAS的原理其实很简单,为了保证在多线程环境下我们的更新是符合预期的,或者说一个线程在更新某个对象的时候,没有其他的线程对该对象进行修改。在线程更新某个对象(或值)之前,先保存更新前的值,然后在实际更新的时候传入之前保存的值,进行比较,如果一致的话就进行更新,否则失败。
注意,CAS在java中是用native方法来实现的,利用了系统本身提供的原子性操作。
那么CAS在使用中会有什么问题呢?一般来说CAS如果设计的不够完美的话,可能会产生ABA问题,而ABA问题又可以分为两类,我们先看来看一类问题。
更多内容请访问www.flydean.com
第一类问题
我们考虑下面一种ABA的情况:
- 在多线程的环境中,线程a从共享的地址X中读取到了对象A。
- 在线程a准备对地址X进行更新之前,线程b将地址X中的值修改为了B。
- 接着线程b将地址X中的值又修改回了A。
- 最新线程a对地址X执行CAS,发现X中存储的还是对象A,对象匹配,CAS成功。
上面的例子中CAS成功了,但是实际上这个CAS并不是原子操作,如果我们想要依赖CAS来实现原子操作的话可能就会出现隐藏的bug。
第一类问题的关键就在2和3两步。这两步我们可以看到线程b直接替换了内存地址X中的内容。
在拥有自动GC环境的编程语言,比如说java中,2,3的情况是不可能出现的,因为在java中,只要两个对象的地址一致,就表示这两个对象是相等的。
2,3两步可能出现的情况就在像C++这种,不存在自动GC环境的编程语言中。因为可以自己控制对象的生命周期,如果我们从一个list中删除掉了一个对象,然后又重新分配了一个对象,并将其add back到list中去,那么根据 MRU memory allocation算法,这个新的对象很有可能和之前删除对象的内存地址是一样的。这样就会导致ABA的问题。
第二类问题
如果我们在拥有自动GC的编程语言中,那么是否仍然存在CAS问题呢?
考虑下面的情况,有一个链表里面的数据是A->B->C,我们希望执行一个CAS操作,将A替换成D,生成链表D->B->C。考虑下面的步骤:
- 线程a读取链表头部节点A。
- 线程b将链表中的B节点删掉,链表变成了A->C
- 线程a执行CAS操作,将A替换成D。
最后我们的到的链表是D->C,而不是D->B->C。
问题出在哪呢?CAS比较的节点A和最新的头部节点是不是同一个节点,它并没有关心节点A在步骤1和3之间是否内容发生变化。
我们举个例子:
public void useABAReference(){
CustUser a= new CustUser();
CustUser b= new CustUser();
CustUser c= new CustUser();
AtomicReference<CustUser> atomicReference= new AtomicReference<>(a);
log.info("{}",atomicReference.compareAndSet(a,b));
log.info("{}",atomicReference.compareAndSet(b,a));
a.setName("change for new name");
log.info("{}",atomicReference.compareAndSet(a,c));
}
上面的例子中,我们使用了AtomicReference的CAS方法来判断对象是否发生变化。在CAS b和a之后,我们将a的name进行了修改,我们看下最后的输出结果:
[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true
三个CAS的结果都是true。说明CAS确实比较的两者是否为统一对象,对其中内容的变化并不关心。
第二类问题可能会导致某些集合类的操作并不是原子性的,因为你并不能保证在CAS的过程中,有没有其他的节点发送变化。
第一类问题的解决
第一类问题在存在自动GC的编程语言中是不存在的,我们主要看下怎么在C++之类的语言中解决这个问题。
根据官方的说法,第一类问题大概有四种解法:
- 使用中间节点 – 使用一些不代表任何数据的中间节点来表示某些节点是标记被删除的。
- 使用自动GC。
- 使用hazard pointers – hazard pointers 保存了当前线程正在访问的节点的地址,在这些hazard pointers中的节点不能够被修改和删除。
- 使用read-copy update (RCU) – 在每次更新的之前,都做一份拷贝,每次更新的是拷贝出来的新结构。
第二类问题的解决
第二类问题其实算是整体集合对象的CAS问题了。一个简单的解决办法就是每次做CAS更新的时候再添加一个版本号。如果版本号不是预期的版本,就说明有其他的线程更新了集合中的某些节点,这次CAS是失败的。
我们举个AtomicStampedReference的例子:
public void useABAStampReference(){
Object a= new Object();
Object b= new Object();
Object c= new Object();
AtomicStampedReference<Object> atomicStampedReference= new AtomicStampedReference(a,);
log.info("{}",atomicStampedReference.compareAndSet(a,b,,1));
log.info("{}",atomicStampedReference.compareAndSet(b,a,,2));
log.info("{}",atomicStampedReference.compareAndSet(a,c,,1));
}
AtomicStampedReference的compareAndSet方法,多出了两个参数,分别是expectedStamp和newStamp,两个参数都是int型的,需要我们手动传入。
总结
ABA问题其实是由两类问题组成的,需要我们分开来对待和解决。
第三十六章 并发和Read-copy update(RCU)
简介
在上一篇文章中的并发和ABA问题的介绍中,我们提到了要解决ABA中的memory reclamation问题,有一个办法就是使用RCU。
详见ABA问题的本质及其解决办法,今天本文将会深入的探讨一下RCU是什么,RCU和COW(Copy-On-Write)之间的关系。
RCU(Read-copy update)是一种同步机制,并在2002年被加入了Linux内核中。它的优点就是可以在更新的过程中,运行多个reader进行读操作。
熟悉锁的朋友应该知道,对于排它锁,同一时间只允许一个操作进行,不管这个操作是读还是写。
对于读写锁,可以允许同时读,但是不能允许同时写,并且这个写锁是排他的,也就是说写的同时是不允许进行读操作的。
RCU可以支持一个写操作和多个读操作同时进行。
更多内容请访问www.flydean.com
Copy on Write和RCU
什么是Copy on Write? 它和read copy update有什么关系呢?
我们把Copy on Write简写为COW,COW是并发中经常会用到的一种算法,java里面就有java.util.concurrent.CopyOnWriteArrayList和java.util.concurrent.CopyOnWriteArraySet。
COW的本质就是,在并发的环境中,如果想要更新某个对象,首先将它拷贝一份,在这个拷贝的对象中进行修改,最后把指向原对象的指针指回更新好的对象。
CopyOnWriteArrayList和CopyOnWriteArraySet中的COW使用在遍历的时候。
我们知道使用Iterator来遍历集合的时候,是不允许在Iterator外部修改集合的数据的,只能在Iterator内部遍历的时候修改,否则会抛出ConcurrentModificationException。
而对于CopyOnWriteArrayList和CopyOnWriteArraySet来说,在创建Iterator的时候,就对原List进行了拷贝,Iterator的遍历是在拷贝过后的List中进行的,这时候如果其他的线程修改了原List对象,程序正常执行,不会抛出ConcurrentModificationException。
同时CopyOnWriteArrayList和CopyOnWriteArraySet中的Iterator是不支持remove,set,add方法的,因为这是拷贝过来的对象,在遍历过后是要被丢弃的。在它上面的修改是没有任何意义的。
在并发情况下,COW其实还有一个问题没有处理,那就是对于拷贝出来的对象什么时候回收的问题,是不是可以马上将对象回收?有没有其他的线程在访问这个对象? 处理这个问题就需要用到对象生命周期的跟踪技术,也就是RCU中的RCU-sync。
所以RCU和COW的关系就是:RCU是由RCU-sync和COW两部分组成的。
因为java中有自动垃圾回收功能,我们并不需要考虑拷贝对象的生命周期问题,所以在java中我们一般只看到COW,看不到RCU。
RCU的流程和API
我们将RCU和排它锁和读写锁进行比较。
对于排它锁来说,需要这两个API:
lock()
unlock()
对于读写锁来说,需要这四个API:
read_lock()
read_unlock()
write_lock()
write_unlock()
而RCU需要下面三个API:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_read_lock和rcu_read_unlock必须是成对出现的,并且synchronize_rcu不能出现在rcu_read_lock和rcu_read_unlock之间。
虽然RCU并不提供任何排他锁,但是RCU必须要满足下面的两个条件:
- 如果Thread1(T1)中synchronize_rcu方法在Thread2(T2)的rcu_read_lock方法之前返回,则happens before synchronize_rcu的操作一定在T2的rcu_read_lock方法之后可见。
- 如果T2的rcu_read_lock方法调用在T1的synchronize_rcu方法调用之前,则happens after synchronize_rcu的操作一定在T2的rcu_read_unlock方法之前不可见。
听起来很拗口,没关系,我们画个图来理解一下:
记住RCU比较的是synchronize_rcu和rcu_read_lock的顺序。
Thread2和Thread3中rcu_read_lock在synchronize_rcu之前执行,则b=2在T2,T3中一定不可见。
Thread4中rcu_read_lock虽然在synchronize_rcu启动之后才开始执行的,但是rcu_read_unlock是在synchronize_rcu返回之后才执行的,所以可以等同于看做Thread5的情况。
Thread5中,rcu_read_lock在synchronize_rcu返回之后才执行的,所以a=1一定可见。
RCU要注意的事项
RCU虽然没有提供锁的机制,但允许同时多个线程进行读操作。注意,RCU同时只允许一个synchronize_rcu操作,所以需要我们自己来实现synchronize_rcu的排它锁操作。
所以对于RCU来说,它是一个写多个读的同步机制,而不是多个写多个读的同步机制。
RCU的java实现
最后放上一段大神的RCU的java实现代码:
public class RCU {
final static long NOT_READING = Long.MAX_VALUE;
final static int MAX_THREADS =;
final AtomicLong reclaimerVersion = new AtomicLong();
final AtomicLongArray readersVersion = new AtomicLongArray(MAX_THREADS);
public RCU() {
for (int i=; i < MAX_THREADS; i++) readersVersion.set(i, NOT_READING);
}
public static int getTID() {
return (int)(Thread.currentThread().getId() % MAX_THREADS);
}
public void read_lock(final int tid) { // rcu_read_lock()
final long rv = reclaimerVersion.get();
readersVersion.set(tid, rv);
final long nrv = reclaimerVersion.get();
if (rv != nrv) readersVersion.lazySet(tid, nrv);
}
public void read_unlock(final int tid) { // rcu_read_unlock()
readersVersion.set(tid, NOT_READING);
}
public void synchronize_rcu() {
final long waitForVersion = reclaimerVersion.incrementAndGet();
for (int i=; i < MAX_THREADS; i++) {
while (readersVersion.get(i) < waitForVersion) { } // spin
}
}
}
简单讲解一下这个RCU的实现:
readersVersion是一个长度为128的Long数组,里面存放着每个reader的读数。默认情况下reader存储的值是NOT_READING,表示未存储任何数据。
在RCU初始化的时候,将会初始化这些reader。
read_unlock方法会将reader的值重置为NOT_READING。
reclaimerVersion存储的是修改的数据,它的值将会在synchronize_rcu方法中进行更新。
同时synchronize_rcu将会遍历所有的reader,只有当所有的reader都读取完毕才继续执行。
最后,read_lock方法将会读取reclaimerVersion的值。这里会读取两次,如果两次的结果不同,则会调用readersVersion.lazySet方法,延迟设置reader的值。
为什么要读取两次呢?因为虽然reclaimerVersion和readersVersion都是原子性操作,但是在多线程环境中,并不能保证reclaimerVersion一定就在readersVersion之前执行,所以我们需要添加一个内存屏障:memory barrier来实现这个功能。
总结
本文介绍了RCU算法和应用。希望大家能够喜欢。
第三十七章 同步类的基础AbstractQueuedSynchronizer(AQS)
我们之前介绍了很多同步类,比如ReentrantLock,Semaphore, CountDownLatch, ReentrantReadWriteLock,FutureTask等。
AQS封装了实现同步器时设计的大量细节问题。他提供了FIFO的wait queues并且提供了一个int型的state表示当前的状态。
根据JDK的说明,并不推荐我们直接使用AQS,我们通常需要构建一个内部类来继承AQS并按照需要重写下面几个方法:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
在这些方法中,我们需要调用getState, setState 或者 compareAndSetState这三种方法来改变state值。
上面的方法提到了两种操作,独占操作(如:ReentrantLock)和共享操作(如:Semaphore,CountdownLatch)。
两种的区别在于同一时刻能否有多个线程同时获取到同步状态。
比如我们运行同时多个线程去读,但是同时只允许一个线程去写,那么这里的读锁就是共享操作,而写锁就是独占操作。
在基于QAS构建的同步类中,最基本的操作就是获取操作和释放操作。而这个state就表示的是这些获取和释放操作所依赖的值。
State是一个int值,你可以使用它来表示任何状态,比如ReentrantLock用它来表示所有者线程重复获取该锁的次数。Semaphore用它来表示剩余的许可量,而FutureTask用它来表示任务的状态(开始,运行,完成或者取消)。当然你还可以自定义额外的状态变量来表示其他的信息。
下的伪代码表示的是AQS中获取和释放操作的形式:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
获取操作,首先判断当前状态是否允许获取操作,如果如果不允许,则将当前的线程入Queue,并且有可能阻塞当前线程。
释放操作,则先判断是否运行释放操作,如果允许,则解除queue中的thread,并运行。
我们看一个具体的实现:
public class AQSUsage {
private final Sync sync= new Sync();
private class Sync extends AbstractQueuedSynchronizer{
protected int tryAcquireShared(int ignored){
return (getState() == )? 1: -1;
}
protected boolean tryReleaseShared(int ignored){
setState();
return true;
}
}
public void release() {
sync.releaseShared();
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly();
}
}
上面的例子中,我们定义了一个内部类Sync,在这个类中我们实现了tryAcquireShared和tryReleaseShared两个方法,在这两个方法中我们判断并设置了state的值。
sync.releaseShared和sync.acquireSharedInterruptibly会分别调用tryAcquireShared和tryReleaseShared方法。
前面我们也提到了很多同步类都是使用AQS来实现的,我们可以再看看其他标准同步类中tryAcquire的实现。
首先看下ReentrantLock:
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c ==) {
if (compareAndSetState(, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc <) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock只支持独占锁。所以它需要实现tryAcquire方法。除此之外它还维护了一个owner变量来保存当前所有者线程的标志符,从而来实现可重入锁。
我们再看下Semaphore和CountDownLatch的实现,因为他们是共享操作,所以需要实现tryAcqureShared方法:
final int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < ||
compareAndSetState(available, remaining))
return remaining;
}
}
第三十八章 java并发Exchanger的使用
简介
Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。
两个线程都调用exchange方法之后,传入的参数就会交换。
类定义
public class Exchanger<V>
其中V表示需要交换的对象类型。
类继承
java.lang.Object
↳ java.util.concurrent.Exchanger<V>
Exchanger直接继承自Object。
构造函数
Exchanger()
Exchanger提供一个无参构造函数。
两个主要方法
- public V exchange(V x) throws InterruptedException
当这个方法被调用的时候,当前线程将会等待直到其他的线程调用同样的方法。当其他的线程调用exchange之后,当前线程将会继续执行。
在等待过程中,如果有其他的线程interrupt当前线程,则会抛出InterruptedException。
- public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
和第一个方法类似,区别是多了一个timeout时间。如果在timeout时间之内没有其他线程调用exchange方法,则会抛出TimeoutException。
具体的例子
我们先定义一个带交换的类:
@Data
public class CustBook {
private String name;
}
然后定义两个Runnable,在run方法中调用exchange方法:
@Slfj
public class ExchangerOne implements Runnable{
Exchanger<CustBook> ex;
ExchangerOne(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book one");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Slfj
public class ExchangerTwo implements Runnable{
Exchanger<CustBook> ex;
ExchangerTwo(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book two");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后在主方法中调用:
public class ExchangerUsage {
public static void main(String[] args) {
Exchanger<CustBook> exchanger = new Exchanger<>();
// Starting two threads
new Thread(new ExchangerOne(exchanger)).start();
new Thread(new ExchangerTwo(exchanger)).start();
}
}
我们看下结果:
:14:09.069 [Thread-1] INFO com.flydean.ExchangerTwo - book one
:14:09.073 [Thread-0] INFO com.flydean.ExchangerOne - book two
可以看到对象已经被交换了。
结语
Exchanger在两个线程需要交换对象的时候非常好用。大家可以在实际工作生活中使用。