查看原文
其他

严选库存稳定性治理系列:一个线程池拒绝策略引发的血案

严选技术 严选技术产品团队 2023-04-08





针对库存中心线上业务full gc问题进行深入分析,并结合此次问题解决的方法、根因分析落地此类问题的解决思路,并沉淀出一些通用的实践经验及注意事项。


1. 事件回顾

从7.27号开始主站、分销等业务方开始反馈下单偶发超时现象,我们开始分析排查问题原因,震惊地发现线上偶发full gc,如下图所示。如果继续放任下去,势必影响严选交易下单核心链路及用户体验,造成交易损失。库存中心开发迅速响应,积极排查并解决问题,把问题在萌芽状态处理掉,避免造成资损。

因此,我们紧急排查,并把排查及解决问题的过程用时间轴方式展示如下,后续会一一介绍解决方案及问题原因。

2. 紧急止血

对于频繁full gc,根据经验,我们大胆猜测可能由于某些接口产生大对象并且调用频率较高引起,在紧急情况下,首先保证系统核心功能不受影响,然后再排查问题。一般有三种手段,如下:
  • 扩容
    扩容一般有两种方式,一是提高堆内存大小,二是对应用机器扩容;本质上都是延缓full gc出现的次数和频率,尽力保证核心业务,然后再排查问题。
  • 限流
    限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的,一般可以在代理层和应用层做限流。
  • 重启
    相对比较暴力的一种方式,稍微不注意免会带来数据不一致问题,除非必要否则不建议。
我们应用限流是针对应用接口层面得,由于不知道问题具体原因且问题还在萌芽状态,所以就没有直接限流,而是直接扩容,顺带重启。我们临时紧急对堆内存扩容,将部分机器堆内存大小由6g提升至22g,并对应用进行重启,使配置参数生效。
7.27号紧急对部分机器(73和74)扩容后,我们可以发现扩容后2天内没有发生full gc,为我们进一步排查提供了容错时间;

3. 问题分析

3.1 现状挑战

由于没有OOM,所以没有现场内存快照,不好确定问题原因,而库存主服务涉及的逻辑太多(核心业务逻辑有十几万行代码,都是日常在运行的),且业务逻辑复杂、调用量大,并且存在少量慢请求,增加了排查问题难度。由于没有相对完善的基建设施,我们没有一个全局的调用监控平台去观察full gc前后应用到底发生了什么,只能通过在问题机器上分析链路调用情况,去一点点寻找问题真相。

3.2 表象原因

本质上,我们要看下发生full gc时应用系统做了什么事情,也就是说找到压死骆驼最后一根稻草是什么?
我们对发生full gc前时间点应用日志进行了大量分析,结合慢sql分析,只要业务在一段时间频繁操作【内外部采购出库】业务,系统就会触发一次full gc,时间点比较吻合,因此,初步判定可能由于内外部采购出库业务操作引起得,通过分析业务代码分析发现,该库存变更经过干预拦截会将10w条数据load到内存中,共计300M左右,感觉一下子看到了希望!
对此我们7.28号紧急联系dba将该部分业务数据紧急迁移到其他数据库,避免进一步对业务造成影响,后续再对业务流程进行优化!!
迁移后我们发现当天并没有出现full gc,也没有业务反馈接口超时,7.29号我们发现73这台机器(升级配置)没有full gc,而154这台机器,在7.29号继续出现full gc,观察每次gc可回收的内存大小并不多,也就是说内存并没有及时释放,极有可能存在泄漏问题!

3.3 问题根因

当时我们多次dump了内存快照,并没有发现类似问题,庆幸地是155这台机器最后才升级(备用机,主要用于处理定时任务,留作参照对比效果),让我们进一步接近了问题根因。
为进一步分析原因,我们对其中一台机器(155)堆内存快照进行了分析,发现了一个比较有意思的现象,即存在大量的线程阻塞等待线程;
每一个阻塞线程会持有大约14M的内存,正是这些线程导致了内存泄漏,至此我们终于找到了问题原因,同时验证了我们的猜想,即发生了内存泄漏!

3.4 原因分析

3.4.1 业务描述

从4.2我们定位到问题代码,为方便理解我们对这部分业务(从数据库中拉取sku数量信息,每500个sku组成一个SyncTask,然后缓存到redis中,供其他业务方使用,每5min执行一次)做下概述,并把主要的代码片段罗列如下:

3.4.2 业务代码

