主要构成
workers: 工作组
queue: 任务队列
threadFactory: 线程生产工厂
handler: 异常处理
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
1. RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。
2. SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。
3. STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。
4. TIDYING:
- SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。
- 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
- 线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。
5. TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。
参考链接
主要流程
创建线程池 -> 创建 worker 或放入任务队列 -> 线程开始执行任务
源码流程:
execute
// 获取线程池句柄
int c = ctl.get();
// 当前线程数量是否小于设置的核心线程池大小
if (workerCountOf(c) < corePoolSize) {
// 添加一个 woker
if (addWorker(command, true))
// 添加成功后推出
return;
// 刷新线程池句柄
c = ctl.get();
}
// 以下是线程池数量达到上限后的处理
// 如果当前线程池是 RUNNING,且添加任务队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 刷新线程池句柄
int recheck = ctl.get();
// 如果当前线程池非 RUNNING状态,则移除任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是RUNNING状态或移除任务失败,且当前工作线程为0
else if (workerCountOf(recheck) == 0)
// 添加一个没有任务的 worker 去执行任务队列中剩余的任务
addWorker(null, false);
}
// 如果线程池非RUNNING或,添加任务队列失败,则新增一个 woker 执行任务
// 如果增加 worker 失败则抛出异常拒绝任务
else if (!addWorker(command, false))
reject(command);
addWorker
- 判断线程状态,增加 woker 数量
retry:
for (;;) {
// 获取线程池句柄
int c = ctl.get();
// 当前线程池状态
int rs = runStateOf(c);
// 1.如果当前线程池状态为 STOP 及以上,返回false
// 2.如果当前线程状态为 SHUTDOWN,且任务和任务队列都为空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数量
int wc = workerCountOf(c);
// 如果线程数量大于系统最大值,或大于设置的最大线程数量,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 操作 worker 数量,操作成功后跳出最外层循环,开始执行任务
if (compareAndIncrementWorkerCount(c))
break retry;
// 刷新线程池句柄
c = ctl.get(); // Re-read ctl
// 如果当前线程池状态有变化,跳出第二层循环,
// 即重新 check 线程池、任务、任务队列状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
- 创建 woker,线程执行任务
// woker 是否启动标记
boolean workerStarted = false;
// woker 是否添加进 woker 组标记
boolean workerAdded = false;
Worker w = null;
try {
// 创建 woker,在 woker 中通过线程工厂会创建线程
w = new Worker(firstTask);
// 获取 woker 的线程
final Thread t = w.thread;
if (t != null) {
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
// 阻塞取锁
mainLock.lock();
try {
// 重新获取线程池状态
int rs = runStateOf(ctl.get());
// 如果线程池是 RUNNING 状态
// 或者是 (SHUTDOWN 状态 且任务为 null)-> excute 中删除失败的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果当前线程已经启动过了,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 讲 woker 添加进 woker 组中
workers.add(w);
// 记录线程峰值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// woker添加成功标记
workerAdded = true;
}
} finally {
// 释放全局锁
mainLock.unlock();
}
// 如果添加 woker 时没有异常,开始执行任务,标记 woker 已开始工作
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果 woker 启动失败,执行添加失败的策略
if (! workerStarted)
// woker 组中移除 woker
// 减少 woker 组 woker 总数
// 尝试结束线程池
addWorkerFailed(w);
}
// 返回 woker 是否开始
return workerStarted;
runWorker
Woker 本身是一个 Runnable 对象,在新建 Thread 对象时,线程工厂会把 Woker 传给 Thread,Thread.start 时,会调用 Woker.run,Woker.run 再调用外部的 runWoker 方法
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取当前任务
Runnable task = w.firstTask;
// 将 woker 的任务设置为空
w.firstTask = null;
// 在创建 worker 时,会将状态标记为 -1 ,执行任务前将状态改为 0
// -1 时不允许结束这个 woker
// 防止 worker 刚加入 worker 组还没开始执行任务就被回收了
w.unlock(); // allow interrupts
// 线程是否异常退出的标记
boolean completedAbruptly = true;
try {
// 当任务不为空,或任务队列的任务不为空时
while (task != null || (task = getTask()) != null) {
// 锁住线程
w.lock();
//1. 线程池如果是 STOP 状态,且当前线程不是中断状态,则设置中断状态
//2. 线程池不是 STOP 状态,刷新当前线程线程的中断状态,刷新后再次获取线程池状态,如果是 STOP 且当前线程不是中断状态,则设置中断状态
// 第二次获取线程池状态是防止刷新线程中断状态后线程池结束,所以刷新后再 check 一次
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前的钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务之后的钩子
afterExecute(task, thrown);
}
} finally {
// 将任务重置为空
task = null;
// 增加 woker 的完成任务数量
w.completedTasks++;
// 释放当前 woker 的锁
w.unlock();
// 如果没有异常抛出则继续 while 循环
}
}
// 异常退出走不到这里
completedAbruptly = false;
} finally {
// 处理 while 循环结束后的工作
// 如果是异常退出,则新增一个 woker 代替
// 如果是正常退出,则清理 woker 组,尝试关闭线程池
processWorkerExit(w, completedAbruptly);
}
getTask
// 是否获取任务超时的标记
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 线程池句柄
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// 如果当前线程池是 STOP 及以上状态,则减少 woker 数量,返回 null
// 如果当前线程是 SHUTDOWN 且任务队列为空,则减少 woker 数量,返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 当前线程数量
int wc = workerCountOf(c);
// 是否允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. 线程总数大于最大线程数且任务队列为空,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行
// 2. 线程总数小于或等于最大线程数,允许超时且已超过时,且任务队列为空或线程数大于1,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取任务,如果允许超时,则使用 poll 方法并设置超时时间
// 否则使用 take 方法,
// 在 take 方法中,如果任务队列为空会调用 unsafe.park 将线程挂起
// workQueue.offer 添加任务时,会调用 unsafe.unpark 唤醒一个线程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任务不会 null 则返回任务
if (r != null)
return r;
// 任务为 null 将超时标记为 true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}