线程池-从零到一了解并掌握线程池

Java
217
0
0
2024-01-27

创建线程的方式

主要有两大类方式:
  1. 通过Executors创建(6种)
  2. 通过ThreadPoolExecutorPools创建(1种)
  3. Executors.newFixedThreadPool()

注意:这里主要是考察你实际到底用没用过。真正使用过的一定会说这些创建方式的优缺点。!!!不建议使用Executors创建线程:

  • FixedThreadPoolSingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,从而可能会堆积大量请求。造成OOM(因为newFixedThreadPool中队列的实现是LinkedBlockingQueue而LinkedBlockingQueue 的最大容量是 Integer.MAX_VALUE)源码如下:
  • CachedThreadPoolScheduledThreadPool 允许的创建线程数量为Integer.MAX_VALUE可能会创建大量的线程,从而导致 OOM。

image.png

Executors

突击面试可忽略 每个线程池的具体demo
Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待
    public static void fixedThreadPool() {
        // 创建 2 个数据级的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        // 创建任务
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            }
        };

        // 线程池执行任务(一次添加 4 个任务)
        // 执行任务的方法有两种:submit 和 execute
        threadPool.submit(runnable);  // 执行方式 1:submit
        threadPool.execute(runnable); // 执行方式 2:execute
        threadPool.execute(runnable);
        threadPool.execute(runnable);
    }

Lambda表达式的写法:

