JDK线程池源码研究

Java
351
0
0
2022-07-21
标签   Java多线程

主要构成

workers: 工作组

queue: 任务队列

threadFactory: 线程生产工厂

handler: 异常处理

线程池状态

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

1. RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。

2. SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。

3. STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。

4. TIDYING:

  • SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。
  • 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
  • 线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。

5. TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。

参考链接

主要流程

创建线程池 -> 创建 worker 或放入任务队列 -> 线程开始执行任务

源码流程:

execute

JDK线程池源码研究

        // 获取线程池句柄 
        int c = ctl.get();
        // 当前线程数量是否小于设置的核心线程池大小 
        if (workerCountOf(c) < corePoolSize) {
            // 添加一个 woker 
            if (addWorker(command, true))
               // 添加成功后推出 
                return;
            // 刷新线程池句柄
            c = ctl.get();
        }
        // 以下是线程池数量达到上限后的处理 
        // 如果当前线程池是 RUNNING,且添加任务队列成功 
        if (isRunning(c) && workQueue.offer(command)) {
            // 刷新线程池句柄 
            int recheck = ctl.get();
            // 如果当前线程池非 RUNNING状态,则移除任务 
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果是RUNNING状态或移除任务失败,且当前工作线程为0 
            else if (workerCountOf(recheck) == 0)
                // 添加一个没有任务的 worker 去执行任务队列中剩余的任务
                addWorker(null, false);
        }
        // 如果线程池非RUNNING或,添加任务队列失败,则新增一个 woker 执行任务 
        // 如果增加 worker 失败则抛出异常拒绝任务 
        else if (!addWorker(command, false))
            reject(command);

addWorker

JDK线程池源码研究

  • 判断线程状态,增加 woker 数量
       retry:
      for (;;) {
          // 获取线程池句柄
          int c = ctl.get();
          // 当前线程池状态
          int rs = runStateOf(c);

          // 1.如果当前线程池状态为 STOP 及以上,返回false 
          // 2.如果当前线程状态为 SHUTDOWN,且任务和任务队列都为空,返回false 
          if (rs >= SHUTDOWN &&
              ! (rs == SHUTDOWN &&
                 firstTask == null &&
                 ! workQueue.isEmpty()))
              return false;

          for (;;) {
              // 获取线程数量
              int wc = workerCountOf(c);
              // 如果线程数量大于系统最大值,或大于设置的最大线程数量,返回false 
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  return false;
              // CAS 操作 worker 数量,操作成功后跳出最外层循环,开始执行任务 
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              // 刷新线程池句柄
              c = ctl.get();  // Re-read ctl 
              // 如果当前线程池状态有变化,跳出第二层循环, 
              // 即重新 check 线程池、任务、任务队列状态 
              if (runStateOf(c) != rs)
                  continue retry;
              // else CAS failed due to workerCount change; retry inner loop
          }
      }
  • 创建 woker,线程执行任务
      // woker 是否启动标记 
      boolean workerStarted = false;
      // woker 是否添加进 woker 组标记 
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 创建 woker,在 woker 中通过线程工厂会创建线程
          w = new Worker(firstTask);
          // 获取 woker 的线程 
          final Thread t = w.thread;
          if (t != null) {
              // 获取全局锁 
              final ReentrantLock mainLock = this.mainLock;
              // 阻塞取锁
              mainLock.lock();
              try {
                  // 重新获取线程池状态 
                  int rs = runStateOf(ctl.get());
                  // 如果线程池是 RUNNING 状态 
                  // 或者是 (SHUTDOWN 状态 且任务为 null)-> excute 中删除失败的任务 
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      // 如果当前线程已经启动过了,抛出异常 
                      if (t.isAlive()) // precheck that t is startable 
                          throw new IllegalThreadStateException();
                      // 讲 woker 添加进 woker 组中
                      workers.add(w);
                      // 记录线程峰值 
                      int s = workers.size();
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      // woker添加成功标记
                      workerAdded = true;
                  }
              } finally {
                  // 释放全局锁
                  mainLock.unlock();
              }
              // 如果添加 woker 时没有异常,开始执行任务,标记 woker 已开始工作 
              if (workerAdded) {
                  t.start();
                  workerStarted = true;
              }
          }
      } finally {
          // 如果 woker 启动失败,执行添加失败的策略  
          if (! workerStarted)
              // woker 组中移除 woker 
              // 减少 woker 组 woker 总数 
              // 尝试结束线程池
              addWorkerFailed(w);
      }
      // 返回 woker 是否开始 
      return workerStarted;

