查看原文
其他

面试题19解析-线程池(上)

2017-05-19 javatiku Java面试那些事儿

本文阅读大概需要18分钟。


为什么离不开线程池?

多线程开发是提高程序性能的一种方式,但线程的创建与销毁,以及运行线程上下文切换都是需要消耗cpu资源的,相对来说任务的执行所占整个线程运行的cpu时间越短,线程的运行效率也相应越低。而在有些系统中,我们需要反复频繁地创建线程,例如tomcat,每个http的处理handle都必须运行在一个线程中,这样在并访问量很大的情况下,就会造成系统中创建了很多系统线程,使得cpu频繁的进行线程上下文切换,从而导致了整个系统的行能低下。为了解决这样的问题,编程领域设计了线程池来解决线程切换带来的性能损耗。

线程池的设计思想是创建一定数量的运行线程,将要执行的任务,放入线程池,线程池会自动分配线程去执行任务,执行完任务的线程又会被放入池中,等待新任务的到来,而不是退出线程,从而实现了线程的重复利用,避免了系统反复创建销毁线程,造成的性能损耗。另一方面,线程池将程序员的关注点由线程转向了任务,对于使用者来说,线程池就像一个盒子,使用者无需关心线程操作相关的实现细节,可以将更多的精力放在任务本身上,只需在合适的时机将任务丢给线程池即可。线程池将任务与线程进行解绑,更有利于将程序解耦。线程与线程池的编程模型如下图所示:


线程池怎么玩?

首先线程池的使用需要通过ThreadPoolExecutor的构造函数来创建一个线程池:


new ThreadPoolExecutor(int corePoolSize,                        int maximumPoolSize,                        long keepAliveTime,                        TimeUnit unit,                        BlockingQueue<Runnable> workQueue,                        ThreadFactory threadFactory,                        RejectedExecutionHandler handler)


构造函数参数意义如下:


参数意义
corePoolSize线程池核心线程数量
maximumPoolSize线程池最大线程数量
keepAliveTime线程保持时间,空闲线程可以存活时间
TimeUnit线程保持时间的单位(keepAliveTime的单位)
workQueue任务队列
threadFactory线程创建工厂
RejectedExecutionHandler线程数超过最大线程数后,任务将被拒绝并回调的handler


在我们创建了一个线程池后,便可以向线程池中提交一个Runnable类型的任务了:


threadPool.execute( new Runnable(){                            public void run(){                                ...//任务代码                            }                        } )


这样我们就将任务提交到了线程池去运行了,至于线程池如何实现任务运行,就不是我们需要考虑的事情了,从而将任务与线程进行了解耦。但是我们也无法得知任务是否执行成功,如果我们需要得知任务的执行结果,则需要使用ThreadPoolExecutor.submit(Runnable task)方法来向线程池提交任务,该方法会返回一个Futrue类型的结果,通过以下代码便可以判断任务是否执行成功了。


   Future<Object> threadFuture = threadPoolExecutor.submit(task);    try{        Object resualt = threadFuture.get();    }catch (InterruptedException e){        // 处理线程中断异常    }catch (ExecutionException e){        // 处理无法执行异常    } finally {        threadPoolExecutor.shutdown();    }

ThreadPoolExecutor的执行流程


上一节我们简单描述了线程池的使用方式,这里我们来探究一下ThreadPoolExecutor的执行流程,其流程如下:

  1. 创建线程池,等待任务执行。

  2. 当任务提交给线程池后,会判断核心线程池是否已满,即当前线程数与corePoolSize进行比较,如果核心线程池未满,则创建新线程来执行任务,如果核心线程池已满则将任务加入任务队列BlockingQueue中,等待执行。

  3. 如果任务队列也满了,则ThreadPoolExecutor会继续创建新的线程来处理任务,但是线程池中线程数目不得超过最大线程数maximumPoolSize,否则线程池将会采取饱和策略,拒绝处理任务,并将调用用户设置的RejectedExecutionHandler策略函数进行处理。这里需要注意,只有BlockingQueue为有界队列时,maximumPoolSize参数才会有作用,否者无界BlockingQueue不可能满,不会触发线程池来处理任务队列已满的情况,无界队列使用不当可能造成线程池无休止创建线程的现象。

  4. 线程池中的线程处理完当前任务后,会从任务队列中尝试取任务,如果取到任务,则执行任务,否则等待keepAliveTime时间,如果在keepAliveTime内都没有取到任务,则该线程会退出。

线程池执行流程图如下:

execute的实现源码如下(JDK8):


   public void execute(Runnable command) {        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {                //step 1: 核心线程数判断            if (addWorker(command, true))                     //step 1.1 添加核心线程执行任务                  return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {       //step 2尝试任务加入队列            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))      //step 2.1判断线程池是否运行                    reject(command);            else if (workerCountOf(recheck) == 0)             //step 2.2判断当前工作线程数量如果等于0,直接添加工作线程                            addWorker(null, false);                                    }        else if (!addWorker(command, false))                  //step 3 任务无法队列,尝试创建线程执行任务            reject(command);    }


读者结合笔者的注释,应该不难理解这段源码。这里我们需要注意一下线程池的控制变量ctl,该变量是一个AtomicInteger类型的原子变量,这个变量在这个线程池的工作中至关重要,该变量控制了线程池的两个属性:线程的数目和线程池的当前运行状态(线程池拥有的状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED)。这个变量设计的非常巧妙,一方面减少了线程池的变量数量,更重要的一方面是,该变量是原子类型变量,线程池的实现函数中,往往需要同时获取这两个属性,如果将两个属性放入一个原子变量中,根据Atomic类支持线程的重入,线程池也就只需获取一把锁,便可以控制线程池的两个属性,这里实际上变相减少了一把锁的使用,非常巧妙,Doug Lea不愧被称为Java并发大师!下面源码展示线程池通过ctl变量的位运算获取线程属性的操作(JDK8):


private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY   = (1 << COUNT_BITS) - 1; private static int workerCountOf(int c)  {     return c & CAPACITY; } private static int runStateOf(int c) {    return c & ~CAPACITY; }


这里我们可以从ThreadPoolExecutor的执行流程中看到,线程池并不是一开始就创建好coolPoolSize个线程,而是随着任务的添加,来逐步添加工作线程的。当然线程池也提供了线程池的预热功能prestartAllThreads(),该方法线程池会通过addWorker(null, true)函数来创建coolPoolSize个核心线程来等待任务的到来,addWorker()方法的分析见下节。


public int prestartAllCoreThreads() {    int n = 0;    while (addWorker(null, true))        ++n;    return n; }

Worker工作线程


在ThreadPoolExecutor.execute()方法中调用了addworker()方法,其中方法addworker(Runnable firstTask, boolean core)的第一个参表示该工作线程创建后第一个执行的任务,该参数为null时,表示线程池只是创建了一个等待任务的工作线程;第二参数表示添加的线程是否是核心线程,用于区分线程池使用coolPoolSize还是maximumPoolSize进行线程池线程数目的控制。在addworker()方法中创建了一个Worker对象,一个Worker对象就是ThreadPoolExecutor中的一个线程。当一个任务提交时,Worker对象就会使用线程工厂创建一个线程,并将该线程与当前firstTask绑定,Worker对象就像线程池工厂中的劳工一样,会不停的获取新的任务来执行。新创建的Worker线程都会保存在线程池的HashSet<Worker>成员变量中,这里我们来看一下工作线程的运行核心函数Worker.run()的实现(JDK8,部分代码省略):


private final class Worker extends AbstractQueuedSynchronizer implements Runnable{    Worker(Runnable firstTask) {        this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    //默认线程工厂会调用创建一个线程,并与firstTask绑定    }      public void run() {            runWorker(this);        } }


ThreadPoolExecutor的默认线程工厂newThread(Runnable)的实现如下,这里便将Worker与实际线程绑定了,并使用firstTask创建了线程:


static class DefaultThreadFactory implements ThreadFactory {    public Thread newThread(Runnable r) {        Thread t = new Thread(group, r,                              namePrefix + threadNumber.getAndIncrement(),                              0);        if (t.isDaemon())            t.setDaemon(false);        if (t.getPriority() != Thread.NORM_PRIORITY)            t.setPriority(Thread.NORM_PRIORITY);        return t;    } }


而Worker.run()被调用后,Worker对应的线程会调用ThreadPoolExecute.runWorker()来执行firstTask任务,并循环从任务队列中取任务:


final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;                            boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                task.run();              }        } finally {            processWorkerExit(w, completedAbruptly);        }    }


那么问题来了,runWorker()方法会在worker工作线程没有取到任务时,退出循环,此时工作线程便会退出,那keepAliveTime参数是如何控制工作线程去任务的存活时间的?

奥秘就在取任务getTask()的实现中,Worker.getTask()实现如下(JDK 8):


 private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            try {                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }


这里就清晰了,原来Worker工作线程会尝试在keepAliveTime时间内从workQueue队列中取任务,线程的超时控制依赖于队列取元素的超时控制,也就是说在keepAliveTime时间类,工作线程会阻塞在getTask()方法上,直到线程取到任务或者取任务超时。

Worker的时序图如下:


线程池的饱和策略


饱和策略是线程池应对任务队列和线程池饱和时所采取的策略,ThreadPoolExecutor提供了setRejectedExecutionHandler()方法设置自定义饱和策略的接口,如果没有设置该接口,Java便会采取默认饱和策略AbortPolicy才处理,JDK提供了4中饱和策略:

  1. AbortPolicy : 默认饱和策略,直接抛出异常。

  2. CallerRunsPolicy : 使用调用者线程来执行任务。

  3. DiscardOldestPolicy : 丢弃队列中最近一个任务,并执行当前任务。

  4. DiscardPolicy : 不处理,直接丢弃当前任务。

这四种JDK提供的饱和策略都实现了RejectedExecutionHandler接口,并且只有AbortPolicy策略才会抛出RejectedExecutionException异常,如果实际开发环境中需要实现自定义饱和策略,可以参考以上四种饱和策略的实现方式。

线程池的关闭

做人做事要善始善终,软件开发也一样,占用了的资源要记得释放,使用了的线程要记得归还,有借有还,再借不难。线程池不适合处理需要长期运行的任务,长任务应该开辟专用线程进行处理。线程池提供了shutdown()和shutdownNow()两种方式来主动关闭线程池,虽然两者都可以关闭线程池,但是还是有一定区别的:

  1. shutdown():当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

  2. shutdownNow():线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep/wait/Condition/定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

在调用shutdown()时,shutdown()只会将空闲线程进行关闭,而shutdownNow()方法会尝试关闭所有线程,因此如果任务是否正常执行完,对于系统没有影响,可以使用shutdownNow()方法,一般开发中都会使用shutdown()来优雅的关闭线程池。

线程池的配置原则


线程池提供了统一管理线程的机制,但是线程池的运行效率的高低,一方面也需要程序员自己进行调优把控。在HotSpot虚拟机中Java线程的创建使用了底层操作系统的线程创建接口来系统线程,并不是伪线程(这里说句题外话,有同学和我说python使用的是伪线程,其实自己写个多线程小程序就可以判断出是python虚拟机采用的是真实线程还是伪线程。学习软件开发,门槛需要自己迈,坑需要自己踩,多动手,不可懒)。我们知道线程是CPU执行的基本单位,单个处理器同一时间内只能运行一个线程,因此线程池的大小的配置,也应该与CPU的核心数目相关(通过Runtime.getRuntime().availableProcessors()方法可以获取到当前系统处理器数目),过多的创建线程并不一定能带来系统总体性能的提升,反而会使处理器性能浪费在频繁的线程切换中。线程数目与效率的关系图如下:

那么线程池到底应该配置多大,才能高效的利用线程池?这里没有固定的答案,这里需要根据任务类型来进行配置。如果任务是CPU密集型任务,那么线程池应该配置较小,例如线程池可以配置CPU核心数目相等的大小;如果是需要资源等待类型的任务(如I/O等访问,数据库操作等),则应该根据等待的平均时间,来配置N倍于CPU核心数目的大小。线程池数目配置的具体的大小,还需要在实际开发工作中,编写行能测试类,结合虚拟机行能监控工具(如VisualVM),来进行配置调优。


说明:

线程池提交的是一个Runnable类型的任务,因此线程池变量共享的问题,也就是多线程变量共享的问题。在多线程环境下,变量当然是可以共享的,例如售票系统中的票数限制,订单系统中的订单号等,都需对同一变量进行操作。为了控制篇幅,多线程共享问题在公众号下一篇分析。



记住,这里全部都是干货!!!

这是一个靠谱的Java圈子



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存