Java线程池梳理

一、线程池简介

两类线程池,始于jdk5,java.util.concurrent包下,开发者:Doug Lea

使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

1.1 Executors

内部主要是有四类线程池:

  • newFixedThreadPool:线程数固定,但是阻塞队列不限,会堆积大量请求,内部调用ThreadPoolExecutor
  • newCachedThreadPool:最大线程数不限,每次来一个新的请求,若没有空闲线程,就创建新线程,内部调用ThreadPoolExecutor
  • newSingleThreadPool:只有一个线程的线程池,队列不限,可保证顺序的执行
  • newScheduleThreadPool:线程不限,用于定时任务或延时任务的执行,一般也可以用Timer来实现

阿里巴巴Java开发手册 1.7.5编程规约并发处理章节明确指出 【 强制】:

1.2 ThreadPoolExecutor

开发者可以更加直观的进行参数设置,而不是无脑调用Executors里包装好的线程池

1.2.1 参数介绍

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue

    :等待队列

    • ArrayBlockingQueue LinkedBlockingQueue:常用的FIFO阻塞队列
    • PriorityBlockingQueue DelayedWorkQueue:用于优先级设置的,或者定时任务的线程池
    • SynchronousQueue:不存储队列的阻塞队列,用于cached线程池,线程可无限制创建
  • keepAliveTime:线程空闲时间
  • allowCoreThreadTimeout:是否允许核心线程的kill,一般为false,否则线程仍会重新创建
  • ThreadFactory:用于产生线程的工厂
  • RejectedExecutionHandler:拒绝策略
    • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,适用于比较重要的任务,可及时发现系统异常
    • DiscardPolicy:丢弃任务但不抛出异常,适用于业务不重要的一些任务
    • DiscardOldestPolicy:丢弃队列最老的任务,然后重新提交被拒绝的任务
    • CallerRunsPolicy:由调用线程处理该任务,即谁提交谁处理

二、源码分析

2.1 提交任务

execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * clt记录着runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值,表示当前活动的线程数;
     * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
     * 并把任务添加到该线程中。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
         * 如果为true,根据corePoolSize来判断;
         * 如果为false,则根据maximumPoolSize来判断
         */
        if (addWorker(command, true))
            return;
        /*
         * 如果添加失败,则重新获取ctl值
         * 失败的原因可能是:
         * 1.线程池已经shutdown,shutdown的线程池不再接收新任务
         * 2.workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
         */
        c = ctl.get();
    }
    /*
     * 如果当前线程池是运行状态并且任务添加到队列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取ctl值
        int recheck = ctl.get();
        // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
        // 这时需要移除该command
        // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
         * 这里传入的参数表示:
         * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
         * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
         * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 如果执行到这里,有两种情况:
     * 1. 线程池已经不是RUNNING状态;
     * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
     * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
     * 如果失败则拒绝该任务
     */
    else if (!addWorker(command, false))
        reject(command);
}