@Overridepublic String sync(String tableName) { // 生成数据版本号 DateFormat dateFormat = new SimpleDateFormat("YYYYMMdd_HHmmss_SSS"); // 启动Leader线程完成执行和监控 String threadName = "SyncCache-Leader-" + dateFormat.format(new Date()); Runnable wrapper = ThreadHelperUtil.wrap(new PrimaryRunnable(cacheVersion, tableName,syncCachePool)); Thread core = new Thread(wrapper, threadName); // 新建线程,处理同步同步 core.start(); return cacheVersion;}private static class PrimaryRunnable implements Runnable { private String cacheVersion; private String tableName; private ExecutorService syncCachePool; public PrimaryRunnable(String cacheVersion, String tableName,ExecutorService syncCachePool) { this.cacheVersion = cacheVersion; this.tableName = tableName; this.syncCachePool = syncCachePool; } @Override public void run() { .... try { exec(); CacheLogger.doFinishLog(cacheVersion, System.currentTimeMillis() - leaderStart); } catch (Throwable t) { CacheLogger.doExecErrorLog(cacheVersion, System.currentTimeMillis() - leaderStart, t); } } public void exec() { // 查询数据并构建同步任务 List<SyncTask> syncTasks = buildSyncTask(cacheVersion, tableName); // 同步任务提交线程池 Map<SyncTask, Future> futureMap = Maps.newHashMap(); for (SyncTask task: syncTasks) { futureMap.put(task, syncCachePool.submit(new Runnable() { @Override public void run() { task.run(); } })); } // 等待执行完成 for (Map.Entry<SyncTask, Future> futureEntry: futureMap.entrySet()) { try { futureEntry.getValue().get(); // 阻塞获取同步任务结果 } catch (Throwable t) { CacheLogger.doFutureFailedLog(cacheVersion, futureEntry.getKey()); throw new RuntimeException(t); } } }}/** * 拒绝策略类 */private static class RejectedPolicy implements RejectedExecutionHandler { static RejectedPolicy singleton = new RejectedPolicy(); private RejectedPolicy() { } @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { if (runnable instanceof SyncTask) { SyncTask task = (SyncTask) runnable; CacheLogger.doRejectLog(task); } }}

当前队列大小为1000,最大线程数为20,也就是说线程池最少可以处理51w数据,而目前sku个数大约为54w,如果任务耗时,剩余任务可能全部都会放入队列中,可能存在线程池队列不足的情况。队列大小不足,会触发拒绝策略,目前我们项目中的拒绝策略是类似于DiscardPolicy(当新任务被提交后直接被丢弃掉,也不会给你任何的通知),但是为什么会出现线程(name:SyncCache-Leader-时间戳)会阻塞等待的问题,需要我们进一步结合submit方法及业务代码继续分析。

3.4.3 源码分析

  • java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); // execute(ftask); //执行excute方法 return ftask; // 返回task,状态为NEW}
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
    • sumbit方法就是通过调用FutureTask的构造器返回FutureTask实例的。
    • 可以看到传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务,这是一种设计模式—适配器模式,而FutureTask对callable任务的包装也是一种适配器模式——转换为Runnable类型。
    • 构造器仅仅是初始化callable属性,以及FutureTask状态为NEW。
  • java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 走到这一步说明核心线程数满了,要尝试往队列里面加, // 调用的workQueue.offer(command)返回false,说明队列满了,执行拒绝策略 reject(command);}
走到这里,其实这段代码并没有加入到队列中,submit方法直接返回一个state为NEW的FutureTask实例!!!
紧接着我们看到业务代码futureEntry.getValue().get(),从futureTask中阻塞获取结果,下面我们看到这段代码:
// 等待执行完成for (Map.Entry<SyncTask, Future> futureEntry: futureMap.entrySet()) { try { futureEntry.getValue().get(); // 阻塞获取同步任务结果 } catch (Throwable t) { CacheLogger.doLeaderFutureFailedLog(cacheVersion, futureEntry.getKey()); throw new InventoryException(t); }}
public V get() throws InterruptedException, ExecutionException { int s = state; // 执行submit后执行拒绝策略的任务虽然没有加入到队列,但返回的FutureTask // 状态仍然为NEW,会尝试获取结果; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s);}
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }
int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) //第一次进for循环时q==null,进入到这个分支 q = new WaitNode(); else if (!queued) // 第二次进for循环时queued为false,则使用CAS将q置为waiters的头结点 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 将q置为头结点后,最终会进入这里调用park()方法,阻塞当前线程. // 也就是说走到这里,同步线程SyncCache-Leader-时间戳阻塞等待唤醒 LockSupport.park(this); }
分析到这里,我们总结问题原因如下:
  • 首先当任务提交到线程池,触发了拒绝策略,则FutureTask的状态处于New状态,调用get()方法会到达LockSupport.park(this)处,将当前线程阻塞,导致了内存泄漏;
  • 究其原因,其实主要还是线程池使用不当,主要有两点问题,一是拒绝策略选择上有问题,二是程序中submit提交线程,但是获取结果时却没有使用超时时间,可以使程序超时抛异常终止(另外,项目中不需要获取任务结果,其实也没有必要用submit方法提交任务)。