public static void fixedThreadPool() {
    // 创建 2 个数据级的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(2);

    // 创建任务
    Runnable runnable = () -> System.out.println("任务被执行,线程:" + Thread.currentThread().getName());

    // 线程池执行任务(一次添加 4 个任务)
    // 执行任务的方法有两种:submit 和 execute
    threadPool.submit(runnable);  // 执行方式 1:submit
    threadPool.execute(runnable); // 执行方式 2:execute
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

Executors.newCachedThreadPool():创建一个可以缓存的线程池,若线程数超过处理所需,则会缓存一段时间后回收。若线程数不够,则新建线程。
public static void cachedThreadPool() {
    // 创建线程池
    ExecutorService threadPool = Executors.newCachedThreadPool();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        threadPool.execute(() -> {
            System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}
Executors.newSingleThreadExecutor():创建单个线程数的线程池,它可以保证先进先出的执行顺序。
public static void singleThreadExecutor() {
    // 创建线程池
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + ":任务被执行");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}
Executors.newScheduledThreadPool():创建一个可以执行延迟任务的线程池
public static void scheduledThreadPool() {
    // 创建线程池
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
    // 添加定时执行任务(1s 后执行)
    System.out.println("添加任务,时间:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任务被执行,时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 1, TimeUnit.SECONDS);
}

Executors.newSingleThreadScheuledExecutor():创建一个单线程的可以执行延迟任务的线程池。
public static void SingleThreadScheduledExecutor() {
    // 创建线程池
    ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
    // 添加定时执行任务(2s 后执行)
    System.out.println("添加任务,时间:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任务被执行,时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 2, TimeUnit.SECONDS);
}

Executors.newWorkStealingPool():创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。
public static void workStealingPool() {
    // 创建线程池
    ExecutorService threadPool = Executors.newWorkStealingPool();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
        });
    }
    // 确保任务执行完成
    while (!threadPool.isTerminated()) {
    }
}

ThreadPoolExecutor():最原始的创建线程池的方式,它包含了 7 个参数可供设置。
public static void myThreadPoolExecutor() {
    // 创建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

线程池的参数

一共有七个参数:

参数1:corePoolSize
核心线程数,线程池中始终存货的线程数
参数2:maximumPoolSize
最大线程数,线程池中允许的最大线程数,当线程池中的任务队列满了之后可以创建的最大线程数
参数3:keepAliveTime
最大线程数可以存活之间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程
参数4:unit
单位 :是和参数3keepAliveTime配合使用,,合在一起用于设定线程的存活时间,参数keepAliveTime的时间单位i有以下7种可以选择:
  • TimeUnit.DAYS:天
  • TImeUnit.HOURS:小时
  • TimeUnit.MINUTES:分
  • TimeUnit.SECONDS:秒
  • TimeUnit.MILLISECONDS:毫秒
  • TimeUnit.MICROSECONDS:微妙
  • TimeUnit.NANOSECONDS:纳秒
参数5:workQueue

一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全,包含以下7种类型:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

较常用的是 LinkedBlockingQueue Synchronous,线程池的排队策略与 BlockingQueue 有关。

参数6:threadFactory
线程工厂,主要用来创建线程,默认为正常优先级,非守护线程
参数7:handler
拒绝策略,拒绝处理任务时的策略,系统有4种可选
AbortPolicy:拒绝并抛出异常
CallerRunsPolicy:使用当前调用的线程来执行任务
DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
DiscardPolicy:忽略并抛弃当前任务
默认的策略为:AbortPolicy

线程池的执行流程

ThreadPoolExecutor 关键节点的执行流程如下:

  • 当线程数小于核心线程数时,创建线程。
  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  • 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。

线程池的执行流程如下图所示:

image.png

线程池有哪些执行方法

execute和submit区别
从提交的任务类型角度:
  1. execute和submit都是线程池的方法,execute只能提交Runnable类型的任务
  2. submit既能提交Runnable类型的任务,也能提交Callable类型的任务
异常
  • execute会直接抛出任务执行时的异常,可以使用try catch来捕获,和普通线程的处理方式完全一致
  • submit会吃掉异常,可以通过Future的get方法将任务执行时的异常重新抛出。
返回值
  • execute没有返回值
  • submit有返回值
从API层面理解execute和submit
  • execute是在Executor接口中定义的。ThreadPoolExecutor()中并没有定义,但是ThreadPoolExecutor类继承了AbstractExecutorService抽象类,而该抽象类实现了ExecutorService接口,ExecutorService接口又继承了Executor接口。

总结:也就是说ThreadPoolExecutor实现了execute()方法,

  • submit()方法时ExecutorService接口中定义的,具体的实现是由AbstractExecutorService进行,

再AbstractExecutorService中submit方法一共被重载了三次,分别是:

  1. public Future<?> submit(Runnable task)
该重载submit方法提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该Future的get方法在成功完成时将会返回null。
  1. publicFuturesubmit(Runnable task, T result)
Runnable 任务用于执行,并返回一个表示该任务的Future。该Future的get方法在成功完成时将会返回给定的结果。
  1. publicFuturesubmit(Callabletask)
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 如果想立即阻塞任务的等待,则可以使用 result = execute.submit(aCallable).get(); 形式的构造。

从上述方法中我们可以看出2、3就是说execute不支持Future那一套 来接收多线程的执行结果 ,而submit可以,1中说该Future的get方法在成功完成时将会返回null,那要是返回null,和用execute有什么区别?我直接使用execute就好了 接下来我们可以看下AbstractExecutorService 源码:

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

可以明显的看到submit底层调用的时候,又将任务交给了execute()方法。 总结:如果提交的任务不需要一个结果的话直接用execute()会提高性能

捕获线程池中的异常

有两种种方法可以捕获线程池中的异常。

  • 一种方法是通过手动使用try-catch块来捕获异常并打印出来,但这样的写法比较繁琐和不够优雅。
  • 另一种方法是利用Thread类中的dispatchUncaughtException(Throwable e)方法。当线程抛出异常时,JVM最终会回调这个方法来进行最后的异常处理,而且该异常会被ThreadGroup类中的uncaughtException方法处理。我们可以在创建Thread对象时绑定一个自定义的异常捕获处理器,最终发生异常时会打印我们的错误日志。

下面是一个示例代码:

public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        log.info("------- info -------");
        throw new RuntimeException("运行时异常~~~~~");
    });
    Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {
        log.error("Exception in Thread..... ", e);
    };
    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
    thread.start();
}

然而,在项目中我们更常使用线程池而非单独的线程。线程池中的线程对象实际上是由线程工厂创建的。我们可以在线程工厂中设置一个异常捕获处理器。以下是使用ThreadPoolExecutor创建线程池时设置线程工厂的示例代码:

private static ExecutorService executor = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(500),
        new NamedThreadFactory("refresh-ipDetail", (ThreadGroup)null,false,
                new GlobalUncaughtExceptionHandler()));
@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("Exception in thread {} ", t.getName(), e);
        e.printStackTrace();
    }
}