2.2 新增线程

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取运行状态
        int rs = runStateOf(c);
         
        /*
         * 这个if判断
         * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
         * 接着判断以下3个条件,只要有1个不满足,则返回false:
         * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
         * 2. firsTask为空
         * 3. 阻塞队列为空
         *
         * 首先考虑rs == SHUTDOWN的情况
         * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
         * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
         * 因为队列中已经没有任务了,不需要再添加线程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
            // 如果为false则根据maximumPoolSize来比较。
            //
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        // 每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

2.3 获取任务&执行任务

获取任务过程:

  1. 首先判断是否可以满足从workQueue中获取任务的条件,不满足return null

A、线程池状态是否满足:

(a)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了

(b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回

B、线程数量是否超过maximumPoolSize 或 获取任务是否超时

(a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功且不会超过maximumPoolSize

(b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize

  1. 如果满足获取任务条件,根据是否需要定时获取调用不同方法:

A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null

B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务

  1. 在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程

处理任务过程:

  1. while循环不断地通过getTask()方法获取任务;

  2. getTask()方法从阻塞队列中取任务;

  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;

  4. 调用task.run()执行任务;

  5. 如果task为null则跳出循环,执行processWorkerExit()方法;

  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕。

2.4 线程退出&中断&终止

processWorkerExit

tryTerminate

shutdown

interruptIdleWorkers

shutdownNow

… 不再一一赘述,具体可参考线程池源码学习

三、最佳实践

3.1 线程数和队列size如何设置

3.1.1 基于CPU核数的设置

N=(等待时间 + 执行时间) / 执行时间 * cpu个数 * cpu使用率

其中cpu核数可以通过Runtime.availableProcessors来获得,但是其他参数的设置未给出明确公式

3.1.2 基于进程类型

3.1.1的变形,通过进程类型来得到线程数

如果进程是IO密集型,N=2 * cpu个数,由于主要时间花在io操作上,需要启动更多的线程来让资源利用最大化

如果进程是cpu密集型,N=cpu个数 + 1,主要时间花在内部计算上,cpu利用率较大,对应的线程数应该少点

个人感觉这个也不太好判断,除非是单单的一些DB模块,不涉及复杂计算的项目,可以设置为IO密集型

3.1.2 基于请求量设置

tasks,项目每秒需要处理的最大任务数量(假设系统每秒任务数为100~1000) tasktime,单线程处理一个任务所需要的时间(每个任务耗时0.1秒) responsetime,系统允许任务最大的响应时间(每个任务的响应时间不得超过2秒)

coreSize:tasks * tasktime = 10 ~ 100 ,再根据8020原则,若80%的情况下任务数小于200,最多时为1000,则corePoolSize可设置为20

workQueue:coreSize / taskTime * responsetime = 20/0.1*2 = 400,这样可保证任务积压时间不超过2s

maxSize:(1000 - workQueue) * tasktime = 60,最大线程数60,可保证抗住最大的流量压力

3.1.3 基于tps设置

coreSize:tps * time

maxSize:tps * time * (1.7 ~2)

假定流量平均分布得出的计算公式。但在实际业务情景中,以爱番番为例,白天和晚上的流量相差较大

3.1.4 动态设置

美团在线程池参数动态设置上做了一些探索,可以对coreSize,maxSize和queueSize进行动态设置,并且基于线程池状态进行监控告警,如队列积压,触发拒绝策略次数上限,逼近maxSize等

修改完coreSize和maxSize后,线程池能够实现动态的平滑调整,实现core和max的切换

目前通过Configmap的热加载功能能够实现动态设置,或者代码中根据线程池状态来进行调整,可以将core和max进行扩容,增强线程池处理能力,待流量下去之后,再进行缩容,但是注意线程数最大限制

但是线程的反复创建和销毁,也是开发者应该考虑的一个方面,否则可能带来适得其反的效果

BlockingQueue的队列capacity参数,为final int类型,只支持赋值一次 。 美团通过重写了BlockingQueue,继承类中去掉final关键字,并提供get/set方法,实现对阻塞队列的动态设置

3.2 线程池命名

【推荐】ThreadFactory 根据情景设置线程池名称,便于查看日志和问题定位,不建议使用defaulThreadFactory(名称:pool-x-thread-xx)

实现方式:可重写ThreadFactory;可使用guava的ThreadFactoryBuilder直接构造带有线程池名称的ThreadFactory

3.3 提前启动核心线程

【推荐】线程池创建成功后,在没有提交任务时,线程还未创建,可以提前调用prestartAllCoreThreads()将所有核心线程启动,这样任务提交时就可以直接进行处理了,减小任务执行中的等待时间

3.4 异常处理

3.4.1 基于submit的处理

threadPoolExecutor.submit(),重载方法可以传入Runnable,Callable,区别是否带有返回值,并以Future 接收返回值(Runnable下value为null)

并且能够将异常进行封装,不对线程池造成影响;获取结果的过程中,会阻塞等待该线程执行完后才会返回

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

通过try/catch包含future.get(),处理抛出的异常

缺点:需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常,然后处理

3.4.2 基于execute的处理

对业务代码直接进行try/catch处理,不同的业务都要书写这类代码,冗余;

做了处理后,对线程池的影响消除;否则会kill掉当前worker,同时新增一个worker

3.4.3 重写afterExecute()处理

afterExecute()父类方法为空,不做处理。子类重写该方法,对传入的异常及Runnable做后续的处理;但是也会导致Worker的回收

public class ExtendedExecutor extends ThreadPoolExecutor {
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // 做些具体的补偿逻辑,比如重试等
            System.out.println("异常处理");
        }
    }
}

3.4.4 重写UncaughtExceptionHandler处理

public class MyHandler implements Thread.UncaughtExceptionHandler {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
    }
}
public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setUncaughtExceptionHandler(new MyHandler()).build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 0,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(1 / 0);
            }
        });
    }

该方案不适用与使用submit方式提交任务的情况:

submit会将任务执行中的异常不对外抛出,因此工作线程不会退出,处于正常运行状态

UncaughtExceptionHandler是任务抛出异常导致线程退出时触发的处理,如果没有提供自定义的异常处理器,那么默认的行为就是将堆栈信息输送到System.err

3.5 优雅关闭线程池

当系统异常时,需要关闭线程池时,要做到优雅的关闭,实现思想类似HTTP的四次挥手

先看看线程池的状态和生命周期

【推荐】先shutdown(),再执行 awaitTermination(long timeout, TimeUnit unit),尽量使其队列清空之后再进行终止,但是也要加入超时时间,避免永久阻塞等待

以通知服务为例,为了避免服务上线时的流量损失,需要将所有的任务处理完才可以终止线程池,可以采用shutdown的方式;

有的服务可以采用shutdownNow()的方式,会将未处理的任务返回,进行中间存储,后续再处理,但是执行中的任务会被中断

3.6 手动kill running的线程

以通知服务的线上事故为例,如果在task执行一直没有返回,长时间hang住,也会导致执行该task的线程处于running的状态,如果可以提供一个手段,针对线程的超时配置,那么就能够在线程执行异常时,手动进行kill,保证服务的稳定性

现在JDK已经不推荐用Thread.stop()来停止线程了,否则可能会导致死锁等问题

通过Future.cancel(true),此方法需要搭配submit来使用;同时不能传false,否则不会中断处于Running的任务,失去了手动kill的效果

3.7 线程池监控

  • getPoolSize:获取当前线程池的线程数量
  • getCorePoolSize:获取核心线程数量配置
  • getActiveCount:当前线程池中正在执行任务的线程数量
  • getLargestPoolSize:线程池曾经创建过的最大线程数量;通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
  • getTaskCount:线程池已经执行的和未执行的任务总数
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount

四、事故分析

4.1. 通知服务

事故描述:通知服务队列积压导致消息无法发送的S2级线上事故

事故原因:线程池设置不合理,线程池配置为:核心线程数=最大线程数,队列为无限LinkedBlockingQueue,最大值为Integer.MAX_VALUE(2^31 - 1);事故当天线程池内调用第三方服务超时,且没有返回,导致核心线程一直处于RUNNING状态,从而使其他任务长时间积压在队列中无法处理

修复建议:根据具体的业务情景,通知服务每天早上的时候会有一批定时任务,通过峰值的请求量设置工作队列的大小,重写拒绝策略,可通过配置报警的形式发现系统异常,并且已被拒绝的任务,根据任务的重要程度,可选择重新阻塞的放置到工作队列中;或者暂时放到缓存,DB等存储介质,后续触发补发逻辑重新提交任务

4.2. 其他服务

事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百

事故原因:由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件

修复建议:拒绝策略需要慎用,建议是在不重要的业务情景下触发降级策略;如果不能触发拒绝策略,可通过机器CPU和数量扩容,并根据业务请求量来调整coreSize,queue,maxSize的大小

五、参考文献