其他
严选库存稳定性治理系列:一个线程池拒绝策略引发的血案
针对库存中心线上业务full gc问题进行深入分析,并结合此次问题解决的方法、根因分析落地此类问题的解决思路,并沉淀出一些通用的实践经验及注意事项。
1. 事件回顾
从7.27号开始主站、分销等业务方开始反馈下单偶发超时现象,我们开始分析排查问题原因,震惊地发现线上偶发full gc,如下图所示。如果继续放任下去,势必影响严选交易下单核心链路及用户体验,造成交易损失。库存中心开发迅速响应,积极排查并解决问题,把问题在萌芽状态处理掉,避免造成资损。
2. 紧急止血
扩容 扩容一般有两种方式,一是提高堆内存大小,二是对应用机器扩容;本质上都是延缓full gc出现的次数和频率,尽力保证核心业务,然后再排查问题。 限流 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的,一般可以在代理层和应用层做限流。 重启 相对比较暴力的一种方式,稍微不注意免会带来数据不一致问题,除非必要否则不建议。
3. 问题分析
3.1 现状挑战
3.2 表象原因
3.3 问题根因
3.4 原因分析
3.4.1 业务描述
3.4.2 业务代码
@Override
public String sync(String tableName) {
// 生成数据版本号
DateFormat dateFormat = new SimpleDateFormat("YYYYMMdd_HHmmss_SSS");
// 启动Leader线程完成执行和监控
String threadName = "SyncCache-Leader-" + dateFormat.format(new Date());
Runnable wrapper = ThreadHelperUtil.wrap(new PrimaryRunnable(cacheVersion, tableName,syncCachePool));
Thread core = new Thread(wrapper, threadName); // 新建线程,处理同步同步
core.start();
return cacheVersion;
}
private static class PrimaryRunnable implements Runnable {
private String cacheVersion;
private String tableName;
private ExecutorService syncCachePool;
public PrimaryRunnable(String cacheVersion, String tableName,ExecutorService syncCachePool) {
this.cacheVersion = cacheVersion;
this.tableName = tableName;
this.syncCachePool = syncCachePool;
}
@Override
public void run() {
....
try {
exec();
CacheLogger.doFinishLog(cacheVersion, System.currentTimeMillis() - leaderStart);
} catch (Throwable t) {
CacheLogger.doExecErrorLog(cacheVersion, System.currentTimeMillis() - leaderStart, t);
}
}
public void exec() {
// 查询数据并构建同步任务
List<SyncTask> syncTasks = buildSyncTask(cacheVersion, tableName);
// 同步任务提交线程池
Map<SyncTask, Future> futureMap = Maps.newHashMap();
for (SyncTask task: syncTasks) {
futureMap.put(task, syncCachePool.submit(new Runnable() {
@Override
public void run() {
task.run();
}
}));
}
// 等待执行完成
for (Map.Entry<SyncTask, Future> futureEntry: futureMap.entrySet()) {
try {
futureEntry.getValue().get(); // 阻塞获取同步任务结果
} catch (Throwable t) {
CacheLogger.doFutureFailedLog(cacheVersion, futureEntry.getKey());
throw new RuntimeException(t);
}
}
}
}
/**
* 拒绝策略类
*/
private static class RejectedPolicy implements RejectedExecutionHandler {
static RejectedPolicy singleton = new RejectedPolicy();
private RejectedPolicy() {
}
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (runnable instanceof SyncTask) {
SyncTask task = (SyncTask) runnable;
CacheLogger.doRejectLog(task);
}
}
}
当前队列大小为1000,最大线程数为20,也就是说线程池最少可以处理51w数据,而目前sku个数大约为54w,如果任务耗时,剩余任务可能全部都会放入队列中,可能存在线程池队列不足的情况。队列大小不足,会触发拒绝策略,目前我们项目中的拒绝策略是类似于DiscardPolicy(当新任务被提交后直接被丢弃掉,也不会给你任何的通知),但是为什么会出现线程(name:SyncCache-Leader-时间戳)会阻塞等待的问题,需要我们进一步结合submit方法及业务代码继续分析。
3.4.3 源码分析
java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); //
execute(ftask); //执行excute方法
return ftask; // 返回task,状态为NEW
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
sumbit方法就是通过调用FutureTask的构造器返回FutureTask实例的。 可以看到传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务,这是一种设计模式—适配器模式,而FutureTask对callable任务的包装也是一种适配器模式——转换为Runnable类型。 构造器仅仅是初始化callable属性,以及FutureTask状态为NEW。 java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
// 走到这一步说明核心线程数满了,要尝试往队列里面加,
// 调用的workQueue.offer(command)返回false,说明队列满了,执行拒绝策略
reject(command);
}
// 等待执行完成
for (Map.Entry<SyncTask, Future> futureEntry: futureMap.entrySet()) {
try {
futureEntry.getValue().get(); // 阻塞获取同步任务结果
} catch (Throwable t) {
CacheLogger.doLeaderFutureFailedLog(cacheVersion, futureEntry.getKey());
throw new InventoryException(t);
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 执行submit后执行拒绝策略的任务虽然没有加入到队列,但返回的FutureTask
// 状态仍然为NEW,会尝试获取结果;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null) //第一次进for循环时q==null,进入到这个分支
q = new WaitNode();
else if (!queued) // 第二次进for循环时queued为false,则使用CAS将q置为waiters的头结点
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 将q置为头结点后,最终会进入这里调用park()方法,阻塞当前线程.
// 也就是说走到这里,同步线程SyncCache-Leader-时间戳阻塞等待唤醒
LockSupport.park(this);
}
首先当任务提交到线程池,触发了拒绝策略,则FutureTask的状态处于New状态,调用get()方法会到达LockSupport.park(this)处,将当前线程阻塞,导致了内存泄漏; 究其原因,其实主要还是线程池使用不当,主要有两点问题,一是拒绝策略选择上有问题,二是程序中submit提交线程,但是获取结果时却没有使用超时时间,可以使程序超时抛异常终止(另外,项目中不需要获取任务结果,其实也没有必要用submit方法提交任务)。
3.4.4 刨根问底
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); //如果任务执行成功,则调用set(V result)方法
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread; //q在await()方法中设置的,其值为调用get()方法的线程
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 唤起因get()而阻塞的线程。
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
也就是说如果run方法正常返回,最终FutureTask会将结果保存并且唤醒当前阻塞线程,但是应用中我们的任务压根就没有执行run方法,也就是说会被一直阻塞!
4. 问题解决
4.1 线程池配置优化
4.1.1 修改拒绝策略
项目中使用的自定义拒绝策略,主要目的其实就是想打印出拒绝任务中包含的任务信息如skuId等,然后手动进行更新,防止提供给其他服务的库存数据异常; 由前文我们已经runnable类型是FutureTask,因此图片中的if判断永远不会成立,这种自定义的拒绝策略像线程池中默认的拒绝策略DiscardPolicy(当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失); 修改后,队列满之后,会立即触发拒绝策略,抛出异常,父线程才不会一直阻塞获取该FutureTask的结果了。
4.1.2 调高队列大小
线程池最大线程数是20,队列大小是1000,目前sku个数是54w,每个任务有500个skuId,每个人任务执行时间稍长的话,最多只能只能处理51w sku,加之3个任务公用线程池,设置队列大小为3000; 队列调整后,可避免部分sku没有及时将库存数据同步到缓存中。
4.2 业务流程优化
5. 总结沉淀
5.1 full gc解决思路总结
遇到频繁full gc这种线上,我们该怎么办?首先想到的是要先紧急处理,然后再分析原因,紧急处理我们大概有三种可选方案:重启、限流、扩容 三板斧; 其次,明确方向,一般来说,引起full gc的主要原因有两大类,一是应用资源配置问题,二是程序问题。资源配置这块,我们要检查jvm参数配置是否合理;大部分full gc都是由程序问题引起得,主要有两方面原因,一是程序存在大对象,二是存在内存泄漏; 最重要的一点,分析dump文件,但是要保证取得事发时内存快照,分析软件可以用MAT和VisualVM,对于我们遇到的这个问题,其实还可以用jstack获取当前进程所有的线程进行分析; full gc时要及时告警,避免开发响应滞后于业务,另外,在实践中我们要合理设置JVM参数,这样也可以尽量避免full gc,此次问题排查,我们也对jvm参数进行了调整,后续会有相应文章发布。
5.2 线程池使用注意事项
如果不需要同步获取任务结果,尽量使用execute方式提交任务,并且对于异常要谨慎处理,防止频繁销毁和创建线程; 如果需要使用submit方式提交任务,同步获取结果时尽量使用超时获取方式,避免出现一直阻塞问题导致内存泄漏的问题; 谨慎使用拒绝策略,熟悉拒绝策略和线程提交方式配合使用可能存在的问题,如DiscardPolicy和submit方式提交任务就可能存在获取结果阻塞等待情况; 线程池线程要有辨识度,也就是有自己的命名规则,方便问题排查。
本文由作者授权严选技术团队发布