动态线程池思想学习与实践
本文导读
在后台项目开发过程中,我们常常借助线程池来实现多线程任务,以此提升系统的吞吐率和响应性;而线程池的参数配置却是一个难以合理评估的值,虽然业界也针对CPU密集型,IO密集型等场景给出了一些参数配置的经验与方案,但是实际业务场景中通常会因为流量的随机性,业务的更迭性等情况出现预计和实际运行情况偏差较大的情况;而不合理的线程池参数,可能导致服务器负载升高,服务不可用,内存溢出等严重问题;一旦遇到参数不合理的问题,还需要重新上线修改,并且存在反复修改的情况,而这期间花费的时间可能带来更大的风险,甚至导致严重业务事故;那么有没有一种方式能有效感知上述问题并及时避免以上问题呢?或许动态线程池可以。
简单来说,动态线程池就是能在不重新部署应用的情况下动态实时变更其核心参数,并且能对其核心参数及运行状态进行监控及告警;以便开发人员可以及时感知到实际业务中因为各种随机情况导致线程池异常的场景,并依据动态变更能力快速调整并验证参数的合理性。
线程池在给我们业务带来性能和吞吐提升的同时,也存在诸多风险和问题,其中主要原因就在于我们难以设置出合理的线程池参数,一方面线程池的运行机制不是很好理解,配置合理强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,同时实际场景中流量的随机性,业务的更迭性也导致业界难以有一套成熟或开箱即用的经验策略来帮助开发人员参考。而线程池参数难以合理设置的特性又不得不让我们关注以下三个痛点问题:
1.运行情况难感知:在业务使用线程池的过程中,线程池的运行情况对于开发人员来说很难感知,我们难以知道每个线程池创建了多少个线程,是否有队列积压,线程池运行状态怎么样,线程池是否已经耗尽... 直到出现线上问题或收到客诉才后知后觉;(我们能否对系统中用到的线程池进行一个整体的把控,在线程池任务积压,任务拒绝等问题发生时,甚至问题发生前进行及时感知,让开发人员能未雨绸缪,尽早发现和解决问题呢?-线程池监控,异常告警)
流量突增导致预估和实际情况偏差较大,同时由于未能及时感知并解决积压情况,最终引发客诉。 case1:广告主大批量删除物料后异步清理附属表出现任务积压 问题描述:广告主批量删除计划物料后,对应物料附属表数据未及时删除,导致广告主关键词等物料数上限得不到释放而影响创建新物料,引发线上客诉。 问题原因:广告主删除计划物料后,系统会同步删除计划物料主表信息,然后通过线程池的方式异步删除计划物料附属表数据。临近大促广告主物料增删频率及单次批量操作的物料数量都有明显增加,由于核心线程设置较小同时队列设置过长,导致计划主表同步删除后异步删除附属表的任务出现队列积压,对应的关键词等物料数上限得不到释放而影响新物料创建,引发线上客诉。
2.线程拒绝难定位:当拒绝发生后,即使我们迅速感知到了线程池运行异常,也经常会因为拒绝持续时间较短而拿不到问题发生时的线程堆栈,因此通常难以快速定位甚至无法定位到是哪里的原因导致的拒绝,比如是流量的突增将线程池打满,还是某个业务逻辑耗时较长将线程池中的线程拖住;(我们有没有一种方式能在线程池拒绝后去更容易的定位到问题呢?-自动触发线程池堆栈打印,分析工具)
case2: 线程池拒绝具有随机性,当拒绝时长较短时,难以定位问题原因 问题描述:某业务接口内部计算逻辑较多,且存在多处外部接口调用逻辑,上线后不定时出现线程池拒绝异常,由于持续时间不长,问题发生后无法通过jstack去获取问题发生时现场的线程堆栈, 很难定位是什么原因导致了线程池拒绝;由于没有较好的排查手段,只能通过逐步搂日志的方式排查,而排查过程又可能因为日志较多或者日志不全出现问题定位时间长或者是根本无法定位的情况。 问题原因:某外部某接口不稳定,在性能较差且流量较大时就容易把调用线程池打满,导致可用率下降。
3.参数问题难以快速调整:在定位到某个线程池参数设置不合理的情况后,我们需要根据情况随即进行调整,但是"修改->打包->审批->发布"的时间很可能会扩大问题的影响甚至是事故严重程度;同时因为线程池参数难以合理设置的原因,可能导致我们要重复进行上述"修改->打包->审批->发布"的流程...(有没有一种方法能快速修改并验证参数设置的合理性呢?-参数动态调整)
线程池参数设置不合理,难以快速调整参数,业务风险上升 case3:应用RPC(JSF)接口修改为异步调用后出现可用率下降。 问题描述:将应用中部分JSF接口切换为异步模式后,对应可用率有明显下降 问题原因:在修改为异步模式的接口中,部分业务在拿到future对象后使用ThenApply做了一些耗时的操作,另外还有一部分在ThenApply里面又调用了另外一个异步方法;而thenApply的执行会使用JSF的callBack线程池,由于线程池线程配置较小,并且部分回调方法耗时较长,导致callBack线程池被打满,子任务请求线程时进入阻塞队列排队,出现接口超时可用率下降。
当前业界已存在部分动态线程池组件,其主体功能及大体思想类似,但存在以下几个问题
1.与外部中间件耦合较多,难以二次开发加以使用;
2.使用灵活性受限,难以根据业务自身特点进行定制化(自动触发线程池堆栈打印,一键清空队列,callback线程池等)
综合考虑上述问题,决定结合公司中间件及自身业务特点实现一套集线程池监控,异常告警,线程栈自动获取,动态刷新为一体的动态线程池组件。
整体方案
线程池监控及告警
要实现线程池监控及告警,我们需要关注以下几个要点
1.如何获取到待监控的线程池信息
public class ThreadPoolManager {
// 线程池管理器
private static final ConcurrentHashMap<String, Executor> REGISTER_MAP_BY_NAME = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Executor, String> REGISTER_MAP_BY_EXECUTOR = new ConcurrentHashMap<>();
// 注册线程池
public static void registerExecutor(String threadPoolName, Executor executor) {
REGISTER_MAP_BY_NAME.putIfAbsent(threadPoolName, executor);
REGISTER_MAP_BY_EXECUTOR.putIfAbsent(executor, threadPoolName);
}
// 根据名称获取线程池
public static Executor getExecutorByName(String threadPoolName) {
return REGISTER_MAP_BY_NAME.get(threadPoolName);
}
// 根据线程池获取名称
public static String getNameByExecutor(Executor executor) {
return REGISTER_MAP_BY_EXECUTOR.get(executor);
}
// 获取所有线程池名称
public static Set<String> getAllExecutorNames() {
return REGISTER_MAP_BY_NAME.keySet();
}
}
getCorePoolSize() // 核心线程数
getMaximumPoolSize() // 最大线程数
getQueue() // 阻塞队列,获取队列大小,容量等
getActiveCount() // 活跃线程数
getTaskCount() // 历史已完成和正在执行的任务数量
getCompletedTaskCount() // 已完成任务数
2.如何将监控的信息保存和展示出来
监管了应用中的业务线程池,也能获取到某一时刻各线程池的运行情况快照,但要实现线程池数据监控还需要我们在每个时刻去采集线程池运行信息,并将其保存下来,同时还需要将这些数据用一个可视化页面展示出来供我们观察才行,否则我们只知道某一时刻的线程池情况也意义不大。为此,我们需要考虑上面看到的过程,例如使用Micrometer采集性能数据,使用Prometheus时序数据库存储指标数据,使用Grafana展示数据;而现在,我们只需要根据链路监控工具pfinder的埋点要求将对应要监控的线程池指标配置到上报逻辑即可,剩下的数据分时采集,数据存储,数据展示可以完全交给pfinder来完成。
// 已经设置埋点的线程池
public static ConcurrentHashSet<String> monitorThreadPool = new ConcurrentHashSet<>();
// 监控埋点注册
public static void monitorRegister() {
log.info("===> monitor register start...");
// 1.获取所有线程池
Set<String> allExecutorNames = ThreadPoolManager.getAllExecutorNames();
// 2.遍历线程池,注册埋点
allExecutorNames.forEach(executorName-> {
if (!monitorThreadPool.contains(executorName)) {
monitorThreadPool.add(executorName);
Executor executor = ThreadPoolManager.getExecutorByName(executorName);
collect(executor, executorName);
}
});
log.info("===> monitor register end...");
}
// pfinder指标埋点
public static void collect(Executor executorService, String threadPoolName) {
ThreadPoolExecutor executor = (ThreadPoolExecutor)executorService;
String prefix = "thread.pool."+threadPoolName;
gauge1 = PfinderContext.getMetricRegistry().gauges(prefix)
.gauge(() -> executor.isShutdown() ? 0 : executor.getCorePoolSize())
.tags(MetricTag.of("type_dimension", "core_size")).build();
gauge2 = PfinderContext.getMetricRegistry().gauges(prefix)
.gauge(() -> executor.isShutdown() ? 0 : executor.getMaximumPoolSize())
.tags(MetricTag.of("type_dimension", "max_size"))
.build();
gauge4 = PfinderContext.getMetricRegistry().gauges(prefix)
.gauge(() -> executor.isShutdown() ? 0 : executor.getQueue().size())
.tags(MetricTag.of("type_dimension", "queue_size"))
.build();
}
而线程池拒绝异常,我们可以在线程池初始化时包装线程池的拒绝策略,在执行实际拒绝策略前抛出告警;
@Slf4j
public class RejectInvocationHandler implements InvocationHandler {
private final Object target;
@Value("${jtool.pool.reject.alarm.key}")
private String key;
public RejectInvocationHandler(Object target) {
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ExecutorService executor = (ExecutorService)args[1];
if (Strings.equals(method.getName(), "rejectedExecution")) {
try {
rejectBefore(executor);
}
catch (Exception exp) {
log.error("==> Exception while do rejectBefore for pool [{}]", executor, exp);
}
}
return method.invoke(target, args);
}
private void rejectBefore(ExecutorService executor) {
// 触发报警
rejectAlarm(executor);
}
/**
* 拒绝报警
*/
private void rejectAlarm(ExecutorService executor) {
String alarmKey = Objects.nonNull(key) ? key : ThreadPoolConst.UMP_ALARM_KEY;
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
String threadPoolName = ThreadPoolManager.getNameByExecutor(threadPoolExecutor);
String errorMsg = String.format("===> 线程池拒绝报警 key: [%s], cur executor: [%s], core size: [%s], max size: [%s], queue size: [%s], curQueue size: [%s]",
alarmKey, threadPoolName, threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size()+threadPoolExecutor.getQueue().remainingCapacity(), threadPoolExecutor.getQueue().size());
log.error(errorMsg);
Profiler.businessAlarm(alarmKey, errorMsg);
}
}
自动触发线程堆栈打印
public class RejectInvocationHandler implements InvocationHandler {
...
private void rejectBefore(ExecutorService executor) {
// 打印线程堆栈到日志的间隔条件
if (CommonProperty.canPrintStackTrace()) {
// 触发报警
rejectAlarm(executor);
// 触发线程堆栈打印
printThreadStack(executor);
}
}
...
}
...
/**
* 打印线程堆栈信息
*/
public static void printThreadStack(Executor executor) {
if (!CommonProperty.logPrintFlag) {
log.info("===> 线程池堆栈打印关闭:[{}]", CommonProperty.logPrintFlag);
return;
}
logger.info("\n=================>>> 线程池拒绝堆栈打印start,触发提交拒接处理的线程池:【{}】", executor);
Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
log.info("===> allStackTraces size :[{}]", allStackTraces.size());
StringBuilder stringBuilder = new StringBuilder();
allStackTraces.entrySet().stream()
.sorted(Comparator.comparing(entry -> entry.getKey().getName()))
.forEach(threadEntry -> {
Thread thread = threadEntry.getKey();
stringBuilder.append(Strings.format("\n线程:[{}] 时间: [{}]\n", thread.getName(), new Date()));
stringBuilder.append(Strings.format("\tjava.lang.Thread.State: {}\n", thread.getState()));
StackTraceElement[] stack = threadEntry.getValue();
for (StackTraceElement stackTraceElement : stack) {
stringBuilder.append("\t\t").append(stackTraceElement.toString()).append("\n");
}
stringBuilder.append("\n");
logger.info(stringBuilder.toString());
stringBuilder.delete(0, stringBuilder.length());
});
logger.info("==============>>> end");
}
...
public class ThreadLogAnalyzer {
public static void main(String[] args) {
String logFilePath = "/Users/demo/jsfdemo/MyJtool/src/main/resources/reject.monitor 21.log";
String threadPoolNameLike = "simpleTestExecutor";
int threadCount = 0;
HashMap<String, Integer> statusMap = new HashMap<>();
HashMap<String, Integer> methodMap = new HashMap<>();
try (BufferedReader br = new BufferedReader(new FileReader(logFilePath))) {
String line;
while ((line = br.readLine()) != null) {
if (line.contains("=================>>> 线程池拒绝堆栈打印start,触发提交拒接处理的线程池")) {
System.out.println("开始读整个线程");
}
if (line.contains(threadPoolNameLike)) {
threadCount++;
String curStatus = br.readLine();
if (curStatus.contains("java.lang.Thread.State")) {
statusMap.put(curStatus, (statusMap.getOrDefault(curStatus, 0) + 1));
String methodTrace = br.readLine();
methodMap.put(methodTrace, (methodMap.getOrDefault(methodTrace, 0) + 1));
}
}
if (line.contains("==============>>> end")) {
System.out.println("结束读整个线程");
}
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(Strings.format("===> 当前线程名[{}]共计:{}", threadPoolNameLike, threadCount));
System.out.println("\n===> 状态分析结果:");
for (Map.Entry<String, Integer> statusEntry : statusMap.entrySet()) {
System.out.println(Strings.format("\t {} {}", statusEntry.getKey(), statusEntry.getValue()));
}
System.out.println("\n===> 方法分析结果:");
ArrayList<Map.Entry<String, Integer>> methodEntryList = Lists.newArrayList(methodMap.entrySet());
methodEntryList.sort(new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o2.getValue() - o1.getValue();
}
});
for (Map.Entry<String, Integer> methodEntry : methodEntryList) {
System.out.println(Strings.format("{} {} {}%", methodEntry.getKey(), methodEntry.getValue(), (methodEntry.getValue() / (double)threadCount) * 100));
}
}
}
线程池参数动态刷新
要实现线程池参数动态刷新,我们需要关注以下几个要点:
1.哪些参数需要变更
在使用线程池时,我们通常需要配置多个参数,但是实际上我们只需要灵活配置好corePoolSize(核心线程数),maximumPoolSize(最大线程数),workQueue(队列长度)这三个核心参数就可以应对大部分场景了;
2.运行中的线程池如何变更参数
从前面我们可以知道线程池的核心实现类ThreadPoolExecutor提供了改变corePoolSize,maximumPoolSize的两个快捷方法:
(1)setCorePoolSize(int corePoolSize)
(2)setMaximumPoolSize(int maximumPoolSize)
我们只需要通过RPC或者HTTP的方式将想要变更的参数传递到应用再利用上述方法设置进去即可;而队列长度的变更却相对麻烦点,因为我们常使用的阻塞队列LinkedBlockingQueue将队列大小设置为成了一个final类型的变量,我们无法快捷变更,那该怎么办呢,其中一个思想就是自定义一个LinkedBlockQueue,修改capacity为非final类型,增加一个capacity设置方法,同时考虑并发问题对其中涉及到的方法进行修改;(可参考RabbitMq中的VariableLinkedBlockingQueue)
3.应用集群场景下如何实现一键参数变更
/**
* ducc控制线程池刷新方法, 需要动态刷新的线程池信息列表,举例如下:
* value:
* [
* {
* "threadPoolName": "my_pool",
* "corePoolSize": "10",
* "maximumPoolSize": "20",
* "queueCapacity": "100"
* }
* ]
*/
@LafValue("jtool.pool.refresh")
public void refresh(@JsonConverter List<ThreadPoolProperties> threadPoolProperties) {
String jsonString = JSON.toJSONString(threadPoolProperties);
log.info("===> refresh thread pool properties [{}]", jsonString);
threadPoolProperties = JSONObject.parseArray(jsonString, ThreadPoolProperties.class);
refresh(threadPoolProperties);
}
public static boolean refresh(List<ThreadPoolProperties> threadPoolProperties) {
if (Objects.isNull(threadPoolProperties)) {
log.warn("refresh param is empty!");
return false;
}
log.info("Executor refresh param: [{}]", threadPoolProperties);
// 1.根据参数获取对应的线程池
threadPoolProperties.forEach(threadPoolProperty -> {
String threadPoolName = threadPoolProperty.getThreadPoolName();
Executor executor = ThreadPoolManager.getExecutorByName(threadPoolName);
if (Objects.isNull(executor)) {
log.warn("Register not find this executor: {}", threadPoolName);
return;
}
// 2. 线程池刷新
refreshExecutor(executor, threadPoolName, threadPoolProperty);
log.info("Refresh thread pool finish, threadPoolName: [{}]", threadPoolName);
});
return true;
}
Java开发者LLM实战——使用LangChain4j构建本地RAG系统
万字长文浅谈系统稳定性建设
生成式推荐系统与京东联盟广告-综述与应用