Java线程池
一、介绍
线程池,顾名思义,这是管理一堆线程而出现的对象。与数据库的连接池一致,它的出现解决了线程的频繁创建和销毁,从而浪费大量资源的问题。
所以,线程池中有提前创建好的线程,使用时直接分配获取,使用完再由线程池管理是否销毁。
优点
- 降低资源消耗,也就是不需要重复多次的创建线程
- 更好的管理线程
- 比如可以获取当前运行的线程是什么
- 还在等待执行的任务有什么
二、使用线程池
在JDK5起提供了线程池的对象,ExecutorService
和Executors
其中,ExecutorService
和它的子类ThreadPoolExecutor
是线程池的关键
而Executors
是对应的工具类,里面有些工厂方法可以快速创建线程池
查看ThreadPoolExecutor
的构造方法
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
}
参数 | 说明 |
corePoolSize | 核心线程数。就算目前空闲,也不会回收这几个线程 |
maximumPoolSize | 最大线程数。当前线程池可以容纳的最大线程数量 |
keepAliveTime | 线程保持存活时间。如果线程空间时间到达,将会进行销毁(除了核心线程) |
unit | 与keepAliveTime一起使用,仅是个时间单位 |
workQueue | 工作等待队列。当线程池所有的线程都繁忙运行时,新添加的执行任务会暂时保留至此队列 |
threadFactory | 创建线程的线程工厂 |
handler | 拒绝策略。当队列满了后,还有执行任务进入时的策略 |
workQueue
参数需要传入一个BlockingQueue
,这是个双缓冲队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率,不能传入空对象,可设置容量大小,也可以不设置容量大小,那么它的容量就是Integer.MAX_VALUE
。常用的几种实现类
类 | 说明 |
ArrayBlockingQueue | 规定容量大小的阻塞队列 |
LinkedBlockingQueue | 既可以规定容量大小,也可以不规定的阻塞队列 |
SynchronizedQueue | 一个特殊的队列,生产消费必须交替完成的队列生产一个元素后,必须要有进行消费后,才能继续往队列内生产元素 |
handler
拒绝策略
当线程池指定的队列容量满了时,将执行哪种拒绝任务的策略
策略类 | 说明 |
AbortPolicy | 默认,不执行新任务,直接抛出异常,提示线程池已满 |
DiscardPolicy | 不执行新任务,也不抛出异常 |
DiscardOldestPolicy | 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。 |
CallerRunsPolicy | 直接在外层调用者的线程中调用新任务 |
1)小试牛刀
package com.banmoon.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
// lambda表达式
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
// 关闭线程池,如果不关闭,线程池将一直存在,池子内保留着核心线程,等待着调用
executorService.shutdown();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
2)Executors
工具类
关于此的三个相关方法源码,其中还有一些他们的重载,这边就不细细讲了。
这些工具类方法,主要是快速创建ThreadPoolExecutor
对象的方法,只是它们的参数各有所不同
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
方法 | 参数说明 | 效果 |
newCachedThreadPool | 核心线程数为0最大线程数已调到Integer.MAX_VALUE | 每提交一个线程任务,都将新创建一个新的线程来执行如果需要执行的任务很多,这有可能会导致CPU100%的问题 |
newFixedThreadPool | 核心线程数和最大线程数一致但队列长度为Integer.MAX_VALUE | 提交的任务将正常交给池子中的线程执行,执行完成也不会销毁,等待执行新的任务如果执行的任务很多,队列会一直添加任务等待执行,可能会造成内存溢出的问题 |
newSingleThreadExecutor | 核心线程数和最大线程数都为1但队列长度为Integer.MAX_VALUE | 和newFixedThreadPool类似,但池子中只有一个线程 |
根据需要来进行使用合适的线程池,测试下他们的执行方式和快慢
package com.banmoon.pool;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo2 {
public static void main(String[] args) {
ExecutorService executorService1 = Executors.newCachedThreadPool();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService1.execute(new MyDemo2(i));
// executorService2.execute(new MyDemo2(i));
// executorService3.execute(new MyDemo2(i));
}
executorService1.shutdown();
executorService2.shutdown();
executorService3.shutdown();
}
}
class MyDemo2 implements Runnable {
private Integer i;
public MyDemo2(Integer i) {
this.i = i;
}
@Override
public void run() {
System.out.println(StrUtil.format("{}:{},时间:{}", Thread.currentThread().getName(), i, DateUtil.now()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
newCachedThreadPool
执行结果可以看到,一共有100个线程被创建出来
newFixedThreadPool
执行结果,执行的永远都是那几个固定的线程,这里我们指定了10个线程,所以打印也是10个为一批来进行的。
newSingleThreadExecutor
执行结果,从头到尾就只有一个线程在执行
3)线程工厂
虽然有默认的线程工厂,但如果有需要进行处理的话,还是得记录一下
package com.banmoon.pool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo3 {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
class MyThreadFactory implements ThreadFactory{
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private String poolName;
MyThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
String threadName = poolName + "-" + threadNumber.getAndIncrement();
Thread t = new Thread(group, r, threadName, 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
执行结果
4)拒绝策略
拒绝策略没什么好讲的,平常在使用时,注意下容量的大小,以及使用的策略。自己需要执行的任务数量多少,会不会照成内存溢出等,从这几个方面入手,选择最适合业务的队列容量和拒绝策略。
策略类 | 说明 |
AbortPolicy | 默认,不执行新任务,直接抛出异常,提示线程池已满 |
DiscardPolicy | 不执行新任务,也不抛出异常 |
DiscardOldestPolicy | 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。 |
CallerRunsPolicy | 直接在外层调用者的线程中调用新任务 |
演示CallerRunsPolicy
,会在调用者的线程中,执行超出容量的任务
package com.banmoon.pool;
import java.util.concurrent.*;
public class Demo4 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 100; i++) {
executorService.execute(new MyDemo4(i));
}
executorService.shutdown();
}
}
class MyDemo4 implements Runnable{
private Integer i;
public MyDemo4(Integer i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果,上述线程池指定了最大线程数为20,队列容量为20。所以当执行第41个任务时,队列满了,将由调用者的线程来执行这个任务,此处是主线程
三、其他
1)执行任务的优先级
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
// 判断是否为空
if (command == null)
throw new NullPointerException();
// 判断当前正在运行的线程数是否小于核心线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 添加任务至线程执行,成功添加则结束
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果核心线程都有在运行,将任务放至队列中
if (isRunning(c) && workQueue.offer(command)) {
// 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列推入任务失败了,那将直接添加至线程执行
else if (!addWorker(command, false))
// 如果任务添加至线程失败,则将进行拒绝策略
reject(command);
}
/**
* 会从线程工厂获取线程,并添加执行任务
* @param firstTask 执行的任务
* @param core 是否可以添加至核心线程
* @return true:成功添加至线程执行
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
}
}
四、最后
线程池这东西干货还是挺多的,还有挺多没有整理完。比如说addWorker
方法,线程池的执行调度等
后续有什么新的理解继续补上,未完待续
关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。
我是半月,祝你幸福!