Java线程池的具体实现(附带源码和解析)
使用线程池最重要的原因就是可以根据系统的需求和硬件环境灵活地控制线程的数量,并且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统的运行压力。
使用线程池具有以下优势:
- 使线程和任务分离,提升线程重用性;
- 控制线程并发数量,降低服务器压力,统一管理所有线程;
- 提升系统响应速度。如果创建线程的时间为 T1,执行任务的时间为 T2,销毁线程的时间为 T3,那么使用线程池则免去了时间 T1 和时间 T3。
创建线程池有两种方式:
- 一种是使用 Executors 的默认方法;
- 另一种是通过 ThreadPoolExecutor 进行自定义的方法。
不推荐前者是因为前者的配置很多,而且都是取 Integer 的最大值,很容易造成内存溢出。
ThreadPoolExecutor 自定义的语法格式如下:
ThreadPoolExecutor tpe = new ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler );参数说明:
- int corePoolSize:核心线程数;
- int maximumPoolSize:最大线程数;
- long keepAliveTime:非核心线程的闲置超时时间;
- TimeUnit unit:释放时间的单位,如 m(分钟)、h(小时)、d(天)等;
- BlockingQueue<Runnable> workQueue:阻塞消息队列;
- ThreadFactory threadFactory:线程工厂;
- RejectedExecutionHandler handler:拒绝策略。
下面对上述 7 个参数中的“核心线程数”、“最大线程数”、“阻塞消息队列”、“线程工厂”、“拒绝策略”进行详解。
1) 核心线程数
当线程是 IO 密集型时,主要消耗磁盘的读写性能,设置为 2*n,n 为当前服务器核数(如 8 核 16G 的服务器被设置为 16,由 Runtime.getRuntime().availableProcessors() 进行获取)。当线程是 CPU 密集型时,主要消耗 CPU 的性能,设置为 n+1。
2) 最大线程数
当核心线程数和消息队列都满了后,才会创建最大线程,直到达到最大线程数。之后的线程就会执行拒绝策略。3) 阻塞消息队列
- ArrayBlockingQueue:基于数组的先进先出队列。此队列创建时必须指定大小,读写用一把锁,性能较差;
- LinkedBlockingQueue:基于链表的先进先出队列。如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE。写和读用两把锁进行操作,因此性能较好;
- synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新的任务。
需要注意的是,当核心线程数满了后,新线程会先被存储在消息队列中;当消息队列也满了后,才会创建最大线程;直到达到最大线程数,之后的线程就会执行拒绝策略。
4) 线程工厂
创建线程的类,既可以使用默认工厂,也可以自定义线程工厂实现 ThreadFactory 接口,重写 newThread() 方法。自定义工厂的优势在于可以设置线程名或者定义辅助线程。5) 拒绝策略
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常;
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常;
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务;
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。
当自定义拒绝策略时,需要实现 RejectedExecutionHandler 接口,重写 rejectedExecution(Runnable r,ThreadPoolExecutor executor) 方法。此方法中可以通过类型转换确定线程具体的类型,从而获取线程的相关信息。
需要注意的是,只能转换线程池中通过 Runnable 接口实现的线程。如果线程池执行的线程是通过实现 Callable 实现的,在执行前会把线程封装成 FutureTask,这样相当于转换再转换,就没法转换成原来的对象了。
execute 只能提交 Runnable 线程,submit 可以提交所有的 Runnable、Callable、Thread 线程。
下面将演示如何通过 ThreadPoolExecutor 自定义一个线程池。代码如下:
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ExecutorTest { public static class ThreadOne implements Runnable { private Integer number; public ThreadOne(Integer temp) { this.number = temp; } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":ThreadOne--" + number); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class ThreadTwo extends Thread { @Override public void run() { // 线程二的逻辑 } } public static class ThreadThree implements Callable<Integer> { Integer number; public ThreadThree(Integer temp) { this.number = temp; } @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName() + ":ThreadThree--" + number); Thread.sleep(3000); return number; } } public static void main(String[] args) { ExecutorService es = new ThreadPoolExecutor(3, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new Factory(), new Handler()); try { for (int i = 0; i < 50; i++) { es.execute(new ThreadOne(i)); } } finally { es.shutdown(); } } } private static class Factory implements ThreadFactory { private AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thd = new Thread(r); String threadName = "Factory--" + count.addAndGet(1); thd.setName(threadName); System.out.println("线程" + threadName + "创建完成"); return thd; } } private static class Handler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { ThreadOne thdOne = (ThreadOne) r; System.out.println(thdOne.number + "被阻塞了"); } }运行结果如下(运行结果不唯一):
线程Factory--1创建完成
线程Factory--2创建完成
Factory--1:ThreadOne--0
线程Factory--3创建完成
线程Factory--4创建完成
Factory--2:ThreadOne--1
线程Factory--5创建完成
线程Factory--6创建完成
线程Factory--7创建完成
线程Factory--8创建完成
Factory--3:ThreadOne--2
线程Factory--9创建完成
线程Factory--10创建完成
15被阻塞了
16被阻塞了
17被阻塞了
18被阻塞了
19被阻塞了
20被阻塞了
21被阻塞了
22被阻塞了
23被阻塞了
24被阻塞了
25被阻塞了
26被阻塞了
27被阻塞了
28被阻塞了
29被阻塞了
30被阻塞了
31被阻塞了
32被阻塞了
Factory--4:ThreadOne--8
33被阻塞了
34被阻塞了
35被阻塞了
36被阻塞了
37被阻塞了
38被阻塞了
39被阻塞了
40被阻塞了
41被阻塞了
42被阻塞了
43被阻塞了
44被阻塞了
45被阻塞了
46被阻塞了
47被阻塞了
48被阻塞了
49被阻塞了
Factory--5:ThreadOne--9
Factory--6:ThreadOne--10
Factory--7:ThreadOne--11
Factory--8:ThreadOne--12
Factory--9:ThreadOne--13
Factory--10:ThreadOne--14
Factory--1:ThreadOne--3
Factory--2:ThreadOne--4
Factory--3:ThreadOne--5
Factory--4:ThreadOne--6
Factory--10:ThreadOne--7