周其仁:停止改革,我们将面临三大麻烦

抛开立场观点不谈,且看周小平写一句话能犯多少语病

罗马尼亚的声明:小事件隐藏着大趋势——黑暗中的风:坚持做对的事相信未来的结果

布林肯突访乌克兰,为何选择去吃麦当劳?

中国不再是美国第一大进口国,贸易战殃及纺织业? 美国进一步延长352项中国商品的关税豁免期

生成图片,分享到微信朋友圈

自由微信安卓APP发布,立即下载! | 提交文章网址
查看原文

Java线程池分享

涂丰毅 冰岩作坊 2022-06-07

Java线程池分享

多线程分享链接(更新中):https://blog.csdn.net/weixin_43093006/category_10650515.html

1. 问题:

  1. 线程池的主要工作流程是什么?
  2. 线程池都有哪几种工作队列?怎么理解无界队列和有界队列?
  3. 线程池的拒绝策略有何用途,有哪些拒绝策略?是否可以自定义拒绝策略
  4. 如何创建、停止线程池?为什么不建议使用Executors构建线程池?
  5. 线程池有哪些种类,各自的使用场景是什么?
  6. 线程池有哪些状态,状态的设计机制是什么?状态是如何相互切换的?
  7. 请谈谈线程池的使用场景?线程池为什么能提高性能?
  8. 线程池有哪些重要参数?如何设置这些重要参数?
  9. 线程池如何获取执行返回的结果?
  10. 出现unable to create new native thread的异常,如何分析解决?

1.1 线程池的主要工作流程

线程池任务提交的主要流程

拒绝策略是一种限流的机制

1.1.1 相关源码

我们看到 ThreadPoolExecutor

