面试题19解析-线程池(上)

Java
462
0
0
2022-06-22
标签   Java面试
题目:聊一下Java线程池的实现原理?线程是否可以共享?如果可共享的话,那么应该如何实现线程共享?

本文通过ThreadPoolExecutor来分析线程池使用及内部实现原理。

为什么离不开线程池?

多线程开发是提高程序性能的一种方式,但线程的创建与销毁,以及运行线程上下文切换都是需要消耗cpu资源的,相对来说任务的执行所占整个线程运行的cpu时间越短,线程的运行效率也相应越低。而在有些系统中,我们需要反复频繁地创建线程,例如tomcat,每个http的处理handle都必须运行在一个线程中,这样在并访问量很大的情况下,就会造成系统中创建了很多系统线程,使得cpu频繁的进行线程上下文切换,从而导致了整个系统的行能低下。为了解决这样的问题,编程领域设计了线程池来解决线程切换带来的性能损耗。

线程池的设计思想是创建一定数量的运行线程,将要执行的任务,放入线程池,线程池会自动分配线程去执行任务,执行完任务的线程又会被放入池中,等待新任务的到来,而不是退出线程,从而实现了线程的重复利用,避免了系统反复创建销毁线程,造成的性能损耗。另一方面,线程池将程序员的关注点由线程转向了任务,对于使用者来说,线程池就像一个盒子,使用者无需关心线程操作相关的实现细节,可以将更多的精力放在任务本身上,只需在合适的时机将任务丢给线程池即可。线程池将任务与线程进行了解绑,更有利于将程序解耦。线程与线程池的编程模型转变由下图所示:

面试题19解析-线程池(上)

线程池怎么玩?

首先线程池的使用需要通过ThreadPoolExecutor的构造函数来创建一个线程池:

