线程池将任务的提交与任务的执行解耦开来,它对线程进行管理和调度,通过合理的设置能够避免创建过多的线程,提高资源利用率和系统吞吐量。
ThreadPoolExecutor源码
Java中线程池为ThreadPoolExecutor,通过不同的参数设置来实现不同的线程池机制。首先ThreadPoolExcutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口,它最核心构造函数如下:
|
|
| 参数名 | 作用 |
|---|---|
| corePoolSize | 核心线程池大小 |
| maximumPoolSize | 最大线程池大小 |
| keepAliveTime | 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 |
| TimeUnit | keepAliveTime时间单位 |
| workQueue | 阻塞任务队列 |
| threadFactory | 新建线程工厂 |
| RejectedExecutionHandler | 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
先看一下线程池的基本信息:
|
|
Execute方法
首先分析一下其核心方法execute:
|
|
由以上代码可知,线程处理、管理流程如下图:

下面简单总结下线程池在提交一个任务时的处理方法:
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
AddWorker方法
下面继续分析一下addWorker方法:
|
|
RunWorker方法
下面继续看看worker如何执行任务:
|
|
GetTask方法
线程池的线程如何不断获取任务:
|
|
当runWorker执行异常时:
|
|
TryTerminate方法
当线程池异常或shutdown或减少线程池线程数量时,会调用tryTerminate方法:
|
|
Shutdown方法
最后看下shutdown方法:
|
|
WorkQueue的类型
直接提交
SynchronousQueue它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes或者可以拒绝的。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列
当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
ThreadFactory
每当线程池需要创建一个线程时,都会通过threadFactory的工厂方法newThreadFactory来创建一个新的、非守护线程,当然可以继承ThreadFactory来自定义创建的线程,设置线程的优先级、名字、增加日志等功能。
RejectedExecutionHandler
当当提交任务数超过maximumPoolSize、队列已满时将采取拒绝策略,这将调用RejectedExecutionHandler的rejectedExecution方法,ThreadPoolExcutor 提供了4种预定义的拒绝策略:
1.在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
2.在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身即在excute本身的线程中执行run方法。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
3.在 ThreadPoolExecutor.DiscardPolicy 中,当前的任务将被抛弃。
4.在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部(注意优先级队列)的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
Executors常用的线程池配置
newFixedThreadPool
构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理
|
|
newCachedThreadPool
构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁
|
|
newSingleThreadExecutor
构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行
|
|
newScheduledThreadPool
构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的
|
|
自定义线程池
下面我们自定义写一个线程池:
|
|
执行8次任务,那么线程池中达到最大线程数,队列也会打满,那么剩下的2个任务将会采取拒绝策略,我们复写了rejectExecution方法,将任务put阻塞提交到队列,所以所有的任务的都会执行:
|
|
执行结果如下:
|
|
另外,线程池中shutdown方法通过interrupt中断线程,设置为shutdown状态,但只能中断空闲线程,阻止继续提交任务,队列中的任务仍会执行,shutdownNow会interrupt所有的线程,设置为stop 状态,但不能保证所有的线程马上结束(参考interrupt作用),队列中的任务也会丢弃不执行。awaitTermination阻塞等待shutdown后线程结束。
Tips
一般来说,最好使用Executors提供的4种线程池,除非有特别的需求可以定义独特的线程池,线程池的大小确定是重点,线程池过大导致竞争激烈,线程池过小吞吐量较低。所以在任务量较少可以使用无界的队列,任务量很大使用有介的队列防止OOM,也有一种通用的计算方法:N(thread)=N(cpu)*U(cpu)(1+W / C),其中分别为cpu数量,cpu利用率,等待时间和计算时间,最大线程数一般设为2N+1,N是CPU核数。当然线程池大小还受到内存,io等其他因素影响。
最后注意线程间的依赖,在有界线程池中容易产生死锁现象。
线程池这部分知识感觉有些复杂,掌握起来有些困难,底层一些机制源码不太容易理解。