3.4.4 刨根问底

分析到这里,我们可以说是找到了问题原因,也就是说FutureTask获取执行结果时调用了LockSupport.park(this)阻塞了主线程,什么时候才会将当前线程唤醒?我们继续看代码。
那就是当存在工作线程Worker目前分配的任务执行完成后,其会去调用Worker类的getTask()方法从阻塞队列中拿到该任务,并执行该任务的run()方法,下面是FutureTask的run()方法:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); //如果任务执行成功,则调用set(V result)方法 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; //q在await()方法中设置的,其值为调用get()方法的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); // 唤起因get()而阻塞的线程。 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done();
callable = null; // to reduce footprint}

也就是说如果run方法正常返回,最终FutureTask会将结果保存并且唤醒当前阻塞线程,但是应用中我们的任务压根就没有执行run方法,也就是说会被一直阻塞!

4. 问题解决

通过优化线程池配置和业务流程l,如调高线程池队列大小、修复拒绝策略、优化业务流程规避大对象、任务错峰执行等一系列组合措施,保证了任务稳定执行。

4.1 线程池配置优化

调高线程池队列大小、修复拒绝策略

4.1.1 修改拒绝策略

  • 项目中使用的自定义拒绝策略,主要目的其实就是想打印出拒绝任务中包含的任务信息如skuId等,然后手动进行更新,防止提供给其他服务的库存数据异常;
  • 由前文我们已经runnable类型是FutureTask,因此图片中的if判断永远不会成立,这种自定义的拒绝策略像线程池中默认的拒绝策略DiscardPolicy(当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失);
  • 修改后,队列满之后,会立即触发拒绝策略,抛出异常,父线程才不会一直阻塞获取该FutureTask的结果了。
ps:项目中目前对线程中的Runnable进行了包装,如果使用原生类,可以通过反射获取拒绝策略中被拒绝的任务情况。只是获取拒绝任务信息,可以忽略不看。

4.1.2 调高队列大小

  • 线程池最大线程数是20,队列大小是1000,目前sku个数是54w,每个任务有500个skuId,每个人任务执行时间稍长的话,最多只能只能处理51w sku,加之3个任务公用线程池,设置队列大小为3000;
  • 队列调整后,可避免部分sku没有及时将库存数据同步到缓存中。

4.2 业务流程优化

针对内外部采购出现的大对象进行优化,减少每次请求300M大对象问题,同时对公用线程池的三种定时任务执行时间进行错峰,避免sku增多后任务间相互干扰。

5. 总结沉淀

5.1 full gc解决思路总结

  • 遇到频繁full gc这种线上,我们该怎么办?首先想到的是要先紧急处理,然后再分析原因,紧急处理我们大概有三种可选方案:重启、限流、扩容  三板斧;
  • 其次,明确方向,一般来说,引起full gc的主要原因有两大类,一是应用资源配置问题,二是程序问题。资源配置这块,我们要检查jvm参数配置是否合理;大部分full gc都是由程序问题引起得,主要有两方面原因,一是程序存在大对象,二是存在内存泄漏;
  • 最重要的一点,分析dump文件,但是要保证取得事发时内存快照,分析软件可以用MAT和VisualVM,对于我们遇到的这个问题,其实还可以用jstack获取当前进程所有的线程进行分析;
  • full gc时要及时告警,避免开发响应滞后于业务,另外,在实践中我们要合理设置JVM参数,这样也可以尽量避免full gc,此次问题排查,我们也对jvm参数进行了调整,后续会有相应文章发布。

5.2 线程池使用注意事项

  • 如果不需要同步获取任务结果,尽量使用execute方式提交任务,并且对于异常要谨慎处理,防止频繁销毁和创建线程;
  • 如果需要使用submit方式提交任务,同步获取结果时尽量使用超时获取方式,避免出现一直阻塞问题导致内存泄漏的问题;
  • 谨慎使用拒绝策略,熟悉拒绝策略和线程提交方式配合使用可能存在的问题,如DiscardPolicy和submit方式提交任务就可能存在获取结果阻塞等待情况;
  • 线程池线程要有辨识度,也就是有自己的命名规则,方便问题排查。


本文由作者授权严选技术团队发布



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存