查看原文
其他

JAVA原生线程池源码解析及使用建议

平台研发王红刚 京东零售技术 2021-10-12
线程池概念


01线程池的基本概念


线程池(Thread pool)是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。(来源:维基百科)

Java线程池相关类是在1.5新增的,所属包是rt.jar,包路径是java.util.concurrent,作者是:Doug Lea,从属JSR-166。

Java线程池也遵循线程池的核心设计思路,复用线程,降低线程创建销毁的资源消耗,提供了多种线程池的实现模型,同时也允许开发者定制化开发其他特色线程池。


02java线程池优势


A) 降低资源消耗,提升效率 :通过重复利用已创建的线程,降低线程创建和销毁造成的消耗,从而提高整体的执行效率。

B) 提高线程的管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

C) 可扩展的开发模式:除JVM提供的三种线程池,可以通过实现AbstractExecutorService类定制自己的线程池而支持不同的业务场景。

(这里很多资料都会有额外一条:提高响应速度,即通过复用以创建好的线程,而无需等待新线程的创建,这里个人认为严格讲这一条不完全符合线程池的使用情况,而且复用线程和第一条优势基本吻合,所以合并到一起,至于特殊场景后面学习的过程中会详细讲解。)


03java原生线程池简介


首先介绍下java原生的三个线程池,ForkJoinPool,ThreadPoolExecutor,ScheduledThreadPoolExecutor。三个线程池的UML类图如下(version:1.8.0_131,后面的源码都是基于这个版本)。



A) ForkJoinPool是Java 1.7 引入的一种新的并发框架,核心思想是将大的任务拆分成多个小任务(fork),然后在将多个小任务处理汇总到一个结果上(join),充分利用多cpu,多核CPU的优势,引入了“work-stealing”机制,更有效的利用线程。

B) ThreadPoolExecutor是 java常用的线程池,提供基础的线程池功能。初始化传入不同类型的工作队列和拒绝策略参数,可以定制不同类型和功能的线程池,应用最为广泛。

C) ScheduledThreadPoolExecutor 从类图上可以看出,它继承了ThreadPoolExecutor,并实现了ScheduledExecutorService,是对ThreadPoolExecutor做的功能扩展,本质上是一个使用线程池执行定时任务的类,可以用来在给定延时后执行异步任务或者周期性执行任务,较任务调度的Timer来说,其功能更加强大。


线程池的实现原理


接下来以java的基础线程池ThreadPoolExecutor为主介绍下线程池的工作原理和实现方式。


01ThreadPoolExector类


2.1.1 ThreadPoolExector介绍

A) ThreadPoolExecutor实现的顶层接口是Executor,内部只有一个方法execute(Runable),标识出执行任务这个核心方法。限制了任务类型为:Runable,即线程的接口类。

B)ExecutorService接口扩展了很多能力,比如对线程池的管理。以及扩展了执行任务的能力,支持多个任务批量执行。

C)AbstractExecutorService是对ExecutorService抽象类,这里对任务的执行和调用做了基础的实现,可以看出目前都是在对任务的执行做层层抽象,也规范了任务的基础类型。

D)ThreadPoolExecutor是java原生线程池的一个基础实现类,完成了线程池的各种功能,内部维护了存储任务的阻塞队列,以及执行任务的worker线程,还有线程池的相关状态管理及任务管理。同时提供了一些扩展方法。供开发者定制特色能力。


    

2.1.2 线程池的基础参数

接下来说一下创建ThreadPoolExecutor比较重要的参数。

  • corePoolSize:线程池核心线程个数。

  • queue:用于保存等待执行的任务的阻塞队列;如基于数组的有界 ArrayBlockingQueue,基于链表的无界 LinkedBlockingQueue,优先级队列 PriorityBlockingQueue 等。

  • maximunPoolSize:线程池最大线程数量。

  • ThreadFactory:创建线程的工厂。可以自定义工工厂,控制产生的线程名称辅助排查问题。

  • RejectedExecutionHandler:饱和策略,当队列满了且线程个数到 达maximunPoolSize 后采取的策略,如AbortPolicy 抛出拒绝执行;DiscardPolicy 丢弃该任务。

  • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。

  • TimeUnit,存活时间的时间单位。