new ThreadPoolExecutor(int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

构造函数参数意义如下:

参数意义corePoolSize线程池核心线程数量maximumPoolSize线程池最大线程数量keepAliveTime线程保持时间,空闲线程可以存活时间TimeUnit线程保持时间的单位(keepAliveTime的单位)workQueue任务队列threadFactory线程创建工厂RejectedExecutionHandler线程数超过最大线程数后,任务将被拒绝并回调的handler

在我们创建了一个线程池后,便可以向线程池中提交一个Runnable类型的任务了:

threadPool.execute( new Runnable(){
  public void run(){
    ...//任务代码
  }
} )

这样我们就将任务提交到了线程池去运行了,至于线程池如何实现任务运行,就不是我们需要考虑的事情了,从而将任务与线程进行了解耦。但是我们也无法得知任务是否执行成功,如果我们需要得知任务的执行结果,则需要使用ThreadPoolExecutor.submit(Runnable task)方法来向线程池提交任务,该方法会返回一个Futrue类型的结果,通过以下代码便可以判断任务是否执行成功了。

 Future<Object> threadFuture = threadPoolExecutor.submit(task);
 try{
   Object resualt = threadFuture.get();
 }catch (InterruptedException e){
   // 处理线程中断异常
 }catch (ExecutionException e){
   // 处理无法执行异常
 } finally {
   threadPoolExecutor.shutdown();
 }

ThreadPoolExecutor的执行流程

上一节我们简单描述了线程池的使用方式,这里我们来探究一下ThreadPoolExecutor的执行流程,其流程如下:

  1. 创建线程池,等待任务执行。
  2. 当任务提交给线程池后,会判断核心线程池是否已满,即当前线程数与corePoolSize进行比较,如果核心线程池未满,则创建新线程来执行任务,如果核心线程池已满则将任务加入任务队列BlockingQueue中,等待执行。
  3. 如果任务队列也满了,则ThreadPoolExecutor会继续创建新的线程来处理任务,但是线程池中线程数目不得超过最大线程数maximumPoolSize,否则线程池将会采取饱和策略,拒绝处理任务,并将调用用户设置的RejectedExecutionHandler策略函数进行处理。这里需要注意,只有BlockingQueue为有界队列时,maximumPoolSize参数才会有作用,否者无界BlockingQueue不可能满,不会触发线程池来处理任务队列已满的情况,无界队列使用不当可能造成线程池无休止创建线程的现象。
  4. 线程池中的线程处理完当前任务后,会从任务队列中尝试取任务,如果取到任务,则执行任务,否则等待keepAliveTime时间,如果在keepAliveTime内都没有取到任务,则该线程会退出。

线程池执行流程图如下:

面试题19解析-线程池(上)

execute的实现源码如下(JDK8):

 public void execute(Runnable command) {
   int c = ctl.get();
   if (workerCountOf(c) < corePoolSize) { //step 1: 核心线程数判断  
     if (addWorker(command, true)) //step 1.1 添加核心线程执行任务   
       return;
     c = ctl.get();
   }
   if (isRunning(c) && workQueue.offer(command)) { //step 2尝试任务加入队列  
     int recheck = ctl.get();
     if (! isRunning(recheck) && remove(command)) //step 2.1判断线程池是否运行 
       reject(command);
     else if (workerCountOf(recheck) == 0) //step 2.2判断当前工作线程数量如果等于0,直接添加工作线程 
       addWorker(null, false); 
   } else if (!addWorker(command, false)) //step 3 任务无法队列,尝试创建线程执行任务
     reject(command);
 }

读者结合笔者的注释,应该不难理解这段源码。这里我们需要注意一下线程池的控制变量ctl,该变量是一个AtomicInteger类型的原子变量,这个变量在这个线程池的工作中至关重要,该变量控制了线程池的两个属性:线程的数目和线程池的当前运行状态(线程池拥有的状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED)。这个变量设计的非常巧妙,一方面减少了线程池的变量数量,更重要的一方面是,该变量是原子类型变量,线程池的实现函数中,往往需要同时获取这两个属性,如果将两个属性放入一个原子变量中,根据Atomic类支持线程的重入,线程池也就只需获取一把锁,便可以控制线程池的两个属性,这里实际上变相减少了一把锁的使用,非常巧妙,Doug Lea不愧被称为Java并发大师!下面源码展示线程池通过ctl变量的位运算获取线程属性的操作(JDK8):

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int workerCountOf(int c) {
  return c & CAPACITY;
}
private static int runStateOf(int c) {
  return c & ~CAPACITY;
}

这里我们可以从ThreadPoolExecutor的执行流程中看到,线程池并不是一开始就创建好coolPoolSize个线程,而是随着任务的添加,来逐步添加工作线程的。当然线程池也提供了线程池的预热功能prestartAllThreads(),该方法线程池会通过addWorker(null, true)函数来创建coolPoolSize个核心线程来等待任务的到来,addWorker()方法的分析见下节。

public int prestartAllCoreThreads() {
  int n = 0;
  while (addWorker(null, true))
    ++n;
  return n;
}

Worker工作线程

ThreadPoolExecutor.execute()方法中调用了addworker()方法,其中方法addworker(Runnable firstTask, boolean core)的第一个参表示该工作线程创建后第一个执行的任务,该参数为null时,表示线程池只是创建了一个等待任务的工作线程;第二参数表示添加的线程是否是核心线程,用于区分线程池使用coolPoolSize还是maximumPoolSize进行线程池线程数目的控制。在addworker()方法中创建了一个Worker对象,一个Worker对象就是ThreadPoolExecutor中的一个线程。当一个任务提交时,Worker对象就会使用线程工厂创建一个线程,并将该线程与当前firstTask绑定,Worker对象就像线程池工厂中的劳工一样,会不停的获取新的任务来执行。新创建的Worker线程都会保存在线程池的HashSet<Worker>成员变量中,这里我们来看一下工作线程的运行核心函数Worker.run()的实现(JDK8,部分代码省略):

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  Worker(Runnable firstTask) {
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //默认线程工厂会调用创建一个线程,并与firstTask绑定
  }
  public void run() {
    runWorker(this);
  }
}

ThreadPoolExecutor的默认线程工厂newThread(Runnable)的实现如下,这里便将Worker与实际线程绑定了,并使用firstTask创建了线程:

static class DefaultThreadFactory implements ThreadFactory {
  public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
    namePrefix + threadNumber.getAndIncrement(),
  0);
  if (t.isDaemon())
    t.setDaemon(false);
  if (t.getPriority() != Thread.NORM_PRIORITY)
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
  }
}

而Worker.run()被调用后,Worker对应的线程会调用

ThreadPoolExecute.runWorker()来执行firstTask任务,并循环从任务队列中取任务:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null; 
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {
      task.run(); 
    }
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

那么问题来了,runWorker()方法会在worker工作线程没有取到任务时,退出循环,此时工作线程便会退出,那keepAliveTime参数是如何控制工作线程去任务的存活时间的?

