其他
Java 线程池 8 大拒绝策略,面试必问!
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy(丢弃策略)
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy(弃老策略)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
第三方实现的拒绝策略
dubbo中的线程拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略实现
}
}
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
Objects.requireNonNull(chain, "handlerChain must not be null");
RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
return new RejectedExecutionHandlerChain(handlerChain);
}
private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
rejectedExecutionHandler.rejectedExecution(r, executor);
}
}
}
转自:KL博客
地址:www.kailing.pub/article/index/arcid/255.html
点击「阅读原文」和栈长学更多~