2.1.3 ThreadPoolExecutor.Worker内部类

ThreadPoolExecutor.Worker这个工作线程,实现了Runnable接口,同时继承AQS类,并持有一个线程thread,一个初始化的任务firstTask。负责处理任务,同时维护工作线程的状态。

thread是在调用构造方法时通过ThreadFactory来创建的线程,用来执行任务。

firstTask是传入的第一个任务,如果非空,那么线程在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行workQueue中的任务,也就是非核心线程的创建。单个任务执行完毕后,worker会继续在workQueue中获取下一个任务继续执行。

    

02ThreadPoolExecutor 工作流程


2.2.1 总体介绍



线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分: 任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的执行策略。主要有以下几种:

(1)直接申请线程执行该任务;

(2)缓冲到队列中等待线程执行;

(3)拒绝该任务,执行拒绝策略。

线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。


2.2.2 状态变化



ThreadPoolExector内部使用AtomicInteger类型的变量ctl来维护线程池的状态和线程池中的线程数量,即高三位为状态位,其余表示线程数目。

每种状态均可以通过调用不同的方法完成相应的状态转化。

1.  RUNNING(-1 : 111):可以接受新的任务,也可以处理阻塞队列里的任务。

2.  SHUTDOWN(0 : 000) :不接受新的任务,但是可以处理阻塞队列里的任务。

3.  STOP(1 : 001):不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。

4.  TIDYING(2 : 010):过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,并且将要调用terminated方法。

5.  TERMINATED(3: 011) :终止状态。terminated方法调用完成以后的状态。


2.2.3 源码解析-execute

Execute是提交任务的方法入口方法,根据核心线程池的数量、线程池的状态,任务队列大小、最大数量、分成不同情况,创建不同线程,存储到不同位置,或执行拒绝策略。



2.2.4 源码解析-addWorker

addWorker是添加工作线程的方法,通过Worker内部类封装一个Thread实例维护工作线程的执行,同时根据线程池的状态来判断是否增加相应的任务。



2.2.5 源码解析-runWorker

runWorker真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;如果线程池调用了shutDownNow,这里也会收到影响。



2.2.6 源码解析-getTask

从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的poll()还是take()方法,keepAliveTime参数也是在这里使用的。



2.2.7任务队列

阻塞队列BlockingQueue是用来存放任务的。当线程池中有空闲线程时就回去任务队列中拿任务并处理。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。

常见的集中队列有:

无界队列: 使用无界队列(如 LinkedBlockingQueue)将所有的任务都存储到阻塞队列中。这样,创建的线程就不会超过 corePoolSize。

有界队列: 当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue, PriorityBlockingQueue)有助于防止资源耗尽, PriorityBlockingQueue还可以定制任务的优先级。但是需要开发人员根据实际任务情况调整队列大小和线程池大小。

同步移交队列,如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。


2.2.8 拒绝策略



1. CallerRunsPolicy 由调用线程执行该任务,不抛弃任务,会影响效率和性能。

2.  AbortPolicy 抛出拒绝执行的异常,java线程池的默认拒绝策略,保证线程池整体的执行效率。

3.  DiscardPolicy 丢弃该任务,不做任何处理,同时也不抛异常。使用中不太友好。

4.  DiscardOldestPolicy  如果线程池未关闭,则弹出任务队列的第一个,然后运行。

以上是默认提供的四种拒绝策略。除此之外还有一些其他框架中的值得参考的处理。

AbortPolicyWithReport 这是dubbo 中的拒绝策略,继承了AbortPolicy拒绝策略。但是在抛出异常前,打印了当前线程池的重要参数信息,以及运行状态,同时定制输出了此时的堆栈信息。方便后续排查问题处理。