public void execute(Runnable command) {
     //如果任务为null,则抛空指针异常
        if (command == null)
            throw new NullPointerException();
        //获取当前线程池的状态+线程个数变量的组合值
        int c = ctl.get();
     //1. 如果当前有效线程数小于核心线程数,调用addWorker执行任务(创建一条线程执行该任务)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
     //2. 如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,同时尝试用非阻塞方法向任务队列中放入任务(放入失败offer()返回false)
        if (isRunning(c) && workQueue.offer(command)) {
             //二次检查
            int recheck = ctl.get();
            //如果当前线程状态不是RUNNING则从队列删除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                //调用拒绝策略处理任务 - 返回
                reject(command);
            //如果当前线程池空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }
     //3. 如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)
        else if (!addWorker(command, false))
            //如果创建线程失败,则调用线程拒绝策略
            reject(command);
    }

看一下 addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //计数器
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //如果工作线程数大于容量或者大于核心或非核心线程数,则返回false,无法创建
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //如果能创建线程,首先计数器+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果没有计数器+1失败
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    //如果运行状态有变化,重新尝试
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
  //计数器+1成功
     //准备为任务创建线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker通过ThreadFactory创建一个thread
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //如果线程可用,向HashSet中添加worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果任务添加,则开启线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果线程开启失败
            if (! workerStarted)
                //在HashSet中去掉该worker,计数器-1,尝试结束
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1.1.2 运行示例

public class ThreadPoolDemo1 implements Runnable{

    public static void main(String[] args) {
        //创建有界队列
        ArrayBlockingQueue<Runnable> queue=new ArrayBlockingQueue<Runnable>(12);
//        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(12);
        //Executors工厂类底层用的就是ThreadPoolExecutor
        //核心4 最大池8
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4860, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 30; i++)
        {
            //放入任务,当核心线程数满了之后,就会把任务加到队列中去
            threadPool.execute(
                    new Thread(new ThreadPoolDemo1(), "Thread".concat(i + "")));
//            System.out.println("线程池中活跃的线程数: " +threadPool.getPoolSize()+",核心线程数:"+ threadPool.getCorePoolSize()+",最大线程数:"+threadPool.getMaximumPoolSize());
            if (queue.size() > 0)
            {
                System.out.println("阻塞队列有线程了,队列中阻塞的线程数:" + queue.size()+", 线程池中执行任务的线程数:"+threadPool.getActiveCount());
            }
            System.out.println(" 线程池中当前的线程数:" +threadPool.getPoolSize());

        }
        threadPool.shutdown();
    }

    @Override
    public void run() {
        try
        {
            Thread.sleep(100);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }



}

1.1.3 运行结果

 线程池中当前的线程数:1
 线程池中当前的线程数:2
 线程池中当前的线程数:3
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:1, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:2, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:3, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:4, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:5, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:6, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:7, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:8, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:9, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:10, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:11, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:4
 线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:5
 线程池中当前的线程数:5
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:6
 线程池中当前的线程数:6
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:7
 线程池中当前的线程数:7
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:8
 线程池中当前的线程数:8
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread20,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 8, active threads = 8, queued tasks = 12, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at 线程.threadpool.ThreadPoolDemo1.main(ThreadPoolDemo1.java:18)

1.2 线程池都有哪几种工作队列

  • ArrayBlockingQueue : 有界队列(基于数组)

  • LinkedBlockingQueue : 有/无界队列 (基于链表(单向链表,头尾指针),传参就有界,不传参就无界)无界:Integer.MAX_VALUE

    •   //不带参数就是无穷大
          public LinkedBlockingQueue() {
              this(Integer.MAX_VALUE);
          }

          //带了参数 不能为非正数
          public LinkedBlockingQueue(int capacity) {
              if (capacity <= 0throw new IllegalArgumentException();
              this.capacity = capacity;
              last = head = new Node<E>(null);
          }
  • SynchronousQueue:同步队列(不存储元素的阻塞队列)

1.2.1 如何理解无界队列和有界队列

有界队列

有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue ,大小为 0 ,在生产者和消费者做中转用的 SynchronousQueue 都属于有界队列

无界队列

没有设定固定大小的队列,特点是可以直接入列,直到溢出,容量默认为 Integer.MAX_VALUE (2^31-1),相当于无界,比如没有设定固定大小的 LinkedBlockingQueue

1.3 线程池的拒绝策略有哪些用途?有哪些拒绝策略?是否可以自定义拒绝策略

1.3.1 线程池拒绝策略用途及种类

  • 线程池的拒绝策略从设计上来说,是对线程池起限流保护作用
  • ThreadPoolExecutor.AbortPolicy : 丢弃任务,并抛出 RejectedExecutionException 异常
  • ThreadPoolExecutor.DiscardPolicy :丢弃任务,但是不抛出异常
  • ThreadPoolExecutor.DiscardOldestPolicy :  丢弃队列最前面的任务,然后重新尝试执行任务
  • ThreadPoolExecutor.CallerRunsPolicy :  由调用线程处理该任务

1.3.2 自定义线程池拒绝处理策略

  • 可以根据应用场景实现 RejectedExecutionHandler 接口,自定义拒绝策略,比如记录日志或者持久化存储不能处理的任务,便于定位分析问题

自定义线程池Demo

运行实例
public class ThreadPoolRejectDemo {
    public static class MyTask implements Runnable {

        public void run() {
            System.out.println(System.currentTimeMillis() + "thread id:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(4);
        ExecutorService es = new ThreadPoolExecutor(55,
                0L, TimeUnit.MILLISECONDS,
                queue,
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {

                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                        System.out.println(" log the rejectedExecution");
                    }
                });

        for (int i = 0; i < 200; i++) {
            es.submit(task);

        }
    }
}
运行结果
1607568814423thread id:12
1607568814423thread id:16
1607568814423thread id:13
1607568814423thread id:15
1607568814423thread id:14
java.util.concurrent.FutureTask@135fbaa4 is discard
 log the rejectedExecution
java.util.concurrent.FutureTask@45ee12a7 is discard
 log the rejectedExecution
 ...
 java.util.concurrent.FutureTask@1f17ae12 is discard
 log the rejectedExecution
java.util.concurrent.FutureTask@4d405ef7 is discard
 log the rejectedExecution
1607568814525thread id:14
1607568814525thread id:12
1607568814525thread id:15
1607568814525thread id:13

1.4 如何创建、停止线程池?为什么不建议使用Executors构建线程池?

1.4.1 线程池的几个状态

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don'
t accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed

* RUNNING -> SHUTDOWN
*    On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
*    On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
*    When both queue and pool are empty
* STOP -> TIDYING
*    When pool is empty
* TIDYING -> TERMINATED
*    When the terminated() hook method has completed

1.4.2 线程池的终止方法

  • 优雅退出- shutdown(),强迫退出- shutdownNow()
  • 如果调用了 shutdown() 方法,则线程池处于 SHUTDOWN 状态,此时线程池不能够接受新的任务( runStateof(c)SHUTDOWN ),它会等待所有任务执行完毕。有个 onShutdown() 方法
  • 如果调用了 shutdownNow() 方法,则线程池处于 STOP 状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务

线程终止Demo

Shutdown()
public class ThreadPoolShutdownDemo {

    public static void main(String[] args) {

        //创建固定 3 个线程的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        //向线程池提交 10 个任务
        for (int i = 1; i <= 10; i++) {
            final int index = i;
            threadPool.submit(() -> {
                System.out.println("正在执行任务 " + index);
                //休眠 3 秒
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        //休眠 4 秒
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //关闭线程池
        threadPool.shutdown();
    }


}
shutdownNow()
public class ThreadPoolShutdownNowDemo {

    public static void main(String[] args) {
  //创建固定 3 个线程的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        //向线程池提交 10 个任务
        for (int i = 1; i <= 10; i++) {
            final int index = i;
            threadPool.submit(() -> {
                System.out.println("正在执行任务 " + index);
                //休眠 3 秒
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        //休眠 4 秒
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //关闭线程池
        List<Runnable> tasks = threadPool.shutdownNow();
        System.out.println("剩余 " + tasks.size() + " 个任务未执行");
        
    }


}

shutdownNow()  示例报了错,java.lang.InterruptedException: sleep interrupted, 还在 sleep就被关闭了。

1.4.3 线程池的创建方法

使用ThreadPoolExecutor创建

ThreadPoolExecutor构造方法

使用Executors创建

Executors方法
  • Executors.newFixedThreadPool(10);  // LinkedBlockingQueue 无限加入队列
  • Executors.newScheduledThreadPool(10);  // DelayedWorkQueue 队列如果满了,阻塞
  • Executors.newSingleThreadScheduledExecutor();  // DelayedWorkQueue 队列如果满了,阻塞
  • Executors.newCachedThreadPool(); // SynchronousQueue 队列如果满了,抛异常,它是不存储东西的
  • Executors.newSingleThreadExecutor();  // LinkedBlockingQueue 无限加入队列

1.5 为什么不建议使用Executors构建线程池

Executors提供的很多方法默认使用的都是无界的 LinkedBlockingQueue ,高负载技术场景下,无界队列很容易导致 OOM (内存溢出),因此强烈建议使用有界队列。

存在这种情况,当每个线程获取到一个任务后,执行时间比较长,导致 workQueue 里积压的任务越来越多,机器的内存使用不停的飙升,最后也会导致 OOM 。

是否有解决办法?

线程任务使用有界阻塞队列,队列满了就暂停放入线程任务;但是这样拒绝策略要设置好,因为默认的是超过队列长度就丢弃的策略 AbortPolicy ,这样不行,应该使用 CallerRunsPolicy 策略,直接让调用者执行任务(或者说是 由启动线程池的线程运行多出的线程,如果在主线程中调用。主线程就被阻塞了)。

直接让调用者执行任务?

public static class CallerRunsPolicy implements RejectedExecutionHandler {
       
        public CallerRunsPolicy() { }

     //拒绝策略中,调用它本身的run()去运行?
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

这能解决OOM?可以解决,因为主线程被阻塞,后续不会再添加进来,等到运行完毕,后续才会再添加到队列中,但又有可能导致ANR,因为在主线程中创建的线程池。

能否在子线程中创建线程池?可以,这样就不会导致主线程阻塞。


1.6 为什么不建议使用Executors静态工厂构建线程池?

  • FixedThreadPool 和 SingleThreadPool:
    • 允许的请求队列(底层是 LinkedblockingQueue )长度是 Integer.MAX_VALUE ,可能会堆积大量的请求,造成OOM
  • CachedThreadPool和ScheduledThreadPool
    • 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,导致 OOM
  • 避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了

1.7 线程池有哪些种类,各自的使用场景是什么?

  • newSingleThreadPoolExecutor:

    单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。

  • newFixedThreadPool:

    固定大小的线程池,用于已知并发压力的情况下,对线程数做限制。

  • newCachedThreadPool:

    创建一个可缓存的线程池,比较适合处理执行时间比较小的任务。

  • newScheduledThreadPool:

    适用于定时以及周期性执行任务的场景

  • newWorkStealingPool:

    jdk1.8提供的线程池,底层使用 ForkJoinPool 实现,适用于大任务分解并行执行的场景。Fork/Join框架用到了工作窃取 ( work-stealing ) 算法,任务分割为若干互不依赖的子任务(有点像 Master-worker 设计模式)

1.8 线程池有哪些状态,状态的设计机制是什么?状态是如何相互切换的?

1.8.1 线程池状态

  • RUNNING:接受新任务并且处理阻塞队列里的任务
  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
  • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务
  • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用 terminated 方法;
  • TERMINATED:终止状态terminated 方法调用完成以后的状态。

1.8.2 线程池的状态设计机制

状态设计
  • ctl 是一个 AtomicInteger 类型,它的低29位用于存放当前的线程数

  • 高3位用于表示当前线程池的状态,其中高三位的值和状态对应如下:

    111:RUNNING

    000:SHUTDOWN

    001:STOP

    010:TIDING

    011:TERMINATED

  • 源码部分是这样定义的

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

        // 高3位存储当前线程池状态
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;

        // Packing and unpacking ctl
        private static int runStateOf(int c)     return c & ~CAPACITY; }
        private static int workerCountOf(int c)  return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) return rs | wc; }

线程池状态打印Demo

public class ThreadPoolStateTest {
    //ctl是线程池中一个非常重要的变量,以它的低29位表示线程池中处于RUNNING状态的线程个数,高3位表示线程池所处的状态
   public static int COUNT_BITS = Integer.SIZE - 3;
   //左移29位 1000...(29)...0 -1 0 11111...(29)...111
   public static int CAPACITY   = (1 << COUNT_BITS) - 1;
   
   public static void main(String[] args) {
        // <<表示 左移
        int RUNNING    = -1 << COUNT_BITS;
        int SHUTDOWN   =  0 << COUNT_BITS;
        int STOP       =  1 << COUNT_BITS;
        int TIDYING    =  2 << COUNT_BITS;
        int TERMINATED =  3 << COUNT_BITS;
        System.out.println("Integer.SIZE is :"+Integer.SIZE+"  , COUNT_BITS is : "+COUNT_BITS);
        System.out.println("CAPACITY is : "+CAPACITY+" , "+Integer.toBinaryString(CAPACITY));
      //五种状态中SHUTDOWN值等于0,RUNNING值小于0,其他三种状态STOP、TIDYING、TERMINATED值均大于0
        System.out.println("RUNNING    = " + RUNNING + " = " + Integer.toBinaryString(RUNNING));
        System.out.println("SHUTDOWN   = " + SHUTDOWN + " = " + Integer.toBinaryString(SHUTDOWN));
        System.out.println("STOP       = " + STOP + "  = 00" + Integer.toBinaryString(STOP));
        System.out.println("TIDYING    = " + TIDYING + " = 0" + Integer.toBinaryString(TIDYING));
        System.out.println("TERMINATED = " + TERMINATED + " = 0" + Integer.toBinaryString(TERMINATED));
    }
    //CAPACITY低29位全1高3位全0,它与c做与运算得到的就是当前运行的线程个数
    static int workerCountOf(int c)  {
        return c & CAPACITY;
    }

}

打印结果

Integer.SIZE is :32  , COUNT_BITS is : 29
CAPACITY is : 536870911 , 11111111111111111111111111111
RUNNING    = -536870912 = 11100000000000000000000000000000
SHUTDOWN   = 0 = 0
STOP       = 536870912  = 00100000000000000000000000000000
TIDYING    = 1073741824 = 01000000000000000000000000000000
TERMINATED = 1610612736 = 01100000000000000000000000000000

1.8.3 线程池状态切换

  • 1.RUNNING -> SHUTDOWN:显示调用 shutdown() 方法,或者隐式调用了 finalize() ,它里面调用了 shutdown() 方法
  • 2.RUNNING or SHUTDOWN -> STOP:显示调用 shutdownNow() 方法时候
  • 3.SHUTDOWN->TIDYING:当线程池和任务队列都为空的时候
  • 4.STOP->TIDYING:当线程池为空的时候
  • 5.TIDYING->TERMINATED:当 terminated()  hook 方法执行完成时候
线程状态图

1.9 线程池的使用场景?线程池为什么能提升性能?

1.9.1 线程池的使用场景以及作用

  • 场景:适用于高并发、批量处理、性能调优等场景
  • 作用:节省线程创建、销毁的时间,提升性能

1.10 线程池有哪些重要参数?如何设置这些重要参数?

1.10.1 线程池的重要参数(ThreadPoolExecutor构造器传入的参数)

  • corePoolSize :线程池的核心线程数,即便是线程池里没有任何任务,也会有 corePoolSize 个线程在候着等待任务。

  • maximumPoolSize :最大线程数,不管你提交多少任务,线程池里最多工作线程数就是 maximumPoolSize (这里面包括了corePoolSize)

  • keepAliveTime :线程的存活时间。当线程池里的线程数大于 corePoolSize 时候(有非核心线程的时候),如果等了 keepAliveTime 时长还没有任务可执行,则线程退出。在 ThreadPoolExecutor 的 getTask() 方法中,workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)引用到。

  • LinkedBlockingQueue

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
         //count为当前的元素数量
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                //如果当前没有元素,则倒数等待
                while (count.get() == 0) {
                    //如果倒数结束还是没有元素,则返回null
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                //如果有元素,则出队
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
  • unit :这个用来指定 keepAliveTime 的单位,比如秒:TimeUnit.SECONDS

  • workQueue :队列,提交的任务将会被放到这个队列里

  • threadFactory :线程工厂,用来创建线程,主要是为了给线程起名字

  • handler :拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用

1.10.2 线程池的重要参数

  • 参考 Executors 类设置、测试

  • CPU 密集型:

    CPU 密集型(计算密集型),大部分时间用来做计算逻辑判断等 CPU 动作。尽量减少 CPU 上下文切换,核心线程数大小 = Ncpu + 1 。(经验值)

  • IO 密集型:

    IO 密集型任务指任务需要执行大量的 IO 操作,涉及到网络、磁盘 IO 操作,对CPU消耗较少。核心线程数大小 = 2 Ncpu 。(经验值)

1.11 线程池如何获取执行返回的结果?

一般使用线程池执行任务都是调用的 execute 方法,这个方法定义在 Executor 接口中:

public interface Executor {
    void execute(Runnable command);
}123

这个方法是没有返回值的,而且只接受 Runnable 。

但是在 ExecutorService 接口 ( ThreadPoolExecutor 实现了这个接口 ) 中能找到这个方法:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

这个方法接收两种参数,Callable 和 Runnable。返回值是 Future。

1.11.1 Callable和Runnable

  • Callable

    public interface Callable<V> {
      V call() throws Exception;
    }
  • Runnable

    interface Runnable {
      public abstract void run();
    }
  • 区别:

    • Callable 可以接受泛型,call() 方法中可以返回一个这个类型的值,而 Runnable 方法没有返回值
    • Callable 的 call 方法可以跑出异常,Runnable 的 run 方法不会抛出异常
  • 什么时候用 Callable 什么时候用 Runnable ?

    需要返回值的时候用 Callable 。Callable 面向 Future 编程

1.11.2 Future

返回值 Future 也是一个接口,通过他可以获得任务执行的返回值。

定义如下:

public interface Future<V> {
    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

get()方法的阻塞性

运行到 future.get() 的时候就阻塞住了,一直等到任务执行完毕,拿到了返回的返回值,主线程才会继续运行。

其中一个情况:

Callable myCallable = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(6000);
                System.out.println("calld方法执行了");
                return "call方法返回值";
            }
        };
        Callable myCallable2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                System.out.println("calld2方法执行了");
                return "call2方法返回值";
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future future = executor.submit(myCallable);
        Future future2 = executor.submit(myCallable2);
        System.out.println("获取返回值: "+future2.get());
        System.out.println("获取返回值2: "+future.get());

分析代码:当调用 submit 时候,任务异步开始工作,当调用get 的时候,调用 get() 方法的线程阻塞,等待返回值,获取到返回值的时候继续后续动作。

假设 future2 的等待时长为 3 秒, future1 的等待市场为 6 秒。若先用 future1.get() 则在阻塞时间过后,两个异步工作都完成了,future1.get() 获得返回值后,进入 future2.get() 阻塞,由于异步工作已经完成,阻塞时间几乎为 0 ,直接获得值。

相反如果先调用 future2.get() 则在阻塞时间过后,future2 的异步工作完成,而 future1 的异步工作还要等待 3 秒才能完成。所以, future2.get() 获取返回值取消阻塞后,进入future1.get() ,还要再阻塞 3 秒等待工作完成,才能获取返回值,取消阻塞。

底层如何实现get()阻塞?

get() 中有阻塞方法 awaitDone()

1.12 出现Unable to create new native thread的异常,如何分析解决?

  • 原因:

    创建线程数超过了操作系统的限制

  • 分析:

    线程数设置是合理?是否存在多个定时调度任务、且线程数太大?

  • 解决:

    设置合理的线程数、加机器等等


文章有问题?点此查看未经处理的缓存