但是在使用Spring的线程池时,由于其线程工厂无法设置任何值,我们可以采用装饰器模式。我们将Spring的线程池线程工厂传入装饰器中,并调用其创建线程的方法。然后,我们添加我们自定义的异常捕获处理器。在使用线程池时,我们替换掉Spring的线程工厂,并将本类的线程工厂进行包装传递进去,从而实现线程池的异常捕获。以下是具体实现方式的示例代码:

@Slf4j
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {
    private ThreadFactory factory;

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = factory.newThread(r);
        thread.setUncaughtExceptionHandler(new GlobalUncaughtExceptionHandler());
        thread.setDaemon(false);
        thread.setPriority(5);
        return thread;
    }
}
@Bean
public ThreadPoolTaskExecutor websocketExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(16);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(1000);
    executor.setThreadNamePrefix("websocket-executor-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    executor.setThreadFactory(new MyThreadFactory(executor));
    executor.initialize();
    return executor;
}

线程池中的核心线程数如何确定?

根据以往的经验,对于CPU密集型任务,核心线程数应该等于机器的核数加一。这样可以充分利用多核CPU的计算能力,并保留一个额外的线程用于处理突发任务。对于IO密集型任务,核心线程数应该设置为两倍的CPU核数,因为IO操作通常需要较多的等待时间,可以利用多个线程同时处理。

当任务数超过核心线程数时,如何让它直接启用最大的线程数?

首先需要了解的是默认情况下当任务数超过线程池的核心线程数时,默认会进入到队列中,等队列满了的时候才会启用线程池的最大线程数。如果想要达到直接启用线程池的最大线程数的话,首先我们要知道线程池的工作原理

  • 第一步:预热核心线程
  • 第二部:把任务添加到阻塞队列
  • 第三部:如何阻塞队列已满(添加失败),则创建非核心线程增加处理效率
  • 第四部:如果非核心线程数量达到了阈值,则执行拒绝策略

综上所述的步骤中,如果我们想要这个任务不进入到阻塞队列中。我们只需要人为手动干预第二部即可。那么这就很简单了。因为我们可以使用SynchronousQueue在创建线程池的时候指定线程池的阻塞队列的参数即可。SynchronousQueue队列是不能存储元素的一个队列,它的特性是没生产一个任务就需要指定一个消费者来处理这个任务,否则就会阻塞生产者。

线程池如何知道一个线程的任务执行完成的

首先我们需要了解的是,当我们把一个任务丢给线程池执行的时候,线程池会调度工作线程来执行这个任务的run方法,当任务的run方法正常执行结束后,也就意味着这个任务完成,所以线程池中的工作线程是通过同步调用任务的run方法,并且等待run方法的返回后,再去统计任务的完成数量。 综上所述我们如果从外部想要获取线程池内部的任务执行状态有以下几种方法可以实现。

isTerminated()方法

该方法是线程池内部提供的方法,通过该方法可以去判断线程池的运行状态,这样我们就可以循环去判断isTerminated()方法的返回值,来去获取线程池的运行状态,一旦返回的是Terminated就意味着线程池中的所有任务都已经执行完成了。 但是这个方法有一个弊端:就是通过该方法获取线程状态的一个前提就是程序需要主动调用shutdown()方法,但是我们在实际业务是很少主动调用这个方法的主动关闭线程池的。所以该方法实用性和灵活性上有些欠佳

submit()方法

线程池中有提供一个submit()方法,它提供了一个Future的返回值;可以通过**future.get()**去获取任务的执行结果,因为该方法在线程没有执行完之前一直都是阻塞状态,直到任务执行结束才会正常返回。因此只有该方法正常返回,才意味着传入线程池中的任务已经执行完成了。

CountDownLatch方法

这是Java JUC包下的。它是一个计数器;我们可以通过初试化一个计数器进行倒计时。可以利用它提供的两个方法:await()方法阻塞线程,countDown()倒计时,一旦倒计时为0,所有被阻塞在await方法的线程都会被释放。

总结:

其实想要获取线程是否执行完成,我们需要知道的是线程结束后的状态,而线程本身是没有返回值的。所以只能通过阻塞+唤醒的方式来实现。