面试官:你在项目中用过 多线程 吗?
The following article is from Java后端技术全栈 Author 田哥
最近,从去年到现在,我给小伙伴们做模拟面试已有100多场。有时候我也在想,现在真的很卷吗?大部分人第一次模拟面试结束,给我的感觉不像大家说的那么卷。
奇怪的现象
我在做模拟面试的过程中,无意中发现一个现象,就是如果问八股文,问知识点,在校生或刚刚毕业不久的,回答的斗殴挺好的,往往是工作三五年的在这一块非常欠缺。
我也私下问过很多人,为什么这种现象,主要原因差不多就是:
每天太忙,没时间学习 年纪大了,记不住 就是不要想学
工作三五年的也不是就真的没有优点,他们的优点就是有大量的项目经验。换着问项目业务和设计之类的,他们明显占优势,但,问他们稍微往深的问,就会懵逼。比如:你们项目中使用到了Redis
,用来干嘛,他们能立马回答上来。
如果继续追问:如何保证Redis
和数据库中的数据一致性?然后就会稀里糊涂的回答。还有就是问他们Redis
的持久化方式使用的是哪种?“这个没注意,不是我安装的”,继续问:那你觉得哪种方式更好,答案各种各样的都有。
总结起来就亮点:
学生或新人,八股文占优势(也有一部分啥都不知道,啥也没去背的)。 三五年有项目经验,但大部分都停留在用上面,稍微问题问题就容易暴露自己的家点(也有一小分部知道的比较多)
在模拟面试的时候,我问过很多人是否在项目中用过并发编程的相关技术,用了什么?
基本上都回答:用过线程池
好吧,接下来,那我们就以一个线程池的面试题来对比以上两类人的回答。
聊聊线程池
线程池核心参数
学生或新人:基本上都是一口气就能吧这些参数回答上来,另外有部分优秀的会对这些参数做一个解释。
三五年的:部分人能全部回答出来,一部分人能说出核心线程数、最大线程数,其他参数就吱吱呜呜的回答,还有一部分就是完全一脸懵逼。
我们来看看到底有哪些参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
corePoolSize
:核心线程数maximumPoolSize
:最大线程数keepAliveTime
:空闲时间unit
:空闲时间单位workQueue
:阻塞队列threadFactory
:线程工厂handler
:拒绝策略
问核心参数时,至少要回答corePoolSize
、maximumPoolSize
、workQueue
、handler
。还是建议全部回答吧,反正也没几个参数。
线程池原理
学生或新人:按照八股文来回答一番,甚至有的在回答核心参数的时候,顺带着就会把线程池的原理给说了(刚刚遇到能说的,就顺带着多说点)。
三五年的:有部分人也会按照八股文上的来回答,有部分人是吱吱呜呜的,不知道在说啥,还有一分部人就是瞎说咯。
关于线程池原理,我这里借用网上一张图:
如果看图记不住,我也有办法,我们可以使用生活案例来理解。
公司A:线程池 公司A自己的员工:核心线程数 公司A接到的订单:我们的业务线程 公司A的仓库:阻塞队列 公司B派的人:最大线程数
开始表演:
公司A接到订单,先给自己员工处理,如果自己员工处理不来了,就丢到公司A仓库里,如果仓库堆满了,这时候就去找公司B,公司B就派人(最大线程数)到公司A,订单持续爆棚,公司B派来的员工和公司A的员工都搞不来了,那就只能把后面来的订单拒绝掉(拒绝策略)。如果公司B的员工在公司A里吧任务做完了,闲着没事了,公司A也不会立马就让人家回公司B,毕竟人员来回还是有成本的,所以,可以适当的给点时间(
keepAliveTime
),是在没有什么任务了,那你们还是回公司B吧。
好了,按照这个故事去编就行了,也可以模仿着编其他故事,至少让面试官觉得你不是在背八股文。
核心线程数量设置
学生或新人:就算知道也是被八股文的,但很遗憾,问过十多个人,回答上来的应该占30%左右。
三五年的:问过几十个人,回答上来的寥寥无几,甚是遗憾。在项目中敢用线程池,却不知道如何设置核心线程数,这不是瞎搞吗?有的人能回答出CPU密集型和IO密集型,但问他哪些类型是CPU密集型、哪些是IO密集型?分表举两个例子,此时很多人都会慌的。
下面,我们来说说CPU密集型和IO密集型:
CPU密集型
:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1
,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
比如:像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。
IO密集型
:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占
用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 :
核心线程数=
CPU核心数量*2
。
比如:像 MySQL
数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。
另外,线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程;
以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。
线程是如何复用的
关于这个问题,目前我见过的,只有两个人能说个大概。
我们都知道,继承Thread类或者实现Runnable接口,然后调用其start()
方法就可以启动线程了,但是如果调用run()
方法就和调用普通方法一样。
我们来看看,线程池中,我们提交的线程实例(任务),在线程池中到底是怎么被执行的。
线程池中有个Worker的角色,我们调用execute(Runnable tak)
时候,会创建一个Worker:
我们先来看看这个Worker是怎么定义的:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
// Worker 只有这一个构造方法,传入 firstTask
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务
// 我们发现创建线程的时候传入的值是this,我们知道创建线程可以通过继承Runnable的方法,
// Worker继承了Runnable,并且下面重写了run()方法
this.thread = getThreadFactory().newThread(this);
}
// 由上面创建线程时传入的this,上面的thread启动后,会执行这里的run()方法,并且此时runWorker传入的也是this
public void run() {
runWorker(this);
}
}
Worker继承了Runnable,并且下面重写了run()方法,这时候我们调用new Worker().start()方法后,就会调用Worker类中的run()
方法。
此时的Worker和我们平时的Thread类就类似了
好了,我们再回到前面的说的execute()
方法中来。
public void execute(Runnable command) {
int c = ctl.get();
// 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
// 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
// 至于执行的结果,到时候会包装到 FutureTask 中。
// 这里的true代表当前线程数小于corePoolSize,表示以corePoolSize为线程数界限
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支
// 这里的false代表当前线程数大于corePoolSize,表示以 maximumPoolSize 为界创建新的 worker
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
这段代码里我们看到了很多地方都在调用接着addWorker()
方法,我们来看看这个方法(方法内容有点多):
private boolean addWorker(Runnable firstTask, boolean core) {
//相当于goto,虽然不建议滥用,看看大神们是如何使用吧
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
// 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask != null
// 3. workQueue.isEmpty()
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//这里就是通过core参数对当前线程数的判断
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
* 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
*/
// worker 是否已经启动
boolean workerStarted = false;
// 是否已将这个 worker 添加到 workers 这个 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 把 firstTask 传给 worker 的构造方法
w = new Worker(firstTask);
// 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
// 这个是整个类的全局锁,因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 小于 SHUTTDOWN 那就是 RUNNING
// 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 里面的 thread 可不能是已经启动的
if (t.isAlive())
throw new IllegalThreadStateException();
// 加到 workers 这个 HashSet 中
workers.add(w);
int s = workers.size();
// largestPoolSize 用于记录 workers 中的个数的最大值
// 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功的话,启动这个线程
if (workerAdded) {
// 启动线程,最重要的就是这里,下面我们会讲解如何执行任务
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}
请注意:t.start();
这里的t其实就是我们创建的Worker
对象,就回到我们前面说的,调用start()方法后,会执行到他的run()
方法中来,我们继续看Worker中的run()
方法。
//Worker的run方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//取出需要执行的任务,
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法
while (task != null || (task = getTask()) != null) {
//这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程
w.lock();
//判断线程是否需要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务开始执行前的hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//这里就是直接调用run 方法
} catch (RuntimeException x) {
thrown = x; throw x;
} finally {
////任务开始执行后的hook方法
afterExecute(task, thrown);
}
} finally {
task = null;//清空task
w.completedTasks++;//完成数添加
w.unlock();
}
}
completedAbruptly = false;
} finally {
//Worker退出
processWorkerExit(w, completedAbruptly);
}
}
这里注意这行代码:
while (task != null || (task = getTask()) != null)
注释中已经说清楚了:如果task不是null,或者去队列中取任务,注意这里会阻塞。
另外,一个注意点:
task.run();
这个其实就是调用我们我们传入到execute(Runnable task)
中的参数,也就是说,我们创建的Runnable对象,根本就不会去调用其start()
方法,而是直接调用其run()
方法。
我们在看看getTask()
方法到底是做什么?
// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,
// 它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,此方法必须返回 null:
// - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
// - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
// - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
boolean timedOut = false;
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 两种可能
// 1. rs == SHUTDOWN && workQueue.isEmpty()
// 2. rs >= STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// CAS 操作,减少工作线程数
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
// 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
// wc <= maximumPoolSize 同时没有超时
try {
// 到 workQueue 中获取任务
// 如果timed=wc > corePoolSize=false,我们知道核心线程数之内的线程永远不会销毁,则执行workQueue.take();我前面文章中讲过,take()方法是阻塞方法,如果队里中有任务则取到任务,如果没有任务,则一直阻塞在这里知道有任务被唤醒。
//如果timed=wc > corePoolSize=true,这里将执行超时策略,poll(keepAliveTime, TimeUnit.NANOSECONDS)会阻塞keepAliveTime这么长时间,没超时就返回任务,超时则返回null.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 发生了中断,采取的方案是重试
// 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
// 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
// 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
timedOut = false;
}
}
}
到这里,大家应该都知道了,线程池中到底是如何复用线程的吧。
我来总结一下:
线程池中,维护了一个Worker的内部类,其中Worker也实现了Runnable接口,重写了
run()
方法,在调用这个run()
时候,会采用类似于死循环的while方式重复使用这个worker去获取任务并执行我们传入execute(Runnable task)
方法的参数task(task存放在阻塞队列里)。
<END>
【轻薄款上架】 🕚
推荐阅读:
每日打卡赢积分兑换书籍入口