首页 > 编程笔记 > Java笔记 阅读:3

Java线程池的底层实现(新手必看)

线程池的主要作用是控制运行的线程数,在处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数超过了最大线程数,那么超出数量的线程将排队等候,直到其他线程执行完毕,再从队列中取出任务来执行。线程池的主要特点为线程复用、控制最大并发数、线程管理等。

Java 线程池的实现主要依赖于 java.util.concurrent 包中的 ThreadPoolExecutor 类。它允许配置多个参数,包括核心线程数、最大线程数、保持活动时间、工作等待队列等。

下面我们通过 ThreadPoolExecutor 类的一些核心源码,更深入地探讨 Java 线程池 ThreadPoolExecutor 的底层实现原理。

线程管理

Java 线程池是通过内部的 HashSet 集合来管理工作线程的,线程在创建时会被包装成 Worker 对象。Worker 是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口。
private final HashSet<Worker> workers = new HashSet<Worker>();
线程池维护一组活跃的工作线程,这些线程用来执行队列中的任务。当有新任务提交时,线程池会检查当前活跃线程数是否小于核心线程数,如果是,则创建一个新的核心线程来处理任务;如果核心线程都在忙,则将任务添加到工作等待队列中。

工作等待队列

工作等待队列是一个阻塞队列,线程池使用一个阻塞队列来存放待执行的任务。

常用的工作等待队列有 LinkedBlockingQueue、SynchronousQueue 和 ArrayBlockingQueue 等多种类型。ThreadPoolExecutor 可以接收不同类型的阻塞队列实例。
private final BlockingQueue<Runnable> workQueue;
当我们在程序中执行 execute(Runnable command) 方法提交任务时,线程池会判断线程数是否已达到核心线程数,如果未达到核心线程数,则优先创建线程执行任务;否则,任务会被添加到这个队列中等待执行。

任务执行

线程池中的每个工作线程都会不断地从任务队列中取任务来执行,核心代码如下:
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
        task.run();
        task = null;
    }
}
在上述代码中,getTask() 方法用于从队列中取出任务。

线程回收

对于非核心线程,线程池通过设置存活时间来控制超时空闲线程的回收。当线程池中的线程数超过核心线程数时,额外的线程在空闲一段指定的时间后,如果没有被分配新的任务,将被终止并回收,这个指定的时间就是线程的保持活动时间。
private volatile long keepAliveTime;
当工作线程在等待新任务时,如果其空闲时间超过了 keepAliveTime,并且线程池中的线程数超过核心线程数,则这个线程会被终止并回收。

拒绝策略

当工作等待队列已满且线程数已达到最大线程数时,线程池无法处理更多新的任务,这时它会使用拒绝策略(通过 RejectedExecutionHandler 执行)来处理新提交的任务。

ThreadPoolExecutor 提供了 4 种拒绝策略,分别为直接拒绝(AbortPolicy)、抛弃队列中最早的未处理的任务(DiscardOldestPolicy)、丢弃无法处理的任务(DiscardPolicy)、由提交任务的线程自己执行任务(CallerRunsPolicy)。
private volatile RejectedExecutionHandler handler;
开发者可以基于实现 RejectedExecutionHandler 接口的 rejectedExecution(Runna-ble r, ThreadPoolExecutor e) 方法自定义拒绝策略。

ThreadFactory

ThreadFactory 用于创建新线程,通过 ThreadFactory 可以自定义线程工厂来设置线程的名称、优先级、守护状态等。
private volatile ThreadFactory threadFactory;

ThreadPoolExecutor

ThreadPoolExecutor 提供了 beforeExecute()、afterExecute() 和 terminated() 等方法,可以在任务执行前、后以及线程池完全终止时执行一些自定义的操作。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
通过重写上述这些方法,可以在任务执行之前和之后及线程池终止时插入自定义逻辑。

内部流程控制

ThreadPoolExecutor 内部使用 AtomicInteger() 来维护线程池的状态和控制线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
通过位运算和原子操作来确保线程池状态的正确性。

工作线程的实现

工作线程是在调用 execute() 方法时创建的,这些线程会从队列中获取任务并执行。Worker 包装类不仅封装了原始线程,还负责管理线程的生命周期和任务的执行状态。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    //  其他方法省略
}
实际的 ThreadPoolExecutor 类代码十分复杂,涵盖很多细节和错误处理逻辑,但线程池的核心实现原理不会变化。如果大家需要更深入地了解,可以认真阅读和研究 java.util.concurrent.ThreadPoolExecutor 源码,这样更有利于对线程池原理进行理解。

相关文章