Java线程池的底层实现(新手必看)
线程池的主要作用是控制运行的线程数,在处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数超过了最大线程数,那么超出数量的线程将排队等候,直到其他线程执行完毕,再从队列中取出任务来执行。线程池的主要特点为线程复用、控制最大并发数、线程管理等。
Java 线程池的实现主要依赖于 java.util.concurrent 包中的 ThreadPoolExecutor 类。它允许配置多个参数,包括核心线程数、最大线程数、保持活动时间、工作等待队列等。
下面我们通过 ThreadPoolExecutor 类的一些核心源码,更深入地探讨 Java 线程池 ThreadPoolExecutor 的底层实现原理。
常用的工作等待队列有 LinkedBlockingQueue、SynchronousQueue 和 ArrayBlockingQueue 等多种类型。ThreadPoolExecutor 可以接收不同类型的阻塞队列实例。
ThreadPoolExecutor 提供了 4 种拒绝策略,分别为直接拒绝(AbortPolicy)、抛弃队列中最早的未处理的任务(DiscardOldestPolicy)、丢弃无法处理的任务(DiscardPolicy)、由提交任务的线程自己执行任务(CallerRunsPolicy)。
Java 线程池的实现主要依赖于 java.util.concurrent 包中的 ThreadPoolExecutor 类。它允许配置多个参数,包括核心线程数、最大线程数、保持活动时间、工作等待队列等。
下面我们通过 ThreadPoolExecutor 类的一些核心源码,更深入地探讨 Java 线程池 ThreadPoolExecutor 的底层实现原理。
线程管理
Java 线程池是通过内部的 HashSet 集合来管理工作线程的,线程在创建时会被包装成 Worker 对象。Worker 是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口。private final HashSet<Worker> workers = new HashSet<Worker>();线程池维护一组活跃的工作线程,这些线程用来执行队列中的任务。当有新任务提交时,线程池会检查当前活跃线程数是否小于核心线程数,如果是,则创建一个新的核心线程来处理任务;如果核心线程都在忙,则将任务添加到工作等待队列中。
工作等待队列
工作等待队列是一个阻塞队列,线程池使用一个阻塞队列来存放待执行的任务。常用的工作等待队列有 LinkedBlockingQueue、SynchronousQueue 和 ArrayBlockingQueue 等多种类型。ThreadPoolExecutor 可以接收不同类型的阻塞队列实例。
private final BlockingQueue<Runnable> workQueue;当我们在程序中执行 execute(Runnable command) 方法提交任务时,线程池会判断线程数是否已达到核心线程数,如果未达到核心线程数,则优先创建线程执行任务;否则,任务会被添加到这个队列中等待执行。
任务执行
线程池中的每个工作线程都会不断地从任务队列中取任务来执行,核心代码如下:public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { task.run(); task = null; } }在上述代码中,getTask() 方法用于从队列中取出任务。
线程回收
对于非核心线程,线程池通过设置存活时间来控制超时空闲线程的回收。当线程池中的线程数超过核心线程数时,额外的线程在空闲一段指定的时间后,如果没有被分配新的任务,将被终止并回收,这个指定的时间就是线程的保持活动时间。private volatile long keepAliveTime;当工作线程在等待新任务时,如果其空闲时间超过了 keepAliveTime,并且线程池中的线程数超过核心线程数,则这个线程会被终止并回收。
拒绝策略
当工作等待队列已满且线程数已达到最大线程数时,线程池无法处理更多新的任务,这时它会使用拒绝策略(通过 RejectedExecutionHandler 执行)来处理新提交的任务。ThreadPoolExecutor 提供了 4 种拒绝策略,分别为直接拒绝(AbortPolicy)、抛弃队列中最早的未处理的任务(DiscardOldestPolicy)、丢弃无法处理的任务(DiscardPolicy)、由提交任务的线程自己执行任务(CallerRunsPolicy)。
private volatile RejectedExecutionHandler handler;开发者可以基于实现 RejectedExecutionHandler 接口的 rejectedExecution(Runna-ble r, ThreadPoolExecutor e) 方法自定义拒绝策略。
ThreadFactory
ThreadFactory 用于创建新线程,通过 ThreadFactory 可以自定义线程工厂来设置线程的名称、优先级、守护状态等。private volatile ThreadFactory threadFactory;
ThreadPoolExecutor
ThreadPoolExecutor 提供了 beforeExecute()、afterExecute() 和 terminated() 等方法,可以在任务执行前、后以及线程池完全终止时执行一些自定义的操作。protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }通过重写上述这些方法,可以在任务执行之前和之后及线程池终止时插入自定义逻辑。
内部流程控制
ThreadPoolExecutor 内部使用 AtomicInteger() 来维护线程池的状态和控制线程数。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));通过位运算和原子操作来确保线程池状态的正确性。
工作线程的实现
工作线程是在调用 execute() 方法时创建的,这些线程会从队列中获取任务并执行。Worker 包装类不仅封装了原始线程,还负责管理线程的生命周期和任务的执行状态。private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } // 其他方法省略 }实际的 ThreadPoolExecutor 类代码十分复杂,涵盖很多细节和错误处理逻辑,但线程池的核心实现原理不会变化。如果大家需要更深入地了解,可以认真阅读和研究 java.util.concurrent.ThreadPoolExecutor 源码,这样更有利于对线程池原理进行理解。