目录
- SpringBoot线程池和Java线程池的用法和实现原理
- 使用默认的线程池
- 方式一:通过@Async注解调用
- 方式二:直接注入 ThreadPoolTaskExecutor
- 线程池默认配置信息
- SpringBoot 线程池的实现原理
- 覆盖默认的线程池
- 管理多个线程池
- JAVA常用的四种线程池
- newCachedThreadPool
- newFixedThreadPool
- newScheduledThreadPool
- newSingleThreadExecutor
- Java 线程池中的四种拒绝策略
- CallerRunsPolicy
- AbortPolicy
- Java 线程复用的原理
SpringBoot线程池和Java线程池的用法和实现原理
使用默认的线程池
方式一:通过@Async注解调用
public class AsyncTest { | |
@Async | |
public void async(String name) throws InterruptedException { | |
System.out.println("async" + name + " " + Thread.currentThread().getName()); | |
Thread.sleep(1000); | |
} | |
} |
启动类上需要添加@EnableAsync
注解,否则不会生效。
//@EnableAsync | |
public class Test1Application { | |
public static void main(String[] args) throws InterruptedException { | |
ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); | |
AsyncTest bean = run.getBean(AsyncTest.class); | |
for(int index = 0; index <= 10; ++index){ | |
bean.async(String.valueOf(index)); | |
} | |
} | |
} |
方式二:直接注入 ThreadPoolTaskExecutor
此时可不加 @EnableAsync
注解
class Test1ApplicationTests { | |
ThreadPoolTaskExecutor threadPoolTaskExecutor; | |
void contextLoads() { | |
Runnable runnable = () -> { | |
System.out.println(Thread.currentThread().getName()); | |
}; | |
for(int index = 0; index <= 10; ++index){ | |
threadPoolTaskExecutor.submit(runnable); | |
} | |
} | |
} |
线程池默认配置信息
SpringBoot线程池的常见配置:
spring: | |
task: | |
execution: | |
pool: | |
core-size: 8 | |
max-size: 16 # 默认是 Integer.MAX_VALUE | |
keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 | |
allow-core-thread-timeout: true # 是否允许核心线程超时,默认true | |
queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE | |
shutdown: | |
await-termination: false # 线程关闭等待 | |
thread-name-prefix: task- # 线程名称的前缀 |
SpringBoot 线程池的实现原理
TaskExecutionAutoConfiguration
类中定义了 ThreadPoolTaskExecutor
,该类的内部实现也是基于java原生的 ThreadPoolExecutor
类。initializeExecutor()
方法在其父类中被调用,但是在父类中 RejectedExecutionHandler
被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
,并通过initialize()
方法将AbortPolicy
传入initializeExecutor()
中。
注意在TaskExecutionAutoConfiguration
类中,ThreadPoolTaskExecutor
类的bean的名称为: applicationTaskExecutor
和 taskExecutor
。
// TaskExecutionAutoConfiguration#applicationTaskExecutor() | |
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { | |
return builder.build(); | |
} | |
// ThreadPoolTaskExecutor#initializeExecutor() | |
protected ExecutorService initializeExecutor( | |
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { | |
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); | |
ThreadPoolExecutor executor; | |
if (this.taskDecorator != null) { | |
executor = new ThreadPoolExecutor( | |
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, | |
queue, threadFactory, rejectedExecutionHandler) { | |
public void execute(Runnable command) { | |
Runnable decorated = taskDecorator.decorate(command); | |
if (decorated != command) { | |
decoratedTaskMap.put(decorated, command); | |
} | |
super.execute(decorated); | |
} | |
}; | |
} | |
else { | |
executor = new ThreadPoolExecutor( | |
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, | |
queue, threadFactory, rejectedExecutionHandler); | |
} | |
if (this.allowCoreThreadTimeOut) { | |
executor.allowCoreThreadTimeOut(true); | |
} | |
this.threadPoolExecutor = executor; | |
return executor; | |
} | |
// ExecutorConfigurationSupport#initialize() | |
public void initialize() { | |
if (logger.isInfoEnabled()) { | |
logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); | |
} | |
if (!this.threadNamePrefixSet && this.beanName != null) { | |
setThreadNamePrefix(this.beanName + "-"); | |
} | |
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); | |
} |
覆盖默认的线程池
覆盖默认的 taskExecutor
对象,bean的返回类型可以是ThreadPoolTaskExecutor
也可以是Executor
。
public class ThreadPoolConfiguration { | |
public ThreadPoolTaskExecutor taskExecutor() { | |
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); | |
//设置线程池参数信息 | |
taskExecutor.setCorePoolSize(10); | |
taskExecutor.setMaxPoolSize(50); | |
taskExecutor.setQueueCapacity(200); | |
taskExecutor.setKeepAliveSeconds(60); | |
taskExecutor.setThreadNamePrefix("myExecutor--"); | |
taskExecutor.setWaitForTasksToCompleteOnShutdown(true); | |
taskExecutor.setAwaitTerminationSeconds(60); | |
//修改拒绝策略为使用当前线程执行 | |
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); | |
//初始化线程池 | |
taskExecutor.initialize(); | |
return taskExecutor; | |
} | |
} |
管理多个线程池
如果出现了多个线程池,例如再定义一个线程池 taskExecutor2
,则直接执行会报错。此时需要指定bean的名称即可。
public ThreadPoolTaskExecutor taskExecutor2() { | |
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); | |
//设置线程池参数信息 | |
taskExecutor.setCorePoolSize(10); | |
taskExecutor.setMaxPoolSize(50); | |
taskExecutor.setQueueCapacity(200); | |
taskExecutor.setKeepAliveSeconds(60); | |
taskExecutor.setThreadNamePrefix("myExecutor2--"); | |
taskExecutor.setWaitForTasksToCompleteOnShutdown(true); | |
taskExecutor.setAwaitTerminationSeconds(60); | |
//修改拒绝策略为使用当前线程执行 | |
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); | |
//初始化线程池 | |
taskExecutor.initialize(); | |
return taskExecutor; | |
} |
引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@Resource | |
ThreadPoolTaskExecutor taskExecutor2; |
对于使用@Async
注解的多线程则在注解中指定bean的名字即可。
@Async("taskExecutor2") | |
public void async(String name) throws InterruptedException { | |
System.out.println("async" + name + " " + Thread.currentThread().getName()); | |
Thread.sleep(1000); | |
} |
线程池的四种拒绝策略
JAVA常用的四种线程池
ThreadPoolExecutor
类的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, | |
BlockingQueue<Runnable> workQueue) { | |
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, | |
Executors.defaultThreadFactory(), defaultHandler); | |
} |
newCachedThreadPool
不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE
),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, | |
60L, TimeUnit.SECONDS, | |
new SynchronousQueue<Runnable>()); |
newFixedThreadPool
定长线程池,超出线程数的任务会在队列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, | |
0L, TimeUnit.MILLISECONDS, | |
new LinkedBlockingQueue<Runnable>()); |
newScheduledThreadPool
类似于newCachedThreadPool
,线程数无上限,但是可以指定corePoolSize
。可实现延迟执行、周期执行。
public ScheduledThreadPoolExecutor(int corePoolSize) { | |
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, | |
new DelayedWorkQueue()); | |
} |
周期执行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); | |
scheduledThreadPool.scheduleAtFixedRate(()->{ | |
System.out.println("rate"); | |
}, 1, 1, TimeUnit.SECONDS); |
延时执行:
scheduledThreadPool.schedule(()->{ | |
System.out.println("delay 3 seconds"); | |
}, 3, TimeUnit.SECONDS); |
newSingleThreadExecutor
单线程线程池,可以实现线程的顺序执行。
public static ExecutorService newSingleThreadExecutor() { | |
return new FinalizableDelegatedExecutorService | |
(new ThreadPoolExecutor(1, 1, | |
0L, TimeUnit.MILLISECONDS, | |
new LinkedBlockingQueue<Runnable>())); | |
} |
Java 线程池中的四种拒绝策略
CallerRunsPolicy
:线程池让调用者去执行。AbortPolicy
:如果线程池拒绝了任务,直接报错。DiscardPolicy
:如果线程池拒绝了任务,直接丢弃。DiscardOldestPolicy
:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
CallerRunsPolicy
直接在主线程中执行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler { | |
public CallerRunsPolicy() { } | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { | |
if (!e.isShutdown()) { | |
r.run(); | |
} | |
} | |
} |
效果类似于:
Runnable thread = ()->{ | |
System.out.println(Thread.currentThread().getName()); | |
try { | |
Thread.sleep(0); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
}; | |
thread.run(); |
AbortPolicy
直接抛出RejectedExecutionException
异常,并指示任务的信息,线程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler { | |
public AbortPolicy() { } | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { | |
throw new RejectedExecutionException("Task " + r.toString() + | |
" rejected from " + | |
e.toString()); | |
} | |
} |
DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler { | |
public DiscardPolicy() { } | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { | |
} | |
} |
DiscardOldestPolicy
e.getQueue().poll()
: 取出队列最旧的任务。e.execute(r)
: 当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { | |
public DiscardOldestPolicy() { } | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { | |
if (!e.isShutdown()) { | |
e.getQueue().poll(); | |
e.execute(r); | |
} | |
} | |
} |
Java 线程复用的原理
java
的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker
对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();
。workQueue
是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue
队列中。
private final class Worker | |
extends AbstractQueuedSynchronizer | |
implements Runnable | |
{ | |
/** | |
* This class will never be serialized, but we provide a | |
* serialVersionUID to suppress a javac warning. | |
*/ | |
private static final long serialVersionUID = 6138294804551838833L; | |
/** Thread this worker is running in. Null if factory fails. */ | |
final Thread thread; | |
/** Initial task to run. Possibly null. */ | |
Runnable firstTask; | |
/** Per-thread task counter */ | |
volatile long completedTasks; | |
/** | |
* Creates with given first task and thread from ThreadFactory. | |
* @param firstTask the first task (null if none) | |
*/ | |
Worker(Runnable firstTask) { | |
setState(-1); // inhibit interrupts until runWorker | |
this.firstTask = firstTask; | |
this.thread = getThreadFactory().newThread(this); | |
} | |
/** Delegates main run loop to outer runWorker */ | |
public void run() { | |
runWorker(this); | |
} | |
// Lock methods | |
// | |
// The value 0 represents the unlocked state. | |
// The value 1 represents the locked state. | |
protected boolean isHeldExclusively() { | |
return getState() != 0; | |
} | |
protected boolean tryAcquire(int unused) { | |
if (compareAndSetState(0, 1)) { | |
setExclusiveOwnerThread(Thread.currentThread()); | |
return true; | |
} | |
return false; | |
} | |
protected boolean tryRelease(int unused) { | |
setExclusiveOwnerThread(null); | |
setState(0); | |
return true; | |
} | |
public void lock() { acquire(1); } | |
public boolean tryLock() { return tryAcquire(1); } | |
public void unlock() { release(1); } | |
public boolean isLocked() { return isHeldExclusively(); } | |
void interruptIfStarted() { | |
Thread t; | |
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { | |
try { | |
t.interrupt(); | |
} catch (SecurityException ignore) { | |
} | |
} | |
} | |
} |
work对象的执行依赖于 runWorker()
,与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。
final void runWorker(Worker w) { | |
Thread wt = Thread.currentThread(); | |
Runnable task = w.firstTask; | |
w.firstTask = null; | |
w.unlock(); // allow interrupts | |
boolean completedAbruptly = true; | |
try { | |
while (task != null || (task = getTask()) != null) { | |
w.lock(); | |
// If pool is stopping, ensure thread is interrupted; | |
// if not, ensure thread is not interrupted. This | |
// requires a recheck in second case to deal with | |
// shutdownNow race while clearing interrupt | |
if ((runStateAtLeast(ctl.get(), STOP) || | |
(Thread.interrupted() && | |
runStateAtLeast(ctl.get(), STOP))) && | |
!wt.isInterrupted()) | |
wt.interrupt(); | |
try { | |
beforeExecute(wt, task); | |
Throwable thrown = null; | |
try { | |
task.run(); | |
} catch (RuntimeException x) { | |
thrown = x; throw x; | |
} catch (Error x) { | |
thrown = x; throw x; | |
} catch (Throwable x) { | |
thrown = x; throw new Error(x); | |
} finally { | |
afterExecute(task, thrown); | |
} | |
} finally { | |
task = null; | |
w.completedTasks++; | |
w.unlock(); | |
} | |
} | |
completedAbruptly = false; | |
} finally { | |
processWorkerExit(w, completedAbruptly); | |
} | |
} |