一、线程池简介
两类线程池,始于jdk5,java.util.concurrent包下,开发者:Doug Lea
使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
1.1 Executors
内部主要是有四类线程池:
newFixedThreadPool:线程数固定,但是阻塞队列不限,会堆积大量请求,内部调用ThreadPoolExecutor
newCachedThreadPool:最大线程数不限,每次来一个新的请求,若没有空闲线程,就创建新线程,内部调用ThreadPoolExecutor
newSingleThreadPool:只有一个线程的线程池,队列不限,可保证顺序的执行
newScheduleThreadPool:线程不限,用于定时任务或延时任务的执行,一般也可以用Timer来实现
阿里巴巴Java开发手册 1.7.5编程规约并发处理章节明确指出 【 强制】:
1.2 ThreadPoolExecutor
开发者可以更加直观的进行参数设置,而不是无脑调用Executors里包装好的线程池
1.2.1 参数介绍
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
-
workQueue
:等待队列
- ArrayBlockingQueue LinkedBlockingQueue:常用的FIFO阻塞队列
- PriorityBlockingQueue DelayedWorkQueue:用于优先级设置的,或者定时任务的线程池
- SynchronousQueue:不存储队列的阻塞队列,用于cached线程池,线程可无限制创建
- keepAliveTime:线程空闲时间
- allowCoreThreadTimeout:是否允许核心线程的kill,一般为false,否则线程仍会重新创建
- ThreadFactory:用于产生线程的工厂
- RejectedExecutionHandler:拒绝策略
- AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,适用于比较重要的任务,可及时发现系统异常
- DiscardPolicy:丢弃任务但不抛出异常,适用于业务不重要的一些任务
- DiscardOldestPolicy:丢弃队列最老的任务,然后重新提交被拒绝的任务
- CallerRunsPolicy:由调用线程处理该任务,即谁提交谁处理
二、源码分析
2.1 提交任务
execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取ctl值
* 失败的原因可能是:
* 1.线程池已经shutdown,shutdown的线程池不再接收新任务
* 2.workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
*/
c = ctl.get();
}
/*
* 如果当前线程池是运行状态并且任务添加到队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
// 这时需要移除该command
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
2.2 新增线程
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.3 获取任务&执行任务
获取任务过程:
- 首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
A、线程池状态是否满足:
(a)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
(b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
(a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功且不会超过maximumPoolSize
(b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
- 如果满足获取任务条件,根据是否需要定时获取调用不同方法:
A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
- 在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程
处理任务过程:
-
while循环不断地通过getTask()方法获取任务;
-
getTask()方法从阻塞队列中取任务;
-
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
-
调用task.run()执行任务;
-
如果task为null则跳出循环,执行processWorkerExit()方法;
-
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕。
2.4 线程退出&中断&终止
processWorkerExit
tryTerminate
shutdown
interruptIdleWorkers
shutdownNow
… 不再一一赘述,具体可参考线程池源码学习
三、最佳实践
3.1 线程数和队列size如何设置
3.1.1 基于CPU核数的设置
N=(等待时间 + 执行时间) / 执行时间 * cpu个数 * cpu使用率
其中cpu核数可以通过Runtime.availableProcessors来获得,但是其他参数的设置未给出明确公式
3.1.2 基于进程类型
3.1.1的变形,通过进程类型来得到线程数
如果进程是IO密集型,N=2 * cpu个数,由于主要时间花在io操作上,需要启动更多的线程来让资源利用最大化
如果进程是cpu密集型,N=cpu个数 + 1,主要时间花在内部计算上,cpu利用率较大,对应的线程数应该少点
个人感觉这个也不太好判断,除非是单单的一些DB模块,不涉及复杂计算的项目,可以设置为IO密集型
3.1.2 基于请求量设置
tasks,项目每秒需要处理的最大任务数量(假设系统每秒任务数为100~1000) tasktime,单线程处理一个任务所需要的时间(每个任务耗时0.1秒) responsetime,系统允许任务最大的响应时间(每个任务的响应时间不得超过2秒)
coreSize:tasks * tasktime = 10 ~ 100 ,再根据8020原则,若80%的情况下任务数小于200,最多时为1000,则corePoolSize可设置为20
workQueue:coreSize / taskTime * responsetime = 20/0.1*2 = 400,这样可保证任务积压时间不超过2s
maxSize:(1000 - workQueue) * tasktime = 60,最大线程数60,可保证抗住最大的流量压力
3.1.3 基于tps设置
coreSize:tps * time
maxSize:tps * time * (1.7 ~2)
假定流量平均分布得出的计算公式。但在实际业务情景中,以爱番番为例,白天和晚上的流量相差较大
3.1.4 动态设置
美团在线程池参数动态设置上做了一些探索,可以对coreSize,maxSize和queueSize进行动态设置,并且基于线程池状态进行监控告警,如队列积压,触发拒绝策略次数上限,逼近maxSize等
修改完coreSize和maxSize后,线程池能够实现动态的平滑调整,实现core和max的切换
目前通过Configmap的热加载功能能够实现动态设置,或者代码中根据线程池状态来进行调整,可以将core和max进行扩容,增强线程池处理能力,待流量下去之后,再进行缩容,但是注意线程数最大限制
但是线程的反复创建和销毁,也是开发者应该考虑的一个方面,否则可能带来适得其反的效果
BlockingQueue的队列capacity参数,为final int类型,只支持赋值一次 。 美团通过重写了BlockingQueue,继承类中去掉final关键字,并提供get/set方法,实现对阻塞队列的动态设置
3.2 线程池命名
【推荐】ThreadFactory 根据情景设置线程池名称,便于查看日志和问题定位,不建议使用defaulThreadFactory(名称:pool-x-thread-xx)
实现方式:可重写ThreadFactory;可使用guava的ThreadFactoryBuilder直接构造带有线程池名称的ThreadFactory
3.3 提前启动核心线程
【推荐】线程池创建成功后,在没有提交任务时,线程还未创建,可以提前调用prestartAllCoreThreads()将所有核心线程启动,这样任务提交时就可以直接进行处理了,减小任务执行中的等待时间
3.4 异常处理
3.4.1 基于submit的处理
threadPoolExecutor.submit(),重载方法可以传入Runnable,Callable,区别是否带有返回值,并以Future
并且能够将异常进行封装,不对线程池造成影响;获取结果的过程中,会阻塞等待该线程执行完后才会返回
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
通过try/catch包含future.get(),处理抛出的异常
缺点:需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常,然后处理
3.4.2 基于execute的处理
对业务代码直接进行try/catch处理,不同的业务都要书写这类代码,冗余;
做了处理后,对线程池的影响消除;否则会kill掉当前worker,同时新增一个worker
3.4.3 重写afterExecute()处理
afterExecute()父类方法为空,不做处理。子类重写该方法,对传入的异常及Runnable做后续的处理;但是也会导致Worker的回收
public class ExtendedExecutor extends ThreadPoolExecutor {
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
// 做些具体的补偿逻辑,比如重试等
System.out.println("异常处理");
}
}
}
3.4.4 重写UncaughtExceptionHandler处理
public class MyHandler implements Thread.UncaughtExceptionHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
@Override
public void uncaughtException(Thread t, Throwable e) {
LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
}
}
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setUncaughtExceptionHandler(new MyHandler()).build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(1 / 0);
}
});
}
该方案不适用与使用submit
方式提交任务的情况:
submit会将任务执行中的异常不对外抛出,因此工作线程不会退出,处于正常运行状态
UncaughtExceptionHandler是任务抛出异常导致线程退出时触发的处理,如果没有提供自定义的异常处理器,那么默认的行为就是将堆栈信息输送到System.err
3.5 优雅关闭线程池
当系统异常时,需要关闭线程池时,要做到优雅的关闭,实现思想类似HTTP的四次挥手
先看看线程池的状态和生命周期
【推荐】先shutdown(),再执行 awaitTermination(long timeout, TimeUnit unit),尽量使其队列清空之后再进行终止,但是也要加入超时时间,避免永久阻塞等待
以通知服务为例,为了避免服务上线时的流量损失,需要将所有的任务处理完才可以终止线程池,可以采用shutdown的方式;
有的服务可以采用shutdownNow()的方式,会将未处理的任务返回,进行中间存储,后续再处理,但是执行中的任务会被中断
3.6 手动kill running的线程
以通知服务的线上事故为例,如果在task执行一直没有返回,长时间hang住,也会导致执行该task的线程处于running的状态,如果可以提供一个手段,针对线程的超时配置,那么就能够在线程执行异常时,手动进行kill,保证服务的稳定性
现在JDK已经不推荐用Thread.stop()来停止线程了,否则可能会导致死锁等问题
通过Future.cancel(true),此方法需要搭配submit来使用;同时不能传false,否则不会中断处于Running的任务,失去了手动kill的效果
3.7 线程池监控
- getPoolSize:获取当前线程池的线程数量
- getCorePoolSize:获取核心线程数量配置
- getActiveCount:当前线程池中正在执行任务的线程数量
- getLargestPoolSize:线程池曾经创建过的最大线程数量;通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
- getTaskCount:线程池已经执行的和未执行的任务总数
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount
四、事故分析
4.1. 通知服务
事故描述:通知服务队列积压导致消息无法发送的S2级线上事故
事故原因:线程池设置不合理,线程池配置为:核心线程数=最大线程数,队列为无限LinkedBlockingQueue,最大值为Integer.MAX_VALUE(2^31 - 1);事故当天线程池内调用第三方服务超时,且没有返回,导致核心线程一直处于RUNNING状态,从而使其他任务长时间积压在队列中无法处理
修复建议:根据具体的业务情景,通知服务每天早上的时候会有一批定时任务,通过峰值的请求量设置工作队列的大小,重写拒绝策略,可通过配置报警的形式发现系统异常,并且已被拒绝的任务,根据任务的重要程度,可选择重新阻塞的放置到工作队列中;或者暂时放到缓存,DB等存储介质,后续触发补发逻辑重新提交任务
4.2. 其他服务
事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百
事故原因:由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件
修复建议:拒绝策略需要慎用,建议是在不重要的业务情景下触发降级策略;如果不能触发拒绝策略,可通过机器CPU和数量扩容,并根据业务请求量来调整coreSize,queue,maxSize的大小
五、参考文献
- 【1】JDK 1.8源码
- 【2】阿里巴巴Java开发手册-泰山版
- 【3】Java线程池实现原理及其在美团业务中的实践
- 【4】Java线程池最佳实践
- 【5】Java线程池ThreadPoolExecutor使用和分析