目录
- 概述
- 线程池框架设计
- 代码实现
- 阻塞队列的实现
- 线程池消费端实现
- 获取任务超时设计
- 拒绝策略设计
概述
线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。
线程池框架设计
我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。
因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:
- 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
- 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。
为了实现线程池功能,需要考虑下面几个设计要点:
- 线程池可以接口外部提交的任务执行
- 线程池有工作线程的数量,有任务执行,没有任务也空闲在那,等待任务过来,这样既避免线程频繁创建销毁带来的开销,同时也可以避免线程池无限制的创建线程
- 如果线程池接受提交的任务超过工作线程的数量了,该怎么办?可以用一个队列把任务存下来,等工作线程完成任务后去队列中获取任务,执行
- 那如果任务实在是太多太多了,达到了我们认为的队列最大值,怎么办,我们可以设计一种任务太多的策略,可以进行切换,比如直接丢弃任务、报错等等
看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。
- 阻塞队列存储执行任务,比如外部main函数作为生产者向队列生产任务。
- 线程池中的工作线程作为消费者获取任务执行。
现在我们将我们的设计思路转换为代码。
代码实现
阻塞队列的实现
- 阻塞队列主要存放任务,有容量限制
- 阻塞队列提供添加和删除任务的API, 如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务。
/** | |
* <p>自定义任务队列, 用来存放任务 </p> | |
* | |
* @author: cxw (332059317@qq.com) | |
* @date: 2022/10/18 10:15 | |
* @version: 1.0.0 | |
*/ | |
@Slf4j(topic = "c.BlockingQueue") | |
public class BlockingQueue<T> { | |
// 容量 | |
private int capcity; | |
// 双端任务队列容器 | |
private Deque<T> deque = new ArrayDeque<>(); | |
// 重入锁 | |
private ReentrantLock lock = new ReentrantLock(); | |
// 生产者条件变量 | |
private Condition fullWaitSet = lock.newCondition(); | |
// 生产者条件变量 | |
private Condition emptyWaitSet = lock.newCondition(); | |
public BlockingQueue(int capcity) { | |
this.capcity = capcity; | |
} | |
// 阻塞的方式添加任务 | |
public void put(T task) { | |
lock.lock(); | |
try { | |
// 通过while的方式 | |
while (deque.size() >= capcity) { | |
log.debug("wait to add queue"); | |
try { | |
fullWaitSet.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
deque.offer(task); | |
log.debug("task add successfully"); | |
emptyWaitSet.signal(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
// 阻塞获取任务 | |
public T take() { | |
lock.lock(); | |
try { | |
// 通过while的方式 | |
while (deque.isEmpty()) { | |
try { | |
log.debug("wait to take task"); | |
emptyWaitSet.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
fullWaitSet.signal(); | |
T task = deque.poll(); | |
log.debug("take task successfully"); | |
// 从队列中获取元素 | |
return task; | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
- put()方法是向阻塞队列中添加任务
- take()方法是向阻塞队列中获取任务
线程池消费端实现
1.定义执行器接口
/** | |
* <p>定义一个执行器的接口:</p> | |
* | |
* @author: cxw (332059317@qq.com) | |
* @date: 2022/10/18 12:31 | |
* @version: 1.0.0 | |
*/ | |
public interface Executor { | |
/** | |
* 提交任务执行 | |
* @param task 任务 | |
*/ | |
void execute(Runnable task); | |
} |
2.定义线程池类实现该接口
public class ThreadPool implements Executor { | |
/** | |
* 任务队列 | |
*/ | |
private BlockingQueue<Runnable> taskQueue; | |
/** | |
* 核心工作线程数 | |
*/ | |
private int coreSize; | |
/** | |
* 工作线程集合 | |
*/ | |
private Set<Worker> workers = new HashSet<>(); | |
/** | |
* 创建线程池 | |
* @param coreSize 工作线程数量 | |
* @param capcity 阻塞队列容量 | |
*/ | |
public ThreadPool(int coreSize, int capcity) { | |
this.coreSize = coreSize; | |
this.taskQueue = new BlockingQueue<>(capcity); | |
} | |
/** | |
* 提交任务执行 | |
*/ | |
public void execute(Runnable task) { | |
synchronized (workers) { | |
// 如果工作线程数小于阈值,直接开始任务执行 | |
if(workers.size() < coreSize) { | |
Worker worker = new Worker(task); | |
workers.add(worker); | |
worker.start(); | |
} else { | |
// 如果超过了阈值,加入到队列中 | |
taskQueue.put(task); | |
} | |
} | |
} | |
/** | |
* 工作线程,对执行的任务做了一层包装处理 | |
*/ | |
class Worker extends Thread { | |
private Runnable task; | |
public Worker(Runnable task) { | |
this.task = task; | |
} | |
public void run() { | |
// 如果任务不为空,或者可以从队列中获取任务 | |
while (task != null || (task = taskQueue.take()) != null) { | |
try { | |
task.run(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
// 执行完后,设置任务为空 | |
task = null; | |
} | |
} | |
// 移除工作线程 | |
synchronized (workers){ | |
log.debug("remove worker successfully"); | |
workers.remove(this); | |
} | |
} | |
} | |
} |
- Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务。
- execute方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中。
3.演示
public void testThreadPool1() throws InterruptedException { | |
Executor executor = new ThreadPool(2, 4); | |
// 提交任务 | |
for (int i = 0; i < 6; i++) { | |
final int j = i; | |
executor.execute(() -> { | |
try { | |
Thread.sleep(10); | |
log.info("run task {}", j); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
Thread.sleep(10); | |
} | |
Thread.sleep(10000); | |
} |
运行结果:
获取任务超时设计
目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue") | |
public class TimeoutBlockingQueue<T> { | |
// 容量 | |
private int capcity; | |
// 双端任务队列容器 | |
private Deque<T> deque = new ArrayDeque<>(); | |
// 重入锁 | |
private ReentrantLock lock = new ReentrantLock(); | |
// 生产者条件变量 | |
private Condition fullWaitSet = lock.newCondition(); | |
// 生产者条件变量 | |
private Condition emptyWaitSet = lock.newCondition(); | |
public TimeoutBlockingQueue(int capcity) { | |
this.capcity = capcity; | |
} | |
// 带超时时间的获取 | |
public T poll(long timeout, TimeUnit unit){ | |
lock.lock(); | |
try{ | |
// 将 timeout 统一转换为 纳秒 | |
long nanos = unit.toNanos(timeout); | |
while (deque.isEmpty()){ | |
try { | |
if (nanos<=0){ | |
return null; | |
} | |
// 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待 | |
nanos = emptyWaitSet.awaitNanos(nanos); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
fullWaitSet.signal(); | |
return deque.getFirst(); | |
}finally { | |
lock.unlock(); | |
} | |
} | |
// 带超时时间的增加 | |
public boolean offer(T task , long timeout , TimeUnit unit){ | |
lock.lock(); | |
try{ | |
// 将 timeout 统一转换为 纳秒 | |
long nanos = unit.toNanos(timeout); | |
while (deque.size() == capcity){ | |
try { | |
if (nanos<=0){ | |
return false; | |
} | |
// 更新剩余需要等待的时间 | |
nanos = fullWaitSet.awaitNanos(nanos); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
log.debug("加入任务队列 {}", task); | |
deque.addLast(task); | |
emptyWaitSet.signal(); | |
return true; | |
}finally { | |
lock.unlock(); | |
} | |
} | |
} |
新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。
拒绝策略设计
目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。
1.定义策略模式的函数式接口
/** | |
* <p>拒绝策略的函数式接口:</p> | |
* | |
* @author: cxw (332059317@qq.com) | |
* @date: 2022/10/18 13:15 | |
* @version: 1.0.0 | |
*/ | |
@FunctionalInterface | |
public interface RejectPolicy<T> { | |
/** | |
* 拒绝策略的接口 | |
* @param queue | |
* @param task | |
*/ | |
void reject(BlockingQueue<T> queue, T task); | |
} |
2.添加函数式接口的调用入口
我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。
@Slf4j(topic = "c.BlockingQueue") | |
public class BlockingQueue<T> { | |
........ | |
/** | |
* 尝试添加任务 | |
* @param rejectPolicy | |
* @param task | |
*/ | |
public void tryPut(RejectPolicy<T> rejectPolicy, T task) { | |
lock.lock(); | |
try{ | |
// 如果队列超过容量 | |
if (deque.size()> capcity){ | |
log.debug("task too much, do reject"); | |
rejectPolicy.reject(this, task); | |
}else { | |
deque.offer(task); | |
emptyWaitSet.signal(); | |
} | |
}finally { | |
lock.unlock(); | |
} | |
} | |
} |
3.修改ThreadPool类
public class ThreadPool implements Executor { | |
..... | |
/** | |
* 拒绝策略 | |
*/ | |
private RejectPolicy rejectPolicy; | |
// 通过构造方法传入执行的拒绝策略 | |
public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) { | |
this.coreSize = coreSize; | |
this.taskQueue = new BlockingQueue<>(capcity); | |
this.rejectPolicy = rejectPolicy; | |
} | |
/** | |
* 提交任务执行 | |
*/ | |
public void execute(Runnable task) { | |
synchronized (workers) { | |
// 如果工作线程数小于阈值,直接开始任务执行 | |
if(workers.size() < coreSize) { | |
Worker worker = new Worker(task); | |
workers.add(worker); | |
worker.start(); | |
} else { | |
// 如果超过了阈值,加入到队列中 | |
//taskQueue.put(task); | |
// 调用tryPut的方式 | |
taskQueue.tryPut(rejectPolicy, task); | |
} | |
} | |
} | |
.... | |
} |
通过构造方法的方式传入要执行的拒绝策略
调用tryPut方法添加任务
4.演示