本文重点是从源码层面理解jdk8中 线程池 的实现。
核心机制
再分析源码之前,我们还是先回顾和熟悉下 线程 的核心工作机制。
线程池工作原理
线程池采用的是一种生产者-消费者的模型,如下图:
- 主线程调用execute、或者submit等方法提交任务给线程池。
- 如果线程池中正在运行的工作线程数量小于corePoolSize(核心线程数量),那么马上创建线程运行这个任务。
- 如果线程池中正在运行的工作线程数量大于或等于 corePoolSize(核心线程数量),那么将这个任务放入队列,稍后执行。
- 如果这时队列满了且正在运行的工作线程数量还小于 maximumPoolSize(最大线程数量),那么会创建非核心工作线程立刻运行这个任务,这部分非核心工作线程空闲超过一定的时间(keepAliveTime)时,就会被销毁回收。
- 如果最终提交的任务超过了maximumPoolSize(最大线程数量),那么就会执行拒绝策略。
线程池状态
线程池的状态有5种,他们的状态转换如上图所示,这里记得区别线程的状态,它们不是一回事。
ThreadPoolExecutor 类存放线程池的状态信息很特别,是存储在一个int类型原子变量的高3位,而低29位用来存储线程池当前运行的线程数量。通过将线程池的状态和线程数量合二为一,可以做到一次CAS原子操作更新数据。
状态 | 高3位值 | 说明 |
RUNNING | 111 | 运行状态,线程池被创建后的初始状态,能接受新提交的任务,也能处理阻塞队列中的任务。 |
SHUTDOWN | 000 | 关闭状态,不再接受新提交的任务,但任可以处理阻塞队列中的任务。 |
STOP | 001 | 停止状态,会中断正在处理的线程,不能接受新提交的任务,也不会处理阻塞队列中的任务。 |
TIDYING | 010 | 所有任务都已经终止,有效工作线程为0。 |
TERMINATED | 011 | 终止状态,线程池彻底终止。 |
源码解析
上图是线程池核心类ThreadPoolExecutor的类结构图:
- Executor : 提交任务的基础接口,只有一个execute方法。
- ExecutorService: 继承自Executor,它提供管理终止的方法,以及可以产生Future的方法,用于跟踪一个或多个异步任务的进度。
- AbstractExecutorService: 提供ExecutorService执行方法的默认实现。
- ThreadPoolExecutor: 线程池类本类,实现了线程池的核心逻辑。
- Worker: ThreadPoolExecutor的内部类,工作线程类,继承自 AQS。
- Policy: 其他Policy结尾的都是内置的决策策略类。
关键成员变量
1.线程池的状态信息和线程数量信息(ctl)相关
- 线程的状态信息和数量信息用同一个int的原子变量存储,高3位存储状态信息,低29位存储线程数量。
// ctl,原子变量,存储状态和线程数量,初始化运行状态+ | |
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,)); | |
// 静态常量,表示线程数量存放的位数=32-3 | |
private static final int COUNT_BITS = Integer.SIZE - 3; | |
// 线程数量最大的容量,低 COUNT_BITS 位所能表达的最大数值, 11111111111111111111 => 5亿多 | |
private static final int CAPACITY = ( << COUNT_BITS) - 1; |
- 通过 位运算符 设置各个状态的高三位值。
// 000000000000000000,转换成整数后其实就是一个【负数】 | |
private static final int RUNNING = - << COUNT_BITS; | |
// 000000000000000000 | |
private static final int SHUTDOWN = << COUNT_BITS; | |
// 000000000000000000 | |
private static final int STOP = << COUNT_BITS; | |
// 000000000000000000 | |
private static final int TIDYING = << COUNT_BITS; | |
// 000000000000000000 | |
private static final int TERMINATED = << COUNT_BITS; |
- 从ctl中获取线程池的状态值
// ~CAPACITY = ~ 11111111111111111111 = 111 000000000000000000000(取反) | |
// & 运算符 ,和1&是它本身,和0&就是0,就可以获得高位值。 | |
private static int runStateOf(int c) { return c & ~CAPACITY; } |
- 从ctl中获取线程池的数量
// CAPACITY = 11111111111111111111 | |
// &运算符,和&是它本身,和0&就是0,就可以获得低29位 | |
private static int workerCountOf(int c) { return c & CAPACITY; } |
- 生成ctl值
// rs 表示线程池状态,wc 表示当前线程池中 worker(线程)数量,相与以后就是合并后的状态 | |
private static int ctlOf(int rs, int wc) { return rs | wc; } |
- 比较当前线程池 ctl 所表示的状态
线程池状态值的大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s | |
private static boolean runStateLessThan(int c, int s) { return c < s; } | |
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态s | |
private static boolean runStateAtLeast(int c, int s) { return c >= s; } | |
// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == | |
private static boolean isRunning(int c) { return c < SHUTDOWN; } |
- cas设置ctl的值
// 使用 CAS 方式 让 ctl 值 + ,成功返回 true, 失败返回 false | |
private boolean compareAndIncrementWorkerCount(int expect) { | |
return ctl.compareAndSet(expect, expect +); | |
} | |
// 使用 CAS 方式 让 ctl 值 - ,成功返回 true, 失败返回 false | |
private boolean compareAndDecrementWorkerCount(int expect) { | |
return ctl.compareAndSet(expect, expect -); | |
} | |
// 将 ctl 值减一, do while 循环会一直重试,直到成功为止 | |
private void decrementWorkerCount() { | |
do {} while (!compareAndDecrementWorkerCount(ctl.get())); | |
} |
2.线程池中的队列
// 线程池用于保存任务并将任务传递给工作线程的队列 | |
private final BlockingQueue<Runnable> workQueue; |
3.控制并发的锁
// 增加减少 worker 或者时修改线程池运行状态需要持有 mainLock | |
private final Reentrant lock mainLock = new ReentrantLock(); |
4.线程池中工作线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
5.线程池构造参数关系属性
// 核心线程数量 | |
private volatile int corePoolSize; | |
// 线程池最大线程数量 | |
private volatile int maximumPoolSize; | |
// 空闲线程存活时间 | |
private volatile long keepAliveTime; | |
// 创建线程时使用的线程工厂,默认是 DefaultThreadFactory | |
private volatile ThreadFactory threadFactory; | |
// 【超过核心线程提交任务就放入 阻塞队列】 | |
private final BlockingQueue<Runnable> workQueue; | |
// 拒绝策略 | |
private volatile RejectedExecutionHandler handler; |
6.线程池监控相关属性
// 记录线程池 生命周期 内线程数最大值 | |
private int largestPoolSize; | |
// 记录线程池所完成任务总数,当某个 worker 退出时将完成的任务累加到该属性 | |
private long completedTaskCount; |
线程提交原理
线程池提交线程有多种方式如execute、submit或者invoke相关方法,我们重点关注在最基础的execute()方法提交任务,把它搞清楚了,其他的都不在话下。
1.execute(Runnable command)方法是线程提交的入口方法。
// ThreadPoolExecutor#execute | |
public void execute(Runnable command) { | |
// 如果任务为空,直接抛空指针 | |
if (command == null) | |
throw new NullPointerException(); | |
// 获取ctl的值,其中高位是状态信息,低3位是线程数量 | |
int c = ctl.get(); | |
// workerCountOf获取当前线程的数量 | |
// 当前线程数量小于核心线程数,调用addWorker创建一个工作线程 | |
if (workerCountOf(c) < corePoolSize) { | |
// 调用addWorker方法创建工作线程,直接执行任务。如果成功的话,直接结束方法。 | |
if (addWorker(command, true)) | |
return; | |
// 由于并发等原因,addWorker添加失败,会走到这里,再次获取ctl的值 | |
c = ctl.get(); | |
} | |
// 如果线程池是运行状态的话,就把任务加入到队列中 | |
if (isRunning(c) && workQueue.offer(command)) { | |
// 双重检查,因为从上次检查到进入此方法,线程池可能已成为 SHUTDOWN 状态 | |
int recheck = ctl.get(); | |
// 如果发现线程池不是运行状态的话,那就移除这个任务 | |
if (!isRunning(recheck) && remove(command)) | |
// 任务出队成功,走拒绝策略 | |
reject(command); | |
// 执行到这说明线程池是 running 状态,获取线程池中的线程数量,判断是否是 | |
// 【担保机制】,保证线程池在 running 状态下,最起码得有一个线程在工作 | |
else if (workerCountOf(recheck) ==) | |
addWorker(null, false); | |
} | |
// 走到这里说明线程不是运行状态,或者就是队列满了,offer返回false | |
// 再次调用addWoker创建新的线程,如果不成功(一般是超过了线程池最大线程数量),执行拒绝策略 | |
else if (!addWorker(command, false)) | |
// 执行拒绝策略 | |
reject(command); | |
} |
这个方法是提交线程的主干逻辑:
- 提交一个任务时,如果运行的线程少于corePoolSize,通过调用addWorker添加一个工作线程,直接开始运行。
- 如果工作线程大于等于corePoolSize,并且前面addWorker失败时,需要将任务加入到队列中,加入成功后,做了一层双重校验,因为这个过程可能线程池状态发生变化了,如果已经关闭,那么要移除刚刚加入的这个任务。
- 如果加入队列失败,说明队列满了,这时候调用addWorker方法再次创建线程,如果返回false,有可能是超过最大线程数量了,那么就执行拒绝策略。
2.addWorker方法也是一个很关键的方法, 添加线程到线程池,返回 true 表示创建 Worker 成功,且启动线程。
// ThreadPoolExecutor#addWorker | |
// core == true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize | |
private boolean addWorker(Runnable firstTask, boolean core) { | |
// 自旋【判断当前线程池状态是否允许创建线程】,允许就设置线程数量 + | |
retry: | |
for (;;) { | |
// 获取 ctl 的值 | |
int c = ctl.get(); | |
// 获取当前线程池运行状态 | |
int rs = runStateOf(c); | |
// 判断当前线程池状态【是否允许添加线程】 | |
// 如果线程池状态大于SHUTDOWN 或者是SHUTDOWN状态,队列是空了的话,都不允许创建新的线程 | |
if (rs >= SHUTDOWN && | |
! (rs == SHUTDOWN && | |
firstTask == null && | |
! workQueue.isEmpty())) | |
// false,没有创建线程 | |
return false; | |
// 再次自旋 | |
for (;;) { | |
// 获取线程池中线程数量 | |
int wc = workerCountOf(c); | |
// 如果线程数量超过 阈值 的话,返回false | |
if (wc >= CAPACITY || | |
wc >= (core ? corePoolSize : maximumPoolSize)) | |
return false; | |
// 记录线程数量已经加,类比于申请到了一块令牌,条件失败说明其他线程修改了数量 | |
if (compareAndIncrementWorkerCount(c)) | |
// 申请成功,跳出了 retry 这个 for 自旋 | |
break retry; | |
// CAS 失败,没有成功的申请到令牌 | |
c = ctl.get(); | |
// 判断当前线程池状态是否发生过变化,被其他线程修改了,可能其他线程调用了 shutdown() 方法 | |
if (runStateOf(c) != rs) | |
// 重新回到retry的执行点 | |
continue retry; | |
// else CAS failed due to workerCount change; retry inner loop | |
} | |
} | |
// 下面开始真正创建线程了 | |
// 运行标记,表示创建的 worker 是否已经启动,false未启动 true启动 | |
boolean workerStarted = false; | |
// 添加标记,表示创建的 worker 是否添加到池子中了,默认false未添加,true是添加。 | |
boolean workerAdded = false; | |
Worker w = null; | |
try { | |
//【创建 Worker,底层通过线程工厂 newThread 方法创建执行线程,指定了首先执行的任务】 | |
w = new Worker(firstTask); | |
// 将新创建的 worker 节点中的线程赋值给 t | |
final Thread t = w.thread; | |
// 这里的判断为了防止 程序员 自定义的 ThreadFactory 实现类有 bug,创造不出线程 | |
if (t != null) { | |
final ReentrantLock mainLock = this.mainLock; | |
// 加互斥锁,要添加 worker 了 | |
mainLock.lock(); | |
try { | |
// 获取最新线程池运行状态 | |
int rs = runStateOf(ctl.get()); | |
// 判断线程池是否为RUNNING状态,不是再【判断当前是否为SHUTDOWN状态且firstTask为空,特殊情况】 | |
if (rs < SHUTDOWN || | |
(rs == SHUTDOWN && firstTask == null)) { | |
// 当线程start后,线程isAlive会返回true,这里还没开始启动线程,如果被启动了就需要报错 | |
if (t.isAlive()) | |
throw new IllegalThreadState Exception (); | |
//将新建的 Worker 添加到线程池中 | |
workers.add(w); | |
int s = workers.size(); | |
// 当前池中的线程数量是一个新高,更新 largestPoolSize | |
if (s > largestPoolSize) | |
largestPoolSize = s; | |
// 添加标记置为 true | |
workerAdded = true; | |
} | |
} finally { | |
mainLock.unlock(); | |
} | |
// 添加成功就【启动线程执行任务】 | |
if (workerAdded) { | |
// 启动线程 | |
t.start(); | |
// 运行标记置为 true | |
workerStarted = true; | |
} | |
} | |
} finally { | |
// 线程启动失败 | |
if (! workerStarted) | |
// 清理工作,比如从线程池中移除。 | |
addWorkerFailed(w); | |
} | |
return workerStarted; | |
} | |
private void addWorkerFailed(Worker w) { | |
final ReentrantLock mainLock = this.mainLock; | |
// 持有线程池全局锁,因为操作的是线程池相关的东西 | |
mainLock.lock(); | |
try { | |
//条件成立需要将 worker 在 workers 中清理出去。 | |
if (w != null) | |
workers.remove(w); | |
// 将线程池计数 -,相当于归还令牌。 | |
decrementWorkerCount(); | |
// 尝试停止线程池 | |
tryTerminate(); | |
} finally { | |
//释放线程池全局锁。 | |
mainLock.unlock(); | |
} | |
} |
- 这里注意一个点,SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务。
Woker运行原理
Woker类是ThreadPoolExecutor类的内部类,见明知意,它是承担了一个“工人”干活,也就是工作线程的责任。
1.Worker类
每个 Worker 对象有一个初始任务,启动 Worker 时优先执行,这也是造成线程池不公平的原因。Worker 继承自 AQS,本身具有锁的特性,采用独占锁模式,state = 0 表示未被占用,> 0 表示被占用,< 0 表示初始状态不能被抢锁。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { | |
// worker 内部封装的工作线程 | |
final Thread thread; | |
// worker 第一个执行的任务,普通的 Runnable 实现类或者是 FutureTask | |
Runnable firstTask; | |
// 记录当前 worker 所完成任务数量 | |
volatile long completedTasks; | |
// 构造方法 | |
Worker(Runnable firstTask) { | |
// 设置AQS独占模式为初始化中状态,这个状态不能被抢占锁 | |
setState(-); | |
// firstTask不为空时,当worker启动后,内部线程会优先执行firstTask,执行完后会到queue中去获取下个任务 | |
this.firstTask = firstTask; | |
// 使用线程工厂创建一个线程,并且【将当前worker指定为Runnable】,所以thread启动时会调用 worker.run() | |
this.thread = getThreadFactory().newThread(this); | |
} | |
// 不可重入锁,重写了AQS中的方法 | |
protected boolean tryAcquire(int unused) { | |
if (compareAndSetState(, 1)) { | |
setExclusiveOwnerThread(Thread.currentThread()); | |
return true; | |
} | |
return false; | |
} | |
protected boolean tryRelease(int unused) { | |
setExclusiveOwnerThread(null); | |
// 设置state为,开始抢锁 | |
setState(); | |
return true; | |
} | |
} |
2.Worker的工作方法run
// Worker#run | |
public void run() { | |
// 调用自身的runWoker方法 | |
runWorker(this); | |
} | |
// Worker#runWorker | |
final void runWorker(Worker w) { | |
Thread wt = Thread.currentThread(); | |
// 获取 worker 的 firstTask | |
Runnable task = w.firstTask; | |
// 引用置空,【防止复用该线程时重复执行该任务】 | |
w.firstTask = null; | |
// 初始化 worker 时设置 state = -,表示不允许抢占锁 | |
// 这里需要设置 state = 和 exclusiveOwnerThread = null,开始独占模式抢锁 | |
w.unlock(); | |
// true 表示发生异常退出,false 表示正常退出。 | |
boolean completedAbruptly = true; | |
try { | |
// firstTask 不是 null 就直接运行,否则去 queue 中获取任务 | |
while (task != null || (task = getTask()) != null) { | |
// worker 加锁,shutdown 时会判断当前 worker 状态,【根据独占锁状态判断是否空闲】 | |
w.lock(); | |
// 说明线程池状态大于 STOP,目前处于 STOP/TIDYING/TERMINATION,此时给线程一个中断信号 | |
if ((runStateAtLeast(ctl.get(), STOP) || | |
(Thread.interrupted() && | |
runStateAtLeast(ctl.get(), STOP))) && | |
// 线程不是处于中断的情况 | |
!wt.isInterrupted()) | |
// 中断线程,设置线程的中断标志位为 true | |
wt.interrupt(); | |
try { | |
// 任务执行前的回调,空实现,可以在子类中自定义 | |
beforeExecute(wt, task); | |
Throwable thrown = null; | |
try { | |
// 真正执行任务 | |
task.run(); | |
} catch (RuntimeException x) { | |
thrown = x; throw x; | |
} catch (Error x) { | |
thrown = x; throw x; | |
} catch (Throwable x) { | |
thrown = x; throw new Error(x); | |
} finally { | |
// 钩子方法,【任务执行的后置处理】 | |
afterExecute(task, thrown); | |
} | |
} finally { | |
// 将局部变量task置为null,代表任务执行完成 | |
task = null; | |
// 更新worker完成任务数量 | |
w.completedTasks++; | |
// 解锁 | |
w.unlock(); | |
} | |
} | |
// getTask()方法返回null时会走到这里,表示queue为空并且线程空闲超过保活时间,【当前线程执行退出逻辑】 | |
completedAbruptly = false; | |
} finally { | |
// 正常退出 completedAbruptly = false | |
// 异常退出 completedAbruptly = true,【从 task.run() 内部抛出异常】时,跳到这一行 | |
processWorkerExit(w, completedAbruptly); | |
} | |
} |
3.getTask() 获取任务
这个方法主要做了下面几件事情:
- 从阻塞队列中获取任务
- 如果当前线程空闲时间超过 keepAliveTime 就会被回收,主要通过调用队列的超时获取接口poll(long timeout, TimeUnit unit)实现。
private Runnable getTask() { | |
// 超时标记,表示当前线程获取任务是否超时,true 表示已超时 | |
boolean timedOut = false; | |
for (;;) { | |
int c = ctl.get(); | |
// 获取线程池当前运行状态 | |
int rs = runStateOf(c); | |
// 如果发现线程池被关闭了,直接返回null | |
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { | |
// 使用 CAS 自旋的方式让 ctl 值 - | |
decrementWorkerCount(); | |
return null; | |
} | |
// 获取线程池中的线程数量 | |
int wc = workerCountOf(c); | |
//timed用来判断当前线程是否超过一定时间没有获取任务就进行销毁回收,true是需要,false不需要, 有两种情况 | |
//. allowCoreThreadTimeOut为true代表允许回收核心线程,那就无所谓了,全部线程都执行超时回收 | |
//. 线程数量大于核心线程数,当前线程认为是非核心线程 | |
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; | |
// 同时满足下面和2条件下,说明线程要回收,直接返回null | |
//. 如果线程数量超过最大线程数 或者 上面的timed和超时时间timedOut都为true | |
if ((wc > maximumPoolSize || (timed && timedOut)) | |
//.如果线程数量大于1并且队列时空的情况 | |
&& (wc > || workQueue.isEmpty())) { | |
// 使用 CAS 机制将 ctl 值 - ,减 1 成功的线程,返回 null,代表可以退出 | |
if (compareAndDecrementWorkerCount(c)) | |
return null; | |
continue; | |
} | |
try { | |
// 从队列中获取任务,有下面两种方法 | |
// timed为true, 调用超时方法poll获取任务 | |
// timed为false,调用阻塞方法take获取 | |
Runnable r = timed ? | |
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : | |
workQueue.take(); | |
if (r != null) | |
return r; | |
获取任务为 null 说明超时了,将超时标记设置为 true,进入下一次循环,就可以销毁这个线程了 | |
timedOut = true; | |
} catch (InterruptedException retry) { | |
// 阻塞线程被打断后超时标记置为 false,【说明被打断不算超时】,要继续获取,直到超时或者获取到任务 | |
// 如果线程池 SHUTDOWN 状态下的打断,会在循环获取任务前判断,返回 null | |
timedOut = false; | |
} | |
} | |
} |
4.processWorkerExit()工作线程退出方法
// 正常退出 completedAbruptly = false,异常退出为 true | |
private void processWorkerExit(Worker w, boolean completedAbruptly) { | |
// 条件成立代表当前 worker 是发生异常退出的,task 任务执行过程中向上抛出异常了 | |
if (completedAbruptly) | |
// 从异常时到这里 ctl 一直没有 -,需要在这里 -1 | |
decrementWorkerCount(); | |
final ReentrantLock mainLock = this.mainLock; | |
// 加锁 | |
mainLock.lock(); | |
try { | |
// 将当前 worker 完成的 task 数量,汇总到线程池的 completedTaskCount | |
completedTaskCount += w.completedTasks; | |
// 将 worker 从线程池中移除 | |
workers.remove(w); | |
} finally { | |
mainLock.unlock();// 解锁 | |
} | |
// 尝试停止线程池,唤醒下一个线程 | |
tryTerminate(); | |
int c = ctl.get(); | |
// 线程池不是停止状态就应该有线程运行【担保机制】 | |
if (runStateLessThan(c, STOP)) { | |
// 正常退出的逻辑,是对空闲线程回收,不是执行出错 | |
if (!completedAbruptly) { | |
// 根据是否回收核心线程确定【线程池中的线程数量最小值】 | |
int min = allowCoreThreadTimeOut ? : corePoolSize; | |
// 最小值为,但是线程队列不为空,需要一个线程来完成任务担保机制 | |
if (min == && !workQueue.isEmpty()) | |
min =; | |
// 线程池中的线程数量大于最小值可以直接返回 | |
if (workerCountOf(c) >= min) | |
return; | |
} | |
// 执行 task 时发生异常,有个线程因为异常终止了,需要添加 | |
// 或者线程池中的数量小于最小值,这里要创建一个新 worker 加进线程池 | |
addWorker(null, false); | |
} | |
} |
总结
本文主要从源码层面分析了线程池的运行机理,总算知道了execute方法背后是如何运转的。