任务调度线程池

Java
214
0
0
2024-03-10
Timer

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。

public class Test {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 1");
                sleep(2);
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override

            public void run() {
                log.debug("task 2");
            }
        };
        // 使用 timer 添加两个任务,希望它们都在 1s 后执行
        // 但由于 timer 内只有一个线程来顺序执行队列中的任务,
        //因此『任务1』的延时,影响了『任务2』的执行

        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
    }

}

输出

20:46:09.444 c.TestTimer [main] - start... 20:46:10.447 c.TestTimer [Timer-0] - task 1 20:46:12.448 c.TestTimer [Timer-0] - task 2
ScheduledExecutorService

线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池

   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor类的构造:

   public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 添加两个任务,希望它们都在 1s 后执行

executor.schedule(() -> {
     System.out.println("任务1,执行时间:" + new Date());
     try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);

executor.schedule(() -> {
     System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

输出

任务2,执行时间:Fri Jun 23 18:04:46 CST 2023 任务1,执行时间:Fri Jun 23 18:04:46 CST 2023

scheduleAtFixedRate 例子:

构造方法

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

提交一个定期操作,该操作在给定的初始延迟后首先启用,随后在给定的时间段内启用;也就是说,执行将在 之后开始initialDelay,然后 、 initialDelay + 2 * period然后 initialDelay + period,依此类推。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

log.debug("start...");

pool.scheduleAtFixedRate(() -> {
 log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

输出

21:45:43.167 c.TestTimer [main] - start... 21:45:44.215 c.TestTimer [pool-1-thread-1] - running... 21:45:45.215 c.TestTimer [pool-1-thread-1] - running... 21:45:46.215 c.TestTimer [pool-1-thread-1] - running... 21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

log.debug("start...");

pool.scheduleAtFixedRate(() -> {
 log.debug("running...");
 sleep(2);
}, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

21:44:30.311 c.TestTimer [main] - start... 21:44:31.360 c.TestTimer [pool-1-thread-1] - running... 21:44:33.361 c.TestTimer [pool-1-thread-1] - running... 21:44:35.362 c.TestTimer [pool-1-thread-1] - running... 21:44:37.362 c.TestTimer [pool-1-thread-1] - running...

scheduleWithFixedDelay 例子:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

log.debug("start...");

pool.scheduleWithFixedDelay(()-> {
 log.debug("running...");
 sleep(2);
}, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 -> 延时 -> 下一个任务开始 所 以间隔都是 3s

21:40:55.078 c.TestTimer [main] - start... 21:40:56.140 c.TestTimer [pool-1-thread-1] - running... 21:40:59.143 c.TestTimer [pool-1-thread-1] - running... 21:41:02.145 c.TestTimer [pool-1-thread-1] - running... 21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线 程也不会被释放。用来执行延迟或反复执行的任务

正确处理执行任务异常

方法1:主动捉异常

ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
 try {
     log.debug("task1");
     int i = 1 / 0;
 } catch (Exception e) {
     log.error("error:", e);
 }
})

方法2:使用 Future

ExecutorService pool = Executors.newFixedThreadPool(1);

Future<Boolean> f = pool.submit(() -> {
     log.debug("task1");
     int i = 1 / 0;
     return true;
});

log.debug("result:{}", f.get());