线程池的应用场景


01Tomcat中的线程池


Tomcat作为一款优秀的web服务器,为了保证其性能,其内部也有自己的线程池对象:org.apache.tomcat.util.threads.ThreadPoolExecutor 继承自java.util.concurrent.ThreadPoolExecutor。

不同于原生ThreadPoolExecute达到最大线程后,对新增任务立即执行拒绝策略。Tomcat线程池会在此时再次尝试向队列中添加任务,失败后再执行拒绝策略。最大限度保证任务执行。

同时Tomcat内置了TaskQueue作为任务的缓存队列。继承了LinkedBlockingQueue但是重写了offer方法,即当前线程大于核心线程,且提交的任务数大于当前线程数,表示有线程空闲的情况下,返回false,也就是创建线程。主要是为了控制在线程队列无限增长时,无法创建更多的线程而达到最大线程数的问题。


02Sirector 中的线程池


Sirector是JD内一个事件处理编排框架,内置ExecutorService对象,负责对任务的分配处理。

初始化的对象是WorkerExecutor, WorkerExecutor继承自ThreadPoolExecutor,扩展了submit方法用于执行通过sirector编排的具体任务。

WorkerExecutor内置了工厂WorkerThreadFactory主要记录了当前线程池的名称、工厂创建的线程数目等。

使用的拒绝策略为RejectedTaskController,继承自RejectedExecutionHandler,处理方法类似于AbortPolicy策略,丢弃任务抛出异常,抛出异常前也打印了一些异常信息,辅助排查问题。


03个人开发中的线程池


参考上面几种框架的线程池,可以得出大概几点结论。

如果个人开发中涉及线程池,要先确认任务场景,是I/O密集还是CPU密集任务,从而确定线程池类型。

再通过使用场景,是最大限度保证任务执行,还是为了保证服务性能,来定制自己的执行策略,并且确定选择任务队列以及拒绝策略。拒绝策略可以参考Dubbo中,同时打印线程信息,辅助排查问题。

然后确定是否需要自定义线程工厂,这里建议自定义线程工厂,在创建线程的时候打上标识,和系统线程加以区分。

在根据任务类型,配置上合理的线程池参数。一个属于你的线程池就搭好了!


线程池参数设置


这里个人认为,没有一种万能的参数一定适合所有的线程池使用场景。

但是有通用的思路来寻找适合当前线程池的最佳参数。

1、确定当前任务类型,是CPU密集还是I/O密集型任务。这两者差别很大。CPU密集和CPU核数以及CPU超线程有关。而I/O密集则和服务处理的任务有很大关联。

2、如果使用一些已有的技术框架中的线程池。初期建议以默认参数为佳,如Tomcat默认范围25-200,JSF默认cached线程池20-200。

3、在服务稳定之后的性能调优。需要对服务进行多次高保真压测,期间不断控制、调整线程池参数,这样尽可能得到当前服务最优的线程池参数。

4、只有最合适的、没有一定不变的,随着业务不断迭代,每隔一段时间对服务进行压测,通过结果调整相应的参数。


线程池使用过程中的建议


1 、当提交一个任务到线程池时,若线程数量 < corePoolSize,线程池会创建一个新线程放入workers(一个HashSet)中执行任务, 即使其他空闲的基本线程能够执行新任务也还是会创建新线程,直至达到corePoolSize。

2 、默认最初的线程池启动的时候是不初始化线程的,通过调用 prestartAllCoreThreads 方法,可以初始化所有核心线程。

3 、Worker中处理task如果抛出异常,这个work thread不会继续执行任务,但是会创建新的线程, 新线程可以运行其他task。

4 、最好不要使用Executors创建新线程池,因为Executors提供的很多方法,没有指定实际核心及最大线程池参数,容易发生OOM,推荐自己创建相应的线程池,适合自己的才是最好的,同时线程池中有很多钩子方法可以用来定制特色功能。

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存