奥秘就在取任务getTask()的实现中,Worker.getTask()实现如下(JDK 8):

 private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?  
   for (;;) {
     try {
       Runnable r = timed ?
       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
       workQueue.take();
       if (r != null)
         return r;
       timedOut = true;
     } catch (InterruptedException retry) {
       timedOut = false;
     }
   }
 }

这里就清晰了,原来Worker工作线程会尝试在keepAliveTime时间内从workQueue队列中取任务,线程的超时控制依赖于队列取元素的超时控制,也就是说在keepAliveTime时间类,工作线程会阻塞在getTask()方法上,直到线程取到任务或者取任务超时。

Worker的时序图如下:

面试题19解析-线程池(上)

线程池的饱和策略

饱和策略是线程池应对任务队列和线程池饱和时所采取的策略,ThreadPoolExecutor提供了

setRejectedExecutionHandler()方法设置自定义饱和策略的接口,如果没有设置该接口,Java便会采取默认饱和策略AbortPolicy才处理,JDK提供了4中饱和策略:

  • AbortPolicy : 默认饱和策略,直接抛出异常。
  • CallerRunsPolicy : 使用调用者线程来执行任务。
  • DiscardOldestPolicy : 丢弃队列中最近一个任务,并执行当前任务。
  • DiscardPolicy : 不处理,直接丢弃当前任务。

这四种JDK提供的饱和策略都实现了RejectedExecutionHandler接口,并且只有AbortPolicy策略才会抛出

RejectedExecutionException异常,如果实际开发环境中需要实现自定义饱和策略,可以参考以上四种饱和策略的实现方式。

线程池的关闭

做人做事要善始善终,软件开发也一样,占用了的资源要记得释放,使用了的线程要记得归还,有借有还,再借不难。线程池不适合处理需要长期运行的任务,长任务应该开辟专用线程进行处理。线程池提供了shutdown()和shutdownNow()两种方式来主动关闭线程池,虽然两者都可以关闭线程池,但是还是有一定区别的:

  1. shutdown():当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出
  2. RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
  3. shutdownNow():线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep/wait/Condition/定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

在调用shutdown()时,shutdown()只会将空闲线程进行关闭,而shutdownNow()方法会尝试关闭所有线程,因此如果任务是否正常执行完,对于系统没有影响,可以使用shutdownNow()方法,一般开发中都会使用shutdown()来优雅的关闭线程池。

线程池的配置原则

线程池提供了统一管理线程的机制,但是线程池的运行效率的高低,一方面也需要程序员自己进行调优把控。在HotSpot虚拟机中Java线程的创建使用了底层操作系统的线程创建接口来系统线程,并不是伪线程(这里说句题外话,有同学和我说python使用的是伪线程,其实自己写个多线程小程序就可以判断出是python虚拟机采用的是真实线程还是伪线程。学习软件开发,门槛需要自己迈,坑需要自己踩,多动手,不可懒)。我们知道线程是CPU执行的基本单位,单个处理器同一时间内只能运行一个线程,因此线程池的大小的配置,也应该与CPU的核心数目相关(通过Runtime.getRuntime().availableProcessors()方法可以获取到当前系统处理器数目),过多的创建线程并不一定能带来系统总体性能的提升,反而会使处理器性能浪费在频繁的线程切换中。线程数目与效率的关系图如下:

面试题19解析-线程池(上)

那么线程池到底应该配置多大,才能高效的利用线程池?这里没有固定的答案,这里需要根据任务类型来进行配置。如果任务是CPU密集型任务,那么线程池应该配置较小,例如线程池可以配置CPU核心数目相等的大小;如果是需要资源等待类型的任务(如I/O等访问,数据库操作等),则应该根据等待的平均时间,来配置N倍于CPU核心数目的大小。线程池数目配置的具体的大小,还需要在实际开发工作中,编写行能测试类,结合虚拟机行能监控工具(如VisualVM),来进行配置调优。

线程池变量共享问题

线程池提交的是一个Runnable类型的任务,因此线程池变量共享的问题,也就是多线程变量共享的问题。在多线程环境下,变量当然是可以共享的,例如售票系统中的票数限制,订单系统中的订单号等,都需对同一变量进行操作。为了控制篇幅,多线程共享问题在公众号下一篇分析。

最近,很多小伙伴私信告诉我们,对有些题目理解还是有问题,我们建立了一个交流群634247407,方便与小伙伴们一起来分析面试题目。

记得关注我们哦,这里全部都是干货!!!