runWorker

Woker 本身是一个 Runnable 对象,在新建 Thread 对象时,线程工厂会把 Woker 传给 Thread,Thread.start 时,会调用 Woker.run,Woker.run 再调用外部的 runWoker 方法

JDK线程池源码研究

        // 获取当前线程 
        Thread wt = Thread.currentThread();
        // 获取当前任务 
        Runnable task = w.firstTask;
        // 将 woker 的任务设置为空
        w.firstTask = null;
        // 在创建 worker 时,会将状态标记为 -1 ,执行任务前将状态改为 0 
        // -1 时不允许结束这个 woker 
        // 防止 worker 刚加入 worker 组还没开始执行任务就被回收了
        w.unlock(); // allow interrupts 
       // 线程是否异常退出的标记 
        boolean completedAbruptly = true;
        try {
            // 当任务不为空,或任务队列的任务不为空时 
            while (task != null || (task = getTask()) != null) {
                // 锁住线程
                w.lock();
                //1. 线程池如果是 STOP 状态,且当前线程不是中断状态,则设置中断状态 
                //2. 线程池不是 STOP 状态,刷新当前线程线程的中断状态,刷新后再次获取线程池状态,如果是 STOP 且当前线程不是中断状态,则设置中断状态 
                // 第二次获取线程池状态是防止刷新线程中断状态后线程池结束,所以刷新后再 check 一次 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务之前的钩子
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务之后的钩子
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将任务重置为空
                    task = null;
                    // 增加 woker 的完成任务数量
                    w.completedTasks++;
                    // 释放当前 woker 的锁
                    w.unlock();
                    // 如果没有异常抛出则继续 while 循环
                }
            }
            // 异常退出走不到这里
            completedAbruptly = false;
        } finally {
            // 处理 while 循环结束后的工作 
            // 如果是异常退出,则新增一个 woker 代替 
            // 如果是正常退出,则清理 woker 组,尝试关闭线程池
            processWorkerExit(w, completedAbruptly);
        }

getTask

JDK线程池源码研究

        // 是否获取任务超时的标记 
        boolean timedOut = false; // Did the last poll() time out? 
        for (;;) {
            // 线程池句柄 
            int c = ctl.get();
            // 线程池状态 
            int rs = runStateOf(c);

            // 如果当前线程池是 STOP 及以上状态,则减少 woker 数量,返回 null 
            // 如果当前线程是 SHUTDOWN 且任务队列为空,则减少 woker 数量,返回 null 
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // 当前线程数量 
            int wc = workerCountOf(c);

            // 是否允许超时 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 1. 线程总数大于最大线程数且任务队列为空,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行 
            // 2. 线程总数小于或等于最大线程数,允许超时且已超过时,且任务队列为空或线程数大于1,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行 
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 获取任务,如果允许超时,则使用 poll 方法并设置超时时间 
               // 否则使用 take 方法, 
               // 在 take 方法中,如果任务队列为空会调用 unsafe.park 将线程挂起 
               // workQueue.offer 添加任务时,会调用 unsafe.unpark 唤醒一个线程 
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果任务不会 null 则返回任务 
                if (r != null)
                    return r;
                // 任务为 null 将超时标记为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }