查看原文
其他

漫画:怎么给女朋友讲明白线程池?

Isabella 程序人生 2019-06-22


作者 | Isabella

责编 | 伍杏玲

线程池是Java面试必问问题之一!

有没有对源码滚瓜烂熟的童鞋?请举手!🙋🙋‍♂️(怎么没人举手。。)

对了,今天先来撒一波狗狼~ (表打我~)

来,介绍下:

她叫码妞,是我码仔的女朋友喔!

她也在学习各类前端技术,可厉害了!

大家鼓掌欢迎吧!以后她会经常来问我问题的,要被烦了~ 

最近码妞也在看Java线程池呢,已经看得一头雾水了,正准备去问问码仔,

看码仔能不能给她讲明白了!




线程


线程是一种资源,并不是只存在程序的世界里。

程序,本来就是对生活的一种抽象表述。

比如像车站的售票窗口、退票窗口、检票窗口,每个窗口都在做不同的事情,就是车站里同时运行着的不同线程。

线程多了,需要管理,不同的线程也要能保证不会互相干扰,各做各的。


线程的生命周期

这个图很熟悉的吧~

好,开始讲线程池啦~


ThreadPoolExecutor


线程池源码里最主要的类了~

看下开头的这段注释:

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 * workerCount, indicating the effective number of threads
 * runState, indicating whether running, shutting down etc
 *
 * The runState provides the main lifecycle control, taking on values:
看到英文就头晕?没事啦~

主要讲线程池重要的两个状态:

  • runState:线程池运行状态

  • workerCount:工作线程的数量

@ReachabilitySensitive
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

线程池用一个32位的int来同时保存runState和workerCount,其中高3位(第31到29位)是runState,其余29位是workerCount(大约500 million)。

来看看存储结构(码仔手动画的哦

它的构造方法:

public ThreadPoolExecutor(int corePoolSize,
     int maximumPoolSize,
     long keepAliveTime,
     TimeUnit unit,
     BlockingQueue<Runnable> workQueue,
     ThreadFactory threadFactory,
     RejectedExecutionHandler handler) 
{
        ...
}


  • corePoolSize

  • 核心线程数,好比班干部的人数。

  • maximumPoolSize

  • 最大线程数,好比教室里的座位数。当提交任务数超过了这个最大值,线程还有拒绝策略——RejectExecutionHandler,做不动了嘛。

  • keepAliveTime

  • 除核心线程外的空闲线程保持存活时间。当线程池里线程数超过corePoolSize数量了,keepAliveTime时间到,就把空闲线程关了,不然也闲置了呀,节省能量嘛。

  • workQueue

    任务阻塞队列。通过workQueue,线程池实现了阻塞功能。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。

  • threadFactory

    创建线程的工厂。所有的线程都是通过这个Factory创建的。

    默认会使用

    Executors.defaultThreadFactory() 来作线程工厂。

  • handler 线程池的饱和策略。做不了任务了找理由罢工 

    • AbortPolicy

      • 直接抛出异常,默认策略;

    • CallerRunsPolicy

      • 用调用者所在的线程来执行任务;

    • DiscardOldestPolicy

      • 丢弃阻塞队列中靠最前的任务,并执行当前任务;

    • DiscardPolicy

      • 直接丢弃任务。


Worker来了!


你看Worker的定义,其实它就是封装了的工作线程~

private final class Worker
     extends AbstractQueuedSynchronizer
     implements Runnable

Worker既实现了Runnable,又继承了AbstractQueuedSynchronizer(AQS),所以它既是一个可执行的任务,又可以达到锁的效果。

看看Worker构造方法:

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */

 Worker(Runnable firstTask) {
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this);
 }



线程池是怎么工作的?


DuangDuangDuang!

public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
 /*
 * Proceed in 3 steps:
 *
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task. The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 * 3. If we cannot queue task, then we try to add a new
 * thread. If it fails, we know we are shut down or saturated
 * and so reject the task.
 */

     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
        c = ctl.get();
     }
     /**
     * 2、如果线程池RUNNING状态,且入队列成功
     */

     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();

     //如果再次校验过程中,线程池不是RUNNING状态,
     // 并且remove(command)--workQueue.remove()成功,拒绝当前command
         if (! isRunning(recheck) && remove(command))
             reject(command);

         //为什么只检查运行的worker数量是不是0呢??为什么不和corePoolSize比较呢??
         // 只保证有一个worker线程可以从queue中获取任务执行就行了??
         // 因为只要还有活动的worker线程,就可以消费workerQueue中的任务
         else if (workerCountOf(recheck) == 0)
             addWorker(nullfalse);
         }
         /**
         * 3、如果线程池不是running状态 或者 无法入队列
         * 尝试开启新线程,扩容至maxPoolSize,
         * 如果addWork(command, false) 失败了,拒绝当前command
         */

         else if (!addWorker(command, false))
             reject(command);
         }}

看execute方法里的注释,一步步说得很清楚。

  1. 如果当前正在运行的线程数 < corePoolSize,尝试用给到的command来启动一个新线程作为第一个任务。
    调用addWorker方法,检查runState和workerCount,并且如果增加线程的话,能防止产生错误警报,如果不能增加线程,则返回false。

  2. 如果一个任务被成功地加到队列里,仍然需要双重检验来确认是否需要新建一个线程。

    (因为可能在上一次检查后,已经存在的线程已经died)或者进入这个方法后,线程池已经被关闭了。所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新建一个线程。

  3. 如果不能把任务加入队列(可能线程池已经关闭或者满了),那么需要新开一个线程(往maxPoolSize发展)。如果失败的话,说明线程池shutdown了或者满了,就要拒绝这个任务了。

给你流程图!



工具类 Executors


线程池里还有个重要的类:Executors。

Executors是一个Java中的工具类,它提供工厂方法来创建不同类型的线程池。

用它可以很方便地创建出下面几种线程池来:

ExecutorService singleService = Executors.newSingleThreadExecutor();
ExecutorService fixedService = Executors.newFixedThreadPool(9);
ExecutorService cacheService = Executors.newCacheThreadPool();

或者通过ThreadPoolExecutor的构造函数自定义需要的线程池。

作者简介:公众号码个蛋,码上养成好习惯。

为码一代,想教码二代却无从下手:

听说少儿编程很火,可它有哪些好处呢?

孩子多大开始学习比较好呢?又该如何学习呢?

最新的编程教育政策又有哪些呢?

下面给大家介绍CSDN新成员:极客宝宝(ID:geek_baby)

戳他了解更多↓↓↓

 热 文 推 荐 

北漂杭漂的程序员,是如何买到第一套房子?

支离破碎的 Android

马斯克是如何成为表情包之王的

前端开发 20 年变迁史

☞直接拿来用!灵跃模组机器人硬核评测(编程篇)

☞容器云常见安全威胁与防范 | 技术干货

敲诈团伙将黑手伸向宅男, 你在家看不可描述的视频, 竟被骗走100万美元!

各方最新回应!如何看待IEEE官方声明“学术禁令”?

代码整洁之道-编写 Pythonic 代码

☞敲代码时,程序员戴耳机究竟在听什么?

你点的每个“在看”,我都认真当成了喜欢

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存