Java线程池分享
Java线程池分享
多线程分享链接(更新中):https://blog.csdn.net/weixin_43093006/category_10650515.html
1. 问题:
线程池的主要工作流程是什么? 线程池都有哪几种工作队列?怎么理解无界队列和有界队列? 线程池的拒绝策略有何用途,有哪些拒绝策略?是否可以自定义拒绝策略 如何创建、停止线程池?为什么不建议使用Executors构建线程池? 线程池有哪些种类,各自的使用场景是什么? 线程池有哪些状态,状态的设计机制是什么?状态是如何相互切换的? 请谈谈线程池的使用场景?线程池为什么能提高性能? 线程池有哪些重要参数?如何设置这些重要参数? 线程池如何获取执行返回的结果? 出现unable to create new native thread的异常,如何分析解决?
1.1 线程池的主要工作流程
拒绝策略是一种限流的机制
1.1.1 相关源码
我们看到 ThreadPoolExecutor
public void execute(Runnable command) {
//如果任务为null,则抛空指针异常
if (command == null)
throw new NullPointerException();
//获取当前线程池的状态+线程个数变量的组合值
int c = ctl.get();
//1. 如果当前有效线程数小于核心线程数,调用addWorker执行任务(创建一条线程执行该任务)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,同时尝试用非阻塞方法向任务队列中放入任务(放入失败offer()返回false)
if (isRunning(c) && workQueue.offer(command)) {
//二次检查
int recheck = ctl.get();
//如果当前线程状态不是RUNNING则从队列删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
//调用拒绝策略处理任务 - 返回
reject(command);
//如果当前线程池空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)
else if (!addWorker(command, false))
//如果创建线程失败,则调用线程拒绝策略
reject(command);
}
看一下 addWorker() 方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//计数器
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果工作线程数大于容量或者大于核心或非核心线程数,则返回false,无法创建
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果能创建线程,首先计数器+1
if (compareAndIncrementWorkerCount(c))
break retry;
//如果没有计数器+1失败
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//如果运行状态有变化,重新尝试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//计数器+1成功
//准备为任务创建线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Worker通过ThreadFactory创建一个thread
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//如果线程可用,向HashSet中添加worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果任务添加,则开启线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果线程开启失败
if (! workerStarted)
//在HashSet中去掉该worker,计数器-1,尝试结束
addWorkerFailed(w);
}
return workerStarted;
}
1.1.2 运行示例
public class ThreadPoolDemo1 implements Runnable{
public static void main(String[] args) {
//创建有界队列
ArrayBlockingQueue<Runnable> queue=new ArrayBlockingQueue<Runnable>(12);
// LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(12);
//Executors工厂类底层用的就是ThreadPoolExecutor
//核心4 最大池8
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, queue);
for (int i = 0; i < 30; i++)
{
//放入任务,当核心线程数满了之后,就会把任务加到队列中去
threadPool.execute(
new Thread(new ThreadPoolDemo1(), "Thread".concat(i + "")));
// System.out.println("线程池中活跃的线程数: " +threadPool.getPoolSize()+",核心线程数:"+ threadPool.getCorePoolSize()+",最大线程数:"+threadPool.getMaximumPoolSize());
if (queue.size() > 0)
{
System.out.println("阻塞队列有线程了,队列中阻塞的线程数:" + queue.size()+", 线程池中执行任务的线程数:"+threadPool.getActiveCount());
}
System.out.println(" 线程池中当前的线程数:" +threadPool.getPoolSize());
}
threadPool.shutdown();
}
@Override
public void run() {
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
1.1.3 运行结果
线程池中当前的线程数:1
线程池中当前的线程数:2
线程池中当前的线程数:3
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:1, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:2, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:3, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:4, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:5, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:6, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:7, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:8, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:9, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:10, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:11, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:4
线程池中当前的线程数:4
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:5
线程池中当前的线程数:5
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:6
线程池中当前的线程数:6
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:7
线程池中当前的线程数:7
阻塞队列有线程了,队列中阻塞的线程数:12, 线程池中执行任务的线程数:8
线程池中当前的线程数:8
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread20,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 8, active threads = 8, queued tasks = 12, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at 线程.threadpool.ThreadPoolDemo1.main(ThreadPoolDemo1.java:18)
1.2 线程池都有哪几种工作队列
ArrayBlockingQueue : 有界队列(基于数组)
LinkedBlockingQueue : 有/无界队列 (基于链表(单向链表,头尾指针),传参就有界,不传参就无界)无界:
Integer.MAX_VALUE//不带参数就是无穷大
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//带了参数 不能为非正数
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}SynchronousQueue:同步队列(不存储元素的阻塞队列)
1.2.1 如何理解无界队列和有界队列
有界队列
有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue ,大小为 0 ,在生产者和消费者做中转用的 SynchronousQueue 都属于有界队列
无界队列
没有设定固定大小的队列,特点是可以直接入列,直到溢出,容量默认为 Integer.MAX_VALUE (2^31-1),相当于无界,比如没有设定固定大小的 LinkedBlockingQueue
1.3 线程池的拒绝策略有哪些用途?有哪些拒绝策略?是否可以自定义拒绝策略
1.3.1 线程池拒绝策略用途及种类
线程池的拒绝策略从设计上来说,是对线程池起限流保护作用 ThreadPoolExecutor.AbortPolicy : 丢弃任务,并抛出 RejectedExecutionException异常ThreadPoolExecutor.DiscardPolicy :丢弃任务,但是不抛出异常 ThreadPoolExecutor.DiscardOldestPolicy : 丢弃队列最前面的任务,然后重新尝试执行任务 ThreadPoolExecutor.CallerRunsPolicy : 由调用线程处理该任务
1.3.2 自定义线程池拒绝处理策略
可以根据应用场景实现 RejectedExecutionHandler接口,自定义拒绝策略,比如记录日志或者持久化存储不能处理的任务,便于定位分析问题
自定义线程池Demo
运行实例
public class ThreadPoolRejectDemo {
public static class MyTask implements Runnable {
public void run() {
System.out.println(System.currentTimeMillis() + "thread id:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(4);
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
queue,
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is discard");
System.out.println(" log the rejectedExecution");
}
});
for (int i = 0; i < 200; i++) {
es.submit(task);
}
}
}
运行结果
1607568814423thread id:12
1607568814423thread id:16
1607568814423thread id:13
1607568814423thread id:15
1607568814423thread id:14
java.util.concurrent.FutureTask@135fbaa4 is discard
log the rejectedExecution
java.util.concurrent.FutureTask@45ee12a7 is discard
log the rejectedExecution
...
java.util.concurrent.FutureTask@1f17ae12 is discard
log the rejectedExecution
java.util.concurrent.FutureTask@4d405ef7 is discard
log the rejectedExecution
1607568814525thread id:14
1607568814525thread id:12
1607568814525thread id:15
1607568814525thread id:13
1.4 如何创建、停止线程池?为什么不建议使用Executors构建线程池?
1.4.1 线程池的几个状态
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
1.4.2 线程池的终止方法
优雅退出- shutdown(),强迫退出-shutdownNow()如果调用了 shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务(runStateof(c)是SHUTDOWN),它会等待所有任务执行完毕。有个onShutdown()方法如果调用了 shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
线程终止Demo
Shutdown()
public class ThreadPoolShutdownDemo {
public static void main(String[] args) {
//创建固定 3 个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//向线程池提交 10 个任务
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.submit(() -> {
System.out.println("正在执行任务 " + index);
//休眠 3 秒
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//休眠 4 秒
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
threadPool.shutdown();
}
}
shutdownNow()
public class ThreadPoolShutdownNowDemo {
public static void main(String[] args) {
//创建固定 3 个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//向线程池提交 10 个任务
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.submit(() -> {
System.out.println("正在执行任务 " + index);
//休眠 3 秒
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//休眠 4 秒
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
List<Runnable> tasks = threadPool.shutdownNow();
System.out.println("剩余 " + tasks.size() + " 个任务未执行");
}
}
shutdownNow() 示例报了错,java.lang.InterruptedException: sleep interrupted, 还在 sleep就被关闭了。
1.4.3 线程池的创建方法
使用ThreadPoolExecutor创建
使用Executors创建
Executors.newFixedThreadPool(10); // LinkedBlockingQueue无限加入队列Executors.newScheduledThreadPool(10); // DelayedWorkQueue队列如果满了,阻塞Executors.newSingleThreadScheduledExecutor(); // DelayedWorkQueue队列如果满了,阻塞Executors.newCachedThreadPool(); // SynchronousQueue队列如果满了,抛异常,它是不存储东西的Executors.newSingleThreadExecutor(); // LinkedBlockingQueue无限加入队列
1.5 为什么不建议使用Executors构建线程池
Executors提供的很多方法默认使用的都是无界的 LinkedBlockingQueue ,高负载技术场景下,无界队列很容易导致 OOM (内存溢出),因此强烈建议使用有界队列。
存在这种情况,当每个线程获取到一个任务后,执行时间比较长,导致 workQueue 里积压的任务越来越多,机器的内存使用不停的飙升,最后也会导致 OOM 。
是否有解决办法?
线程任务使用有界阻塞队列,队列满了就暂停放入线程任务;但是这样拒绝策略要设置好,因为默认的是超过队列长度就丢弃的策略 AbortPolicy ,这样不行,应该使用 CallerRunsPolicy 策略,直接让调用者执行任务(或者说是 由启动线程池的线程运行多出的线程,如果在主线程中调用。主线程就被阻塞了)。
直接让调用者执行任务?
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
//拒绝策略中,调用它本身的run()去运行?
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
这能解决OOM?可以解决,因为主线程被阻塞,后续不会再添加进来,等到运行完毕,后续才会再添加到队列中,但又有可能导致ANR,因为在主线程中创建的线程池。
能否在子线程中创建线程池?可以,这样就不会导致主线程阻塞。
1.6 为什么不建议使用Executors静态工厂构建线程池?
FixedThreadPool 和 SingleThreadPool: 允许的请求队列(底层是 LinkedblockingQueue )长度是 Integer.MAX_VALUE,可能会堆积大量的请求,造成OOMCachedThreadPool和ScheduledThreadPool 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。
1.7 线程池有哪些种类,各自的使用场景是什么?
newSingleThreadPoolExecutor:
单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。
newFixedThreadPool:
固定大小的线程池,用于已知并发压力的情况下,对线程数做限制。
newCachedThreadPool:
创建一个可缓存的线程池,比较适合处理执行时间比较小的任务。
newScheduledThreadPool:
适用于定时以及周期性执行任务的场景
newWorkStealingPool:
jdk1.8提供的线程池,底层使用
ForkJoinPool实现,适用于大任务分解并行执行的场景。Fork/Join框架用到了工作窃取 ( work-stealing ) 算法,任务分割为若干互不依赖的子任务(有点像 Master-worker 设计模式)
1.8 线程池有哪些状态,状态的设计机制是什么?状态是如何相互切换的?
1.8.1 线程池状态
RUNNING:接受新任务并且处理阻塞队列里的任务 SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务 STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务 TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用 terminated方法;TERMINATED:终止状态, terminated方法调用完成以后的状态。
1.8.2 线程池的状态设计机制
ctl是一个AtomicInteger类型,它的低29位用于存放当前的线程数高3位用于表示当前线程池的状态,其中高三位的值和状态对应如下:
111:RUNNING
000:SHUTDOWN
001:STOP
010:TIDING
011:TERMINATED
源码部分是这样定义的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高3位存储当前线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态打印Demo
public class ThreadPoolStateTest {
//ctl是线程池中一个非常重要的变量,以它的低29位表示线程池中处于RUNNING状态的线程个数,高3位表示线程池所处的状态
public static int COUNT_BITS = Integer.SIZE - 3;
//左移29位 1000...(29)...0 -1 0 11111...(29)...111
public static int CAPACITY = (1 << COUNT_BITS) - 1;
public static void main(String[] args) {
// <<表示 左移
int RUNNING = -1 << COUNT_BITS;
int SHUTDOWN = 0 << COUNT_BITS;
int STOP = 1 << COUNT_BITS;
int TIDYING = 2 << COUNT_BITS;
int TERMINATED = 3 << COUNT_BITS;
System.out.println("Integer.SIZE is :"+Integer.SIZE+" , COUNT_BITS is : "+COUNT_BITS);
System.out.println("CAPACITY is : "+CAPACITY+" , "+Integer.toBinaryString(CAPACITY));
//五种状态中SHUTDOWN值等于0,RUNNING值小于0,其他三种状态STOP、TIDYING、TERMINATED值均大于0
System.out.println("RUNNING = " + RUNNING + " = " + Integer.toBinaryString(RUNNING));
System.out.println("SHUTDOWN = " + SHUTDOWN + " = " + Integer.toBinaryString(SHUTDOWN));
System.out.println("STOP = " + STOP + " = 00" + Integer.toBinaryString(STOP));
System.out.println("TIDYING = " + TIDYING + " = 0" + Integer.toBinaryString(TIDYING));
System.out.println("TERMINATED = " + TERMINATED + " = 0" + Integer.toBinaryString(TERMINATED));
}
//CAPACITY低29位全1高3位全0,它与c做与运算得到的就是当前运行的线程个数
static int workerCountOf(int c) {
return c & CAPACITY;
}
}
打印结果
Integer.SIZE is :32 , COUNT_BITS is : 29
CAPACITY is : 536870911 , 11111111111111111111111111111
RUNNING = -536870912 = 11100000000000000000000000000000
SHUTDOWN = 0 = 0
STOP = 536870912 = 00100000000000000000000000000000
TIDYING = 1073741824 = 01000000000000000000000000000000
TERMINATED = 1610612736 = 01100000000000000000000000000000
1.8.3 线程池状态切换
1.RUNNING -> SHUTDOWN:显示调用 shutdown()方法,或者隐式调用了finalize(),它里面调用了shutdown()方法2.RUNNING or SHUTDOWN -> STOP:显示调用 shutdownNow()方法时候3.SHUTDOWN->TIDYING:当线程池和任务队列都为空的时候 4.STOP->TIDYING:当线程池为空的时候 5.TIDYING->TERMINATED:当 terminated()hook 方法执行完成时候
1.9 线程池的使用场景?线程池为什么能提升性能?
1.9.1 线程池的使用场景以及作用
场景:适用于高并发、批量处理、性能调优等场景 作用:节省线程创建、销毁的时间,提升性能
1.10 线程池有哪些重要参数?如何设置这些重要参数?
1.10.1 线程池的重要参数(ThreadPoolExecutor构造器传入的参数)
corePoolSize :线程池的核心线程数,即便是线程池里没有任何任务,也会有
corePoolSize个线程在候着等待任务。maximumPoolSize :最大线程数,不管你提交多少任务,线程池里最多工作线程数就是
maximumPoolSize(这里面包括了corePoolSize)keepAliveTime :线程的存活时间。当线程池里的线程数大于
corePoolSize时候(有非核心线程的时候),如果等了keepAliveTime时长还没有任务可执行,则线程退出。在 ThreadPoolExecutor 的getTask()方法中,workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)引用到。在
LinkedBlockingQueue中public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
//count为当前的元素数量
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果当前没有元素,则倒数等待
while (count.get() == 0) {
//如果倒数结束还是没有元素,则返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//如果有元素,则出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}unit :这个用来指定
keepAliveTime的单位,比如秒:TimeUnit.SECONDSworkQueue :队列,提交的任务将会被放到这个队列里
threadFactory :线程工厂,用来创建线程,主要是为了给线程起名字
handler :拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用
1.10.2 线程池的重要参数
参考 Executors 类设置、测试
CPU 密集型:
CPU 密集型(计算密集型),大部分时间用来做计算逻辑判断等 CPU 动作。尽量减少 CPU 上下文切换,核心线程数大小 = Ncpu + 1 。(经验值)
IO 密集型:
IO 密集型任务指任务需要执行大量的 IO 操作,涉及到网络、磁盘 IO 操作,对CPU消耗较少。核心线程数大小 = 2 Ncpu 。(经验值)
1.11 线程池如何获取执行返回的结果?
一般使用线程池执行任务都是调用的 execute 方法,这个方法定义在 Executor 接口中:
public interface Executor {
void execute(Runnable command);
}123
这个方法是没有返回值的,而且只接受 Runnable 。
但是在 ExecutorService 接口 ( ThreadPoolExecutor 实现了这个接口 ) 中能找到这个方法:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
这个方法接收两种参数,Callable 和 Runnable。返回值是 Future。
1.11.1 Callable和Runnable
Callable
public interface Callable<V> {
V call() throws Exception;
}Runnable
interface Runnable {
public abstract void run();
}区别:
Callable 可以接受泛型, call()方法中可以返回一个这个类型的值,而 Runnable 方法没有返回值Callable 的 call方法可以跑出异常,Runnable 的run方法不会抛出异常什么时候用 Callable 什么时候用 Runnable ?
需要返回值的时候用 Callable 。Callable 面向 Future 编程
1.11.2 Future
返回值 Future 也是一个接口,通过他可以获得任务执行的返回值。
定义如下:
public interface Future<V> {
boolean cancel(boolean var1);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
get()方法的阻塞性
运行到 future.get() 的时候就阻塞住了,一直等到任务执行完毕,拿到了返回的返回值,主线程才会继续运行。
其中一个情况:
Callable myCallable = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(6000);
System.out.println("calld方法执行了");
return "call方法返回值";
}
};
Callable myCallable2 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
System.out.println("calld2方法执行了");
return "call2方法返回值";
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
Future future = executor.submit(myCallable);
Future future2 = executor.submit(myCallable2);
System.out.println("获取返回值: "+future2.get());
System.out.println("获取返回值2: "+future.get());
分析代码:当调用 submit 时候,任务异步开始工作,当调用get 的时候,调用 get() 方法的线程阻塞,等待返回值,获取到返回值的时候继续后续动作。
假设 future2 的等待时长为 3 秒, future1 的等待市场为 6 秒。若先用 future1.get() 则在阻塞时间过后,两个异步工作都完成了,future1.get() 获得返回值后,进入 future2.get() 阻塞,由于异步工作已经完成,阻塞时间几乎为 0 ,直接获得值。
相反如果先调用 future2.get() 则在阻塞时间过后,future2 的异步工作完成,而 future1 的异步工作还要等待 3 秒才能完成。所以, future2.get() 获取返回值取消阻塞后,进入future1.get() ,还要再阻塞 3 秒等待工作完成,才能获取返回值,取消阻塞。
底层如何实现get()阻塞?
在get() 中有阻塞方法 awaitDone()
1.12 出现Unable to create new native thread的异常,如何分析解决?
原因:
创建线程数超过了操作系统的限制
分析:
线程数设置是合理?是否存在多个定时调度任务、且线程数太大?
解决:
设置合理的线程数、加机器等等