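Java Thread Pools Decoded, Fully and in Detail: This Is the Only Article You Need!
Word count: 10,114 characters
Estimated reading time: 26 minutes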
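Overview
Thread pool states
RUNNING: accepts new tasks and processes queued tasks.
SHUTDOWN: accepts no new tasks, but still processes queued tasks.
STOP: accepts no new tasks, does not process queued tasks, and interrupts tasks in progress.
TIDYING: all workers have exited and the queue is empty; the thread that moves the pool into this state runs the terminated() hook.
TERMINATED: entered once terminated() has completed.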
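The source below uses ctl, runStateOf, workerCountOf, ctlOf and CAPACITY without defining them. In JDK 8 (the version the CAPACITY references suggest), ThreadPoolExecutor packs both values into one AtomicInteger named ctl: the high 3 bits hold the run state and the low 29 bits hold the worker count. The class below is a standalone sketch reproducing those constants so the arithmetic can be checked; the class name CtlLayout is hypothetical, and newer JDKs may differ in detail.

```java
// Hypothetical demo class reproducing the JDK 8 ctl bit layout
class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // max workerCount: 2^29 - 1

    // runState lives in the high 3 bits; RUNNING is negative, so the states are ordered
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; }
    static boolean runStateAtLeast(int c, int s) { return c >= s; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 3);                      // a RUNNING pool with 3 workers
        System.out.println(Integer.toBinaryString(c));  // high 3 bits = state, low 29 = count
        System.out.println(workerCountOf(c));           // 3
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(isRunning(c));               // true
        // compareAndIncrementWorkerCount is just a CAS from c to c + 1
        System.out.println(workerCountOf(c + 1));       // 4
        System.out.println(runStateAtLeast(ctlOf(STOP, 0), SHUTDOWN)); // true
    }
}
```

Because the states are numerically ordered (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), checks such as `rs >= SHUTDOWN` in the code below are plain integer comparisons.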
execute method
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//Only a RUNNING pool may add tasks to the queue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
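The three steps in execute() can be observed from outside the pool. The sketch below is a hypothetical demo (the class name and the pool sizes are arbitrary choices): with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, four blocking tasks should hit each branch in turn: a new core worker, the queue, a new non-core worker, and finally the RejectedExecutionHandler (the default AbortPolicy throws RejectedExecutionException).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteRoutingDemo {
    /** For each of 4 submissions: {poolSize, queueSize}, or null if it was rejected. */
    static int[][] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep workers busy while we observe the pool
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[][] snapshots = new int[4][];
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    snapshots[i] = new int[]{pool.getPoolSize(), pool.getQueue().size()};
                } catch (RejectedExecutionException e) {
                    snapshots[i] = null; // step 3 failed: queue full and max reached
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return snapshots;
    }

    public static void main(String[] args) {
        int[][] s = run();
        for (int i = 0; i < s.length; i++) {
            System.out.println("task " + (i + 1) + ": "
                    + (s[i] == null ? "rejected" : "poolSize=" + s[i][0] + ", queued=" + s[i][1]));
        }
    }
}
```

It should report poolSize=1/queued=0, then poolSize=1/queued=1, then poolSize=2/queued=1, and finally "rejected". The ordering is worth noticing: the queue fills before the pool grows beyond corePoolSize.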
addWorker method
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
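Code that does not need to run under the lock should not: this shortens how long the lock is held. The global mainLock is also taken when the pool is shut down (shutdown/shutdownNow), so rechecking the pool state and adding the Worker to workers must happen under the lock, while starting the thread does not need to.
With try { } finally { }, code that must run no matter what goes in the finally block; here, addWorkerFailed rolls back a worker that failed to start.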
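The locking discipline described above can be isolated in a much smaller class. The following is a hypothetical, simplified registry (MiniRegistry is an invented name, not JDK code; it has no run-state checks and no tryTerminate): a lock-free CAS loop reserves a slot, only the shared-set update runs under the lock, the thread is started after the lock is released, and a finally block rolls the reservation back on failure.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified sketch of the addWorker pattern; not the JDK implementation
class MiniRegistry {
    private final AtomicInteger workerCount = new AtomicInteger();
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<Thread> workers = new HashSet<>();
    private final int maxWorkers;
    private final ThreadFactory factory;

    MiniRegistry(int maxWorkers, ThreadFactory factory) {
        this.maxWorkers = maxWorkers;
        this.factory = factory;
    }

    boolean addWorker(Runnable task) {
        // Step 1: reserve a slot with a CAS loop, no lock held
        for (;;) {
            int wc = workerCount.get();
            if (wc >= maxWorkers)
                return false;
            if (workerCount.compareAndSet(wc, wc + 1))
                break;
        }
        boolean started = false;
        Thread t = null;
        try {
            t = factory.newThread(task);
            if (t != null) {
                // Step 2: only the shared-set update happens under mainLock
                mainLock.lock();
                try {
                    workers.add(t);
                } finally {
                    mainLock.unlock();
                }
                // Step 3: start the thread after releasing the lock
                t.start();
                started = true;
            }
        } finally {
            // Roll back if the factory returned null or start() threw
            if (!started) {
                if (t != null) {
                    mainLock.lock();
                    try {
                        workers.remove(t);
                    } finally {
                        mainLock.unlock();
                    }
                }
                workerCount.decrementAndGet();
            }
        }
        return started;
    }

    int currentWorkerCount() {
        return workerCount.get();
    }

    public static void main(String[] args) {
        MiniRegistry ok = new MiniRegistry(2, Thread::new);
        System.out.println(ok.addWorker(() -> { })); // true
        System.out.println(ok.addWorker(() -> { })); // true
        System.out.println(ok.addWorker(() -> { })); // false: bound reached
        MiniRegistry broken = new MiniRegistry(2, r -> null);
        System.out.println(broken.addWorker(() -> { })); // false: factory failed
        System.out.println(broken.currentWorkerCount()); // 0: reservation rolled back
    }
}
```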
Worker class
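The runWorker comments below rely on how Worker's lock behaves: Worker extends AbstractQueuedSynchronizer, starts with state -1, and uses a non-reentrant lock. The following is a minimal sketch modeled on JDK 8's Worker, reduced to the lock and interruptIfStarted parts (MiniWorker is a hypothetical name; the real Worker also implements Runnable and counts completed tasks, and its interruptIfStarted returns void).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-in modeled on JDK 8's ThreadPoolExecutor.Worker (lock part only)
class MiniWorker extends AbstractQueuedSynchronizer {
    final Thread thread;

    MiniWorker(Runnable body) {
        setState(-1); // inhibit interrupts until runWorker calls unlock()
        this.thread = new Thread(body);
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds: the lock is non-reentrant, and state -1 blocks it too
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }

    // Returns a boolean only so the demo can observe it
    boolean interruptIfStarted() {
        Thread t = thread;
        if (getState() >= 0 && t != null && !t.isInterrupted()) {
            t.interrupt();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MiniWorker w = new MiniWorker(() -> { });
        System.out.println(w.tryLock());            // false: state is -1
        System.out.println(w.interruptIfStarted()); // false: not interruptible yet
        w.unlock();                                 // -1 -> 0, like runWorker's "allow interrupts"
        System.out.println(w.tryLock());            // true: an idle worker can be locked
        System.out.println(w.tryLock());            // false: non-reentrant, so "busy" is detectable
        w.unlock();
        System.out.println(w.interruptIfStarted()); // true
    }
}
```

Non-reentrancy matters: if the lock were reentrant, a worker thread calling interruptIdleWorkers on itself (for example via setCorePoolSize from inside a task) could reacquire its own lock and interrupt itself.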
runWorker method
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* Notes on exception and error handling:
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
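/**
Calling unlock() here is not really about releasing a lock: it sets the AQS state to 0 so that the thread can be interrupted.
Worker extends AQS and sets state to -1 in its constructor. As Worker.interruptIfStarted shows, a thread cannot be interrupted while state is -1.
In other words, if the thread has just started but has not reached runWorker yet when ThreadPoolExecutor.shutdownNow is called (the pool enters STOP and calls each Worker's interruptIfStarted), this thread is not interrupted.
shutdown moves the pool to SHUTDOWN and interrupts idle threads. A thread is idle if its Worker lock can be acquired (it is inside getTask waiting for a task); while the Worker's state is -1, the lock cannot be acquired.
To sum up: until the thread reaches this point, neither shutdown nor shutdownNow can interrupt it.
*/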
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//Loop, repeatedly taking tasks from the queue and running them
while (task != null || (task = getTask()) != null) {
w.lock(); //This lock is not about concurrency; it controls whether the thread may be interrupted
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//Before running the actual task: if the pool is STOP, make sure the thread is interrupted; if the pool is below STOP (RUNNING or SHUTDOWN), make sure it is not
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
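Points 3 to 5 of the runWorker javadoc say that whatever task.run() throws is handed to afterExecute before the thread dies. A minimal sketch, assuming the standard protected afterExecute(Runnable, Throwable) hook, that captures that exception (AfterExecuteDemo is a hypothetical name; the custom ThreadFactory only silences the stack trace the default UncaughtExceptionHandler would print, since the exception still escapes runWorker).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {
    static Throwable captureTaskFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch hookCalled = new CountDownLatch(1);
        // The exception is rethrown out of runWorker; keep the default handler quiet
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quiet) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);            // what runWorker caught from task.run()
                hookCalled.countDown();
            }
        };
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            hookCalled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(captureTaskFailure()); // java.lang.IllegalStateException: boom
    }
}
```

This only works with execute(). With submit(), the task is wrapped in a FutureTask that catches the exception itself, so afterExecute receives null and the exception surfaces from Future.get() instead.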
getTask method
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//This loop serves both the control flow itself and CAS retries
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
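/**
* State check
*
* If the pool is STOP, or SHUTDOWN with an empty queue, decrement the worker count and return null.
* In the caller, runWorker, a null return makes the thread exit.
* So when the pool is STOP, or SHUTDOWN with an empty queue, the thread eventually exits.
*/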
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
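//timed: whether this worker waits for a task with a timeout
//Timeout control applies if core threads may time out (allowCoreThreadTimeOut, false by default) or the thread count exceeds corePoolSize
// Are workers subject to culling?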
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
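/**
* This part is tricky and has to be read together with the code below. Its purpose is to control the number of live threads in the pool.
* Reaching this point means the pool state check passed.
* wc > maximumPoolSize can happen because setMaximumPoolSize may have been called while this method was running;
* timed == true means this thread is under timeout control, and timedOut == true means its last poll of the blocking queue timed out. When (wc > maximumPoolSize, or timed and timedOut are both true) and (the thread count is greater than 1, or the queue is empty), try to decrement the thread count: if the CAS fails, loop and retry; if it succeeds, return null.
*
* What does this mean? Let's work through it carefully.
* First, reaching this point means the pool is RUNNING, or SHUTDOWN with a non-empty queue.
* If timed is false, the thread count is at most corePoolSize and core timeout is disabled, so this thread acts as a core thread: no timeout control, it waits on the queue until a task arrives.
* If timed is true, this thread acts as a non-core thread, and we check whether its last wait timed out. If it did, and the thread count is greater than 1 (or the queue is empty), the count is decremented and the thread exits; if it did not time out, this branch is skipped and the thread does a timed wait below.
* In other words, if corePoolSize is not 0, idle threads beyond corePoolSize are eventually all retired; if corePoolSize is 0, one thread is kept to process queued tasks, and it too exits once the queue is empty.
*/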
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
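//timed == true (core timeout allowed, or more threads than corePoolSize): wait at most keepAliveTime with poll()
//timed == false: block in take() until a task arrives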
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
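//The thread was interrupted while waiting for a task: reset timedOut to false and loop again
//This shows that being interrupted does not by itself mean the thread must exit; what happens depends on the handling logic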
timedOut = false;
}
}
}
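The culling condition that the comment above works through can be pulled out into a pure function and tested case by case. This is a hypothetical helper (GetTaskExitRule is not a JDK class) that copies the `timed` expression and the if-condition from getTask() verbatim, so the scenarios described in the comment can be checked directly.

```java
// Hypothetical helper: getTask()'s culling condition extracted as a pure function
class GetTaskExitRule {
    /** True when a worker reaching this check should try to decrement workerCount and exit. */
    static boolean shouldCull(int wc, int corePoolSize, int maximumPoolSize,
                              boolean allowCoreThreadTimeOut,
                              boolean timedOut, boolean queueEmpty) {
        // Same expression as `timed` in getTask()
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Same condition that guards compareAndDecrementWorkerCount(c)
        return (wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        // 3 threads, core 2: a thread above the core timed out -> exits
        System.out.println(shouldCull(3, 2, 4, false, true, false));  // true
        // 2 threads, core 2: timed is false, so the condition never culls
        System.out.println(shouldCull(2, 2, 4, false, true, false));  // false
        // core 0, the last thread timed out but tasks remain -> it stays
        System.out.println(shouldCull(1, 0, 1, false, true, false));  // false
        // core 0, the last thread timed out and the queue is empty -> exits
        System.out.println(shouldCull(1, 0, 1, false, true, true));   // true
        // maximumPoolSize lowered to 4 while 5 threads exist -> exits without a timeout
        System.out.println(shouldCull(5, 2, 4, false, false, false)); // true
    }
}
```

In the real method, when shouldCull is true the worker still has to win the CAS on ctl; a losing worker loops and re-evaluates with the fresh count, which is why two workers that time out together cannot both take the pool below the intended size.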
processWorkerExit method
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
* The comment above already explains what this method does.
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* completedAbruptly == true: a task threw an exception, so the worker count must be decremented here
* completedAbruptly == false: getTask returned null and has already decremented the worker count
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//Modifying workers requires the global mainLock
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//Remove the worker from the workers set
workers.remove(w);
} finally {
mainLock.unlock();
}
//Try to terminate the pool
tryTerminate();
/**
* The code below is less obvious
*/
int c = ctl.get();
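//Pool state is RUNNING or SHUTDOWN
if (runStateLessThan(c, STOP)) {
//The thread exited normally
if (!completedAbruptly) {
//Minimum number of threads to keep: 0 if allowCoreThreadTimeOut is true, otherwise corePoolSize (which may itself be 0)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//A minimum of 0 is not enough if the queue still has tasks: since the pool is RUNNING or SHUTDOWN, at least one thread must remain to run them
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//If the current thread count is at least min, do nothing; otherwise start a replacement thread
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//(1) The thread exited because of an exception: start a replacement thread
// (2) The thread count is below the required minimum: start a replacement thread
/**
* But there is a catch:
* If the pool is SHUTDOWN, corePoolSize is 3, workerCountOf(c) is 2, and workQueue is empty, do we really need to start a new thread?
* Certainly not: a SHUTDOWN pool will eventually destroy all its threads.
* addWorker handles this case: it returns false immediately (see the addWorker source).
*/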
addWorker(null, false);
}
}
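The replacement branch can be seen with a one-thread pool: a task that throws kills its worker (completedAbruptly is true), yet the next task still runs, necessarily on a different thread. A minimal sketch (WorkerReplacementDemo is a hypothetical name). Depending on timing, the replacement is created either by processWorkerExit's addWorker(null, false) or by execute() itself when it sees fewer than corePoolSize workers; either way, the failed thread never runs another task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    /** Returns {thread that ran the failing task, thread that ran the next task}. */
    static Thread[] run() {
        AtomicReference<Thread> failed = new AtomicReference<>();
        AtomicReference<Thread> next = new AtomicReference<>();
        CountDownLatch nextDone = new CountDownLatch(1);
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // suppress the stack trace
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quiet);
        try {
            pool.execute(() -> {
                failed.set(Thread.currentThread());
                throw new RuntimeException("task failed"); // completedAbruptly = true
            });
            pool.execute(() -> {
                next.set(Thread.currentThread());
                nextDone.countDown();
            });
            nextDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new Thread[]{failed.get(), next.get()};
    }

    public static void main(String[] args) {
        Thread[] ts = run();
        System.out.println("same thread? " + (ts[0] == ts[1])); // same thread? false
    }
}
```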
tryTerminate method
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* RUNNING: must not terminate
* TIDYING or TERMINATED: nothing to do, termination is already under way or done
* SHUTDOWN with a non-empty queue: must not terminate yet
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
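/**
* STOP, or
* SHUTDOWN with an empty queue:
* the pool is eligible to terminate, but not while workers remain. In that case, interrupt one idle worker: as the javadoc above says, this propagates the shutdown signal. The woken worker's getTask() returns null, and its processWorkerExit() calls tryTerminate() again, which interrupts the next idle worker, and so on until the count reaches 0.
*/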
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//Straightforward: move the state to TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//Move the state to TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
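The terminated() call that tryTerminate makes while the pool is in TIDYING is a protected hook that subclasses can override. A minimal sketch (TerminatedHookDemo is a hypothetical name): one worker is created and then idles in take(); shutdown() interrupts it, its exit runs tryTerminate() with a worker count of 0, and the hook fires before awaitTermination() returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TerminatedHookDemo {
    static boolean run() {
        CountDownLatch hookRan = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                super.terminated();
                hookRan.countDown(); // runs while the pool is in TIDYING
            }
        };
        pool.execute(() -> { }); // creates one worker, which then waits in getTask()
        pool.shutdown();         // interrupts idle workers; the last exit calls tryTerminate()
        boolean done;
        try {
            done = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done = false;
        }
        return done && pool.isTerminated() && hookRan.getCount() == 0;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```

Since ctl is set to TERMINATED and termination.signalAll() is called only in the finally block after terminated(), a caller returning from awaitTermination() can rely on the hook having finished.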
shutdown method
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*/
// android-note: Removed @throws SecurityException
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//Advance the run state to SHUTDOWN with an optimistic CAS loop (no-op if already SHUTDOWN or later)
advanceRunState(SHUTDOWN);
//Interrupt idle workers
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
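The SHUTDOWN semantics can be checked end to end: tasks already queued still run, new submissions are rejected. A minimal sketch (ShutdownDemo is a hypothetical name): the first task keeps the only worker busy, and therefore holding its Worker lock, so interruptIdleWorkers() cannot interrupt it; two more tasks wait in the queue while shutdown() is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    /** Returns {tasks completed, 1 if a submit after shutdown() was rejected, else 0}. */
    static int[] run() {
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Task 1 occupies the only worker, so tasks 2 and 3 stay in the queue
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        });
        pool.execute(completed::incrementAndGet);
        pool.execute(completed::incrementAndGet);

        pool.shutdown(); // SHUTDOWN: reject new tasks, keep draining the queue
        int rejected = 0;
        try {
            pool.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = 1; // default AbortPolicy
        }

        gate.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[]{completed.get(), rejected};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("completed=" + r[0] + ", rejectedAfterShutdown=" + (r[1] == 1));
    }
}
```

The post-shutdown execute() fails in addWorker's first check: the state is SHUTDOWN and firstTask is not null, so addWorker returns false and the task is rejected.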
shutdownNow method
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* interrupts tasks via {@link Thread#interrupt}; any task that
* fails to respond to interrupts may never terminate.
*/
// android-note: Removed @throws SecurityException
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
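//Interrupt every worker that has started (see Worker.interruptIfStarted)
interruptWorkers();
//Remove the tasks still waiting in the queue; they are returned to the caller
tasks = drainQueue();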
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
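By contrast, shutdownNow() interrupts running tasks and hands back the ones that never started. A minimal sketch (ShutdownNowDemo is a hypothetical name): one interruptible task runs while two wait in the queue. Even if the worker has not yet reached runWorker when shutdownNow() is called (state -1, so interruptIfStarted skips it), runWorker's STOP check interrupts it before the task starts, so the sleep is cut short either way.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns {tasks drained by shutdownNow, 1 if the running task saw an interrupt, else 0}. */
    static int[] run() {
        AtomicBoolean interrupted = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            try {
                Thread.sleep(60_000); // long-running but responsive to interrupts
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // queued
        pool.execute(() -> { }); // queued

        List<Runnable> neverRun = pool.shutdownNow(); // STOP: interrupt workers, drain queue
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[]{neverRun.size(), interrupted.get() ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("drained=" + r[0] + ", interrupted=" + (r[1] == 1));
    }
}
```

A task that ignores interrupts (a busy loop that never checks Thread.interrupted(), for example) would keep the pool from terminating, which is what the "best-effort" wording in the javadoc warns about.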
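Q&A
Why does every task run while holding the Worker lock?
As the runWorker comments note, the lock is not there for concurrency. Holding it marks the worker as busy: interruptIdleWorkers(), which shutdown() uses, only interrupts workers whose tryLock() succeeds, so shutdown() cannot interrupt a task in progress. Only shutdownNow(), through interruptIfStarted(), interrupts running tasks.
Worked examples, analyzed against the source
Example 1
Example 2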
new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DiskLruCacheThreadFactory());
/**
* A {@link java.util.concurrent.ThreadFactory} that builds a thread with a specific thread name
* and with minimum priority.
*/
private static final class DiskLruCacheThreadFactory implements ThreadFactory {
@Override
public synchronized Thread newThread(Runnable runnable) {
Thread result = new Thread(runnable, "glide-disk-lru-cache-thread");
result.setPriority(Thread.MIN_PRIORITY);
return result;
}
}
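This configuration (corePoolSize 0, maximumPoolSize 1, unbounded LinkedBlockingQueue) goes through execute() like this: workerCountOf(c) < 0 is never true, so every task is offered to the queue; on the first task the recheck finds workerCountOf(recheck) == 0 and calls addWorker(null, false), creating the single thread. Because the queue is unbounded, step 3 is never reached, so the pool cannot grow past one thread and tasks run one at a time in FIFO order. A minimal sketch checking this (CoreZeroDemo is a hypothetical name, and the custom ThreadFactory is omitted).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreZeroDemo {
    final List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    final Set<Thread> threads = ConcurrentHashMap.newKeySet();
    int largestPoolSize;

    static CoreZeroDemo run(int taskCount) {
        CoreZeroDemo result = new CoreZeroDemo();
        // Same sizing as the executor above, with the default ThreadFactory
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> {
                result.threads.add(Thread.currentThread());
                result.order.add(id);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        result.largestPoolSize = pool.getLargestPoolSize();
        return result;
    }

    public static void main(String[] args) {
        CoreZeroDemo r = run(5);
        System.out.println("order=" + r.order + ", threads=" + r.threads.size()
                + ", largestPoolSize=" + r.largestPoolSize);
    }
}
```

Because the single worker is non-core (timed is true in getTask), it exits after 60 idle seconds and the pool shrinks to zero threads; the next execute() recreates one through the same recheck path.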