查看原文
其他

版本发布->服务抖动->问题分析->优化实践。

why技术 2023-10-26

你好呀,我是歪歪。

今天给大家分享一篇我在博客园上看到的一篇叫做《应用部署引起上游服务抖动问题分析及优化实践方案》的文章。

文章的内容就是作者针对微服务上线的过程中,对于上游服务带来的影响的本质原因追溯和解决方案的探索。

文章的作者是京东物流的程序员,在京东内部使用的 RPC 框架叫做 JSF,虽然我们看不到 JSF 的源码,但是它是脱胎于 Dubbo 的。

而且,我整篇文章看下来,发现源码并不重要,重要的是有清晰的思路和相对完善的监控指标。

在分享正文之前,歪歪歪师傅,先基于一个真实的面试题,来抛砖引个玉。

这个题的答案就藏在题面里:代码预热。

京东的这篇文章也是全文在围绕着“启动过程中哪些地方可以做预热”展开。

而提到预热,我首先想起的是 Dubbo 服务的预热源码。这一块功能编码确实一点也不复杂,主要是能体现出编码的人对于 JVM 和 RPC 方面的“内功”,能够意识到,由于 JVM 的编译特点,再加上 Dubbo 在架构中充当着 RPC 框架的角色,所以为了服务最大程度上的稳定,可以在编码的层面做一定的服务预热。

下面这个方法,只有两行,但是这就是 Dubbo 服务预热功能的核心代码:

org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance#calculateWarmupWeight

看一下这个方法在框架里面调用的地方:

当我们不指定参数的情况下,入参 warmup 和 weight 是有默认值的:

static int calculateWarmupWeight(int uptime) {
    //int ww = (int) ( uptime / ((float) 10 * 60 * 1000 / 100));
    int ww = (int) ( uptime / 6000 );
    return ww < 1 ? 1 : (Math.min(ww, 100));
}

它的入参 uptime 代表服务启动时间,单位是毫秒。返回参数代表当前服务的权重。

基于这个方法,我先给你搞个图。

下面这个图,x 轴是启动时间,单位是秒,y 轴是对应的权重:

从图上可以看出,从服务启动开始,每隔 6 秒权重就会加一,直到 600 秒,即 10 分钟之后,权重变为 100。

比如当 uptime 为 60 秒时,该方法的返回值为 10。

当 uptime 为 66 秒时,该方法的返回值为 11。

当 uptime 为 120 秒时,该方法的返回值为 20。

以此类推...

600 秒,也就是十分钟以及超过十分钟之后,权重均为 100,代表预热完成。

权重,就是用来决定这次请求发送给哪个服务的一个关键因素。

我给你画个示意图:

A、B、C 三台服务,A,B 的权重都是 100,C 服务刚刚启动。

作为一个刚刚启动的服务,是不适合接受突发流量的,以为运行在服务器上的代码还没有经过充分的编译,主链接上的代码可能还没有进入编译器的 C2 阶段。

所以按理来说 C 服务需要一个服务预热的过程,也就是刚刚启动的前 10 分钟,应该有逐步接受越来越多的请求这样的一个过程。

比如最简单的加权随机轮询的负载均衡策略中,关键代码是这样的:

org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance#doSelect

看不明白没关系,我再给你画个图。

在 C 服务启动的第 1 分钟,它的权重是 10:

所以代码中的 totalWeight=210,因此下面这行代码就是随机生成 210 之内的一个数字:

int offset = ThreadLocalRandom.current().nextInt(totalWeight);

在示意图中有三个服务器,所以 for 循环中的 lenght=3。

weights[] 这个数组是个啥玩意呢?

看一眼代码:

每次循环的时候把每个服务器的权重汇总起来,放到 weights[] 里面。

在上面的例子中也就是这样的:

  • weights[0]= 100(A服务器的权重)
  • weights[1]= 100(A服务器的权重)+100(B服务器的权重)=200
  • weights[2]= 100(A服务器的权重)+100(B服务器的权重)+10(C服务器的权重)=210

当随机数 offset 在 0-100 之间,A 服务器处理本次请求。在 100-200 之间 B 服务器处理本次请求。在 200-210 之间 C 服务器处理本次请求:

也就是说:C 服务器有一定的概率被选上,来处理这一次请求,但是概率不大。

怎么概率才能大呢?

权重要大。

权重怎么才大呢?

启动时间长了,权重也随之增大了。

比如服务启动 8 分钟之后,就变成这样了,C 服务器被选中的概率就大了很多:

最后到 10 分钟之后,三台服务器的权重一致,承担的流量也就几乎一致了。

C 服务器承担的请求随着服务启动时间越来越多,直到 10 分钟后到达一个峰值,这就算是经历了一个预热的过程。

前面介绍的就是一个预热的手段,而类似于这样的预热思想你在其他的一些网关类的开源项目中也能找到类似的源码。

但是预热不只是有这样的一个实现方式。

比如阿里基于 OpenJDK 搞了一个 Alibaba Dragonwell,其实也就是一个 JDK。

https://github.com/alibaba/dragonwell8

其中之一的特性就是预热:

对于我前面大篇幅的介绍的 Dubbo 的预热机制,在《应用部署引起上游服务抖动问题分析及优化实践方案》一文中,也提到了,但是它是这样描述的:

最关键的是该方案的预热规则配置的是在一个固定预热周期(比如1分钟)内某个接口的预热权重(接收调用量比例),简单理解就是小流量试跑,这就决定了该方案无法对系统资源进行充分预热,预热周期过后全部流量进入依然会因需要创建或初始化资源引起服务抖动,对于交易接单服务来说,抖动就会导致接单失败,有卡单风险。

那么针对这个问题,还有什么其他的解题方案呢?

京东技术团队发布在博客园的这篇文章,分享给你:

原文链接:https://www.cnblogs.com/jingdongkeji/p/17317511.html
作者:京东物流 朱永昌

背景介绍

本文主要围绕应用部署引起上游服务抖动问题展开,结合百川分流系统实例,提供分析、解决思路,并提供一套切实可行的实践方案。

百川分流系统作为交易订单中心的专用网关,为交易订单中心提供统一的对外标准服务(包括接单、修改、取消、回传等),对内则基于配置规则将流量分发到不同业务线的应用上。随着越来越多的流量切入百川系统,因系统部署引起服务抖动导致上游系统调用超时的问题也逐渐凸显出来。为提供稳定的交易服务系统,提升系统可用率,需要对该问题进行优化。

经调研,集团内部现有两种预热方案:

  • 方案一:JSF官方提供的预热方案;
  • 方案二:行云编排部署结合录制回放的预热方案。

两种方法均无法达到预期效果。

关于方案一。

首先,使用的前提条件是JSF消费端必需升级JSF版本到1.7.6,百川分流系统上游调用方有几十个,推动所有调用方升级版本比较困难。

其次,JSF平台预热规则以接口纬度进行配置,百川分流系统对外提供46个接口,配置复杂。

最关键的是该方案的预热规则配置的是在一个固定预热周期(比如1分钟)内某个接口的预热权重(接收调用量比例),简单理解就是小流量试跑,这就决定了该方案无法对系统资源进行充分预热,预热周期过后全部流量进入依然会因需要创建或初始化资源引起服务抖动,对于交易接单服务来说,抖动就会导致接单失败,有卡单风险。

关于方案二通过录制线上流量进行压测回放来实现预热,适合读接口,但对于写接口如果不做特殊处理会影响线上数据。

针对这个问题,目前的解决方案是通过压测标识来识别压测预热流量,但交易业务逻辑复杂,下游依赖繁多,相关系统目前并不支持。单独改造的话,接口多、风险高。

基于以上情况,我们通过百川分流系统部署引起上游服务抖动这个实例,追踪其表象线索,深入研读JSF源码,最终找到导致服务抖动的关键因素,开发了一套更加有效的预热方案,验证结果表明该方案预热效果明显,服务调用方方法性能MAX值降低90%,降到了超时时间范围内,消除了因机器部署引起上游调用超时的问题。

问题现象

系统上线部署期间,纯配接单服务上游调用方反馈接单服务抖动,出现调用超时现象。

查看此服务UMP打点,发现此服务的方法性能监控MAX值最大3073ms,未超过调用方设置的超时时间10000ms(如图1所示)

图1 服务内部监控打点

查看此服务PFinder性能监控,发现上游调用方应用调用此服务的方法性能监控MAX值多次超过10000ms(可以直接查看调用方的UMP打点,若调用方无法提供UMP打点时,也可借助PFinder的应用拓扑功能进行查看,如图2所示)

图2 服务外部监控打点

分析思路

从上述问题现象可以看出,在系统上线部署期间服务提供方接口性能MAX值并无明显抖动,但服务调用方接口性能MAX值抖动明显。

由此,可以确定耗时不在服务提供方内部处理逻辑上,而是在进入服务提供方内部处理逻辑之前(或者之后),那么在之前或者之后具体都经历了什么呢?

我们不着急回答这个问题,先基于现有的一些线索逐步进行追踪探索。

线索一:部署过程中机器CPU会有短暂飙升(如图3所示)

如果此时有请求调用到当前机器,接口性能势必会受到影响。因此,考虑机器部署完成且待机器CPU平稳后再上线JSF服务,这可以通过调整JSF延迟发布参数来实现。具体配置如下:

 <jsf:provider id="createExpressOrderService" 
               interface="cn.jdl.oms.api.CreateExpressOrderService"
               ref="createExpressOrderServiceImpl"
               register="true"
               concurrents="400"
               alias="${provider.express.oms}"
               // 延迟发布2分钟
               delay="120000">
</jsf:provider>

然而,实践证明JSF服务确实延迟了2分钟才上线(如图4所示),且此时CPU已经处于平稳状态,但是JSF上线瞬间又引起了CPU的二次飙升,同时调用方仍然会出现服务调用超时的现象。

图3 机器部署过程CPU短暂飙升
图4 部署和JSF上线瞬间均导致CPU飙升

线索二:JSF上线瞬间JVM线程数飙升(如图5所示)

图5 JSF上线瞬间线程数飙升

使用jstack命令工具查看线程堆栈,可以发现数量增长最多的线程是JSF-BZ线程,且都处于阻塞等待状态:

"JSF-BZ-22000-137-T-350" #1038 daemon prio=5 os_prio=0 tid=0x00007f02bcde9000 nid=0x6fff waiting on condition [0x00007efa10284000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x0000000640b359e8> (a java.util.concurrent.SynchronousQueue$TransferStack)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
 at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
 at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

"JSF-BZ-22000-137-T-349" #1037 daemon prio=5 os_prio=0 tid=0x00007f02bcde7000 nid=0x6ffe waiting on condition [0x00007efa10305000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x0000000640b359e8> (a java.util.concurrent.SynchronousQueue$TransferStack)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
 at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
 at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

"JSF-BZ-22000-137-T-348" #1036 daemon prio=5 os_prio=0 tid=0x00007f02bcdd8000 nid=0x6ffd waiting on condition [0x00007efa10386000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x0000000640b359e8> (a java.util.concurrent.SynchronousQueue$TransferStack)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
 at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
 at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

...

通过关键字“JSF-BZ”可以在JSF源码中检索,可以找到关于“JSF-BZ”线程池初始化源码如下:

private static synchronized ThreadPoolExecutor initPool(ServerTransportConfig transportConfig) {
    final int minPoolSize, aliveTime, port = transportConfig.getPort();
    int maxPoolSize = transportConfig.getServerBusinessPoolSize();
    String poolType = transportConfig.getServerBusinessPoolType();
    if ("fixed".equals(poolType)) {
        minPoolSize = maxPoolSize;
        aliveTime = 0;
    } else if ("cached".equals(poolType)) {
        minPoolSize = 20;
        maxPoolSize = Math.max(minPoolSize, maxPoolSize);
        aliveTime = 60000;
    } else {
        throw new IllegalConfigureException(21401, "server.threadpool", poolType);
    }
    String queueType = transportConfig.getPoolQueueType();
    int queueSize = transportConfig.getPoolQueueSize();
    boolean isPriority = "priority".equals(queueType);
    BlockingQueue<Runnable> configQueue = ThreadPoolUtils.buildQueue(queueSize, isPriority);
    NamedThreadFactory threadFactory = new NamedThreadFactory("JSF-BZ-" + port, true);
    RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        private int i = 1;
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (this.i++ % 7 == 0) {
                this.i = 1;
                BusinessPool.LOGGER.warn("[JSF-23002]Task:{} has been reject for ThreadPool exhausted! pool:{}, active:{}, queue:{}, taskcnt: {}", new Object[]{r, Integer.valueOf(executor.getPoolSize()), Integer.valueOf(executor.getActiveCount()), Integer.valueOf(executor.getQueue().size()), Long.valueOf(executor.getTaskCount())});
            }
            RejectedExecutionException err = new RejectedExecutionException("[JSF-23003]Biz thread pool of provider has bean exhausted, the server port is " + port);
            ProviderErrorHook.getErrorHookInstance().onProcess(new ProviderErrorEvent(err));
            throw err;
        }
    };
    LOGGER.debug("Build " + poolType + " business pool for port " + port + " [min: " + minPoolSize + " max:" + maxPoolSize + " queueType:" + queueType + " queueSize:" + queueSize + " aliveTime:" + aliveTime + "]");
    return new ThreadPoolExecutor(minPoolSize, maxPoolSize, aliveTime, TimeUnit.MILLISECONDS, configQueue, (ThreadFactory) threadFactory, handler);
}
public static BlockingQueue<Runnable> buildQueue(int size, boolean isPriority) {
    BlockingQueue<Runnable> queue;
    if (size == 0) {
        queue = new SynchronousQueue<Runnable>();
    } else if (isPriority) {
        queue = (size < 0) ? new PriorityBlockingQueue<Runnable>() : new PriorityBlockingQueue<Runnable>(size);
    } else {
        queue = (size < 0) ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(size);
    }
    return queue;
}

另外,JSF官方文档关于线程池的说明如下:

结合JSF源码以及JSF官方文档说明,可以知道JSF-BZ线程池的阻塞队列用的是SynchronousQueue,这是一个同步阻塞队列,其中每个put必须等待一个take,反之亦然。JSF-BZ线程池默认使用的是伸缩无队列线程池,初始线程数为20个,那么在JSF上线的瞬间,大批量并发请求进入,初始化线程远不够用,因此新建了大量线程。

既然知道了是由于JSF线程池初始化线程数量不足导致的,那么我们可以考虑在应用启动时对JSF线程池进行预热,也就是说在应用启动时创建足够数量的线程备用。通过查阅JSF源码,我们找到了如下方式实现JSF线程池的预热:

// 从Spring上下文获取JSF ServerBean,可能有多个
Map<String, ServerBean> serverBeanMap = applicationContext.getBeansOfType(ServerBean.class);
if (CollectionUtils.isEmpty(serverBeanMap)) {
    log.error("application preheat, jsf thread pool preheat failed, serverBeanMap is empty.");
    return;
}

// 遍历所有serverBean,分别做预热处理
serverBeanMap.forEach((serverBeanName, serverBean) -> {
    if (Objects.isNull(serverBean)) {
        log.error("application preheat, jsf thread pool preheat failed, serverBean is null, serverBeanName:{}", serverBeanName);
        return;
    }
    // 启动ServerBean,启动后才可以获取到Server
    serverBean.start();
    Server server = serverBean.getServer();
    if (Objects.isNull(server)) {
        log.error("application preheat, jsf thread pool preheat failed, JSF Server is null, serverBeanName:{}", serverBeanName);
        return;
    }

    ServerTransportConfig serverTransportConfig = server.getTransportConfig();
    if (Objects.isNull(serverTransportConfig)) {
        log.error("application preheat, jsf thread pool preheat failed, serverTransportConfig is null, serverBeanName:{}", serverBeanName);
        return;
    }
    // 获取JSF业务线程池
    ThreadPoolExecutor businessPool = BusinessPool.getBusinessPool(serverTransportConfig);
    if (Objects.isNull(businessPool)) {
        log.error("application preheat, jsf biz pool preheat failed, businessPool is null, serverBeanName:{}", serverBeanName);
        return;
    }

    int corePoolSize = businessPool.getCorePoolSize();
    int maxCorePoolSize = Math.max(corePoolSize, 500);

    if (maxCorePoolSize > corePoolSize) {
        // 设置JSF server核心线程数
        businessPool.setCorePoolSize(maxCorePoolSize);
    }
    // 初始化JSF业务线程池所有核心线程
    if (businessPool.getPoolSize() < maxCorePoolSize) {
        businessPool.prestartAllCoreThreads();
    }
}

线索三:JSF-BZ线程池预热完成后,JSF上线瞬间JVM线程数仍有升高

继续使用jstack命令工具查看线程堆栈,对比后可以发现数量有增长的线程是JSF-SEV-WORKER线程:

"JSF-SEV-WORKER-139-T-129" #1295 daemon prio=5 os_prio=0 tid=0x00007ef66000b800 nid=0x7289 runnable [0x00007ef627cf8000]
   java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - locked <0x0000000644f558b8> (a io.netty.channel.nio.SelectedSelectionKeySet)
 - locked <0x0000000641eaaca0> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0000000641eaab88> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
 at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:68)
 at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:805)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:457)
 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

"JSF-SEV-WORKER-139-T-128" #1293 daemon prio=5 os_prio=0 tid=0x00007ef60c002800 nid=0x7288 runnable [0x00007ef627b74000]
   java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - locked <0x0000000641ea7450> (a io.netty.channel.nio.SelectedSelectionKeySet)
 - locked <0x0000000641e971e8> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0000000641e970d0> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
 at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:68)
 at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:805)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:457)
 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

"JSF-SEV-WORKER-139-T-127" #1291 daemon prio=5 os_prio=0 tid=0x00007ef608001000 nid=0x7286 runnable [0x00007ef627df9000]
   java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - locked <0x0000000641e93998> (a io.netty.channel.nio.SelectedSelectionKeySet)
 - locked <0x0000000641e83730> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0000000641e83618> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
 at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:68)
 at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:805)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:457)
 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
 - None

那么JSF-SEV-WORKER线程是做什么的?我们是不是也可以对它做预热操作?

带着这些疑问,再次查阅JSF源码:

private synchronized EventLoopGroup initChildEventLoopGroup() {
     NioEventLoopGroup nioEventLoopGroup = null;
     int threads = (this.childNioEventThreads > 0) ? this.childNioEventThreads : Math.max(8, Constants.DEFAULT_IO_THREADS);
 
     NamedThreadFactory threadName = new NamedThreadFactory("JSF-SEV-WORKER", isDaemon());
     EventLoopGroup eventLoopGroup = null;
     if (isUseEpoll()) {
       EpollEventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(threads, (ThreadFactory)threadName);
     } else {
       nioEventLoopGroup = new NioEventLoopGroup(threads, (ThreadFactory)threadName);
     } 
     return (EventLoopGroup)nioEventLoopGroup;
}

从JSF源码中可以看出JSF-SEV-WORKER线程是JSF内部使用Netty处理网络通信创建的线程,仔细研读JSF源码同样可以找到预热JSF-SEV-WORKER线程的方法,代码如下:

// 通过serverTransportConfig获取NioEventLoopGroup
// 其中,serverTransportConfig的获取方式可参考JSF-BZ线程预热代码
NioEventLoopGroup eventLoopGroup = (NioEventLoopGroup) serverTransportConfig.getChildEventLoopGroup();

int threadSize = this.jsfSevWorkerThreads;
while (threadSize-- > 0) {
    new Thread(() -> {
        // 通过手工提交任务的方式创建JSF-SEV-WORKER线程达到预热效果
        eventLoopGroup.submit(() -> log.info("submit thread to netty by hand, threadName:{}", Thread.currentThread().getName()));
    }).start();
}

JSF-BZ线程、JSF-SEV-WORKER线程预热效果如下图所示:

挖掘源码线索

至此,经过JSF延迟发布、JSF内部线程池预热后,系统部署引起服务调用方抖动超时的现象有一定缓解(从原来的10000ms-20000ms降低到5000ms-10000ms),虽然说是有效果,但还有些不尽如人意。应该还是有优化空间的,现在是时候考虑我们最开始留下的那个疑问了:

服务调用方在进入服务提供方内部处理逻辑之前(或者之后),具体都经历了什么?。

最容易想到的肯定是中间经过了网络,但是网络因素基本可以排除,因为在部署过程中机器网络性能正常,那么还有哪些影响因素呢?

此时我们还是要回归到JSF源码中去寻找线索。

经过仔细研读JSF源码,我们可以发现JSF内部对于接口出入参有一系列编码、解码、序列化、反序列化的操作,而且在这些操作中我们有了惊喜的发现:本地缓存,部分源码如下:

DESC_CLASS_CACHE

private static final ConcurrentMap<String, Class<?>> DESC_CLASS_CACHE = new ConcurrentHashMap<String, Class<?>>();

private static Class<?> desc2class(ClassLoader cl, String desc) throws ClassNotFoundException {
  switch (desc.charAt(0)) {
    case 'V':
      return void.class;
    case 'Z'return boolean.class;
    case 'B'return byte.class;
    case 'C'return char.class;
    case 'D'return double.class;
    case 'F'return float.class;
    case 'I'return int.class;
    case 'J'return long.class;
    case 'S'return short.class;
    case 'L':
      desc = desc.substring(1, desc.length() - 1).replace('/''.');
      break;
    case '[':
      desc = desc.replace('/''.');
      break;
    default:
      throw new ClassNotFoundException("Class not found: " + desc);
  } 
  
  if (cl == null)
    cl = ClassLoaderUtils.getCurrentClassLoader(); 
  Class<?> clazz = DESC_CLASS_CACHE.get(desc);
  if (clazz == null) {
    clazz = Class.forName(desc, true, cl);
    DESC_CLASS_CACHE.put(desc, clazz);
  } 
  return clazz;
}

NAME_CLASS_CACHE

private static final ConcurrentMap<String, Class<?>> NAME_CLASS_CACHE = new ConcurrentHashMap<String, Class<?>>();

private static Class<?> name2class(ClassLoader cl, String name) throws ClassNotFoundException {
  int c = 0, index = name.indexOf('[');
  if (index > 0) {
    
    c = (name.length() - index) / 2;
    name = name.substring(0, index);
  } 
  if (c > 0) {
    
    StringBuilder sb = new StringBuilder();
    while (c-- > 0) {
      sb.append("[");
    }
    if ("void".equals(name)) { sb.append('V'); }
    else if ("boolean".equals(name)) { sb.append('Z'); }
    else if ("byte".equals(name)) { sb.append('B'); }
    else if ("char".equals(name)) { sb.append('C'); }
    else if ("double".equals(name)) { sb.append('D'); }
    else if ("float".equals(name)) { sb.append('F'); }
    else if ("int".equals(name)) { sb.append('I'); }
    else if ("long".equals(name)) { sb.append('J'); }
    else if ("short".equals(name)) { sb.append('S'); }
    else { sb.append('L').append(name).append(';'); }
     name = sb.toString();
  }
  else {
    
    if ("void".equals(name)) return void.class; 
    if ("boolean".equals(name)) return boolean.class; 
    if ("byte".equals(name)) return byte.class; 
    if ("char".equals(name)) return char.class; 
    if ("double".equals(name)) return double.class; 
    if ("float".equals(name)) return float.class; 
    if ("int".equals(name)) return int.class; 
    if ("long".equals(name)) return long.class; 
    if ("short".equals(name)) return short.class;
  
  } 
  if (cl == null)
    cl = ClassLoaderUtils.getCurrentClassLoader(); 
  Class<?> clazz = NAME_CLASS_CACHE.get(name);
  if (clazz == null) {
    clazz = Class.forName(name, true, cl);
    NAME_CLASS_CACHE.put(name, clazz);
  } 
  return clazz;
}

SerializerCache

private ConcurrentHashMap _cachedSerializerMap;

public Serializer getSerializer(Class<?> cl) throws HessianProtocolException {
  Serializer serializer = (Serializer)_staticSerializerMap.get(cl);
  if (serializer != null) {
    return serializer;
  }
  
  if (this._cachedSerializerMap != null) {
    serializer = (Serializer)this._cachedSerializerMap.get(cl);
    if (serializer != null) {
      return serializer;
    }
  } 
  
  int i = 0;
  for (; serializer == null && this._factories != null && i < this._factories.size(); 
    i++) {

    
    AbstractSerializerFactory factory = this._factories.get(i);
    
    serializer = factory.getSerializer(cl);
  } 
  
  if (serializer == null)
  {
    if (isZoneId(cl)) {
      ZoneIdSerializer zoneIdSerializer = ZoneIdSerializer.getInstance();
    } else if (isEnumSet(cl)) {
      serializer = EnumSetSerializer.getInstance();
    } else if (JavaSerializer.getWriteReplace(cl) != null) {
      serializer = new JavaSerializer(cl, this._loader);
    }
    else if (HessianRemoteObject.class.isAssignableFrom(cl)) {
      serializer = new RemoteSerializer();


    
    }
    else if (Map.class.isAssignableFrom(cl)) {
      if (this._mapSerializer == null) {
        this._mapSerializer = new MapSerializer();
      }
      serializer = this._mapSerializer;
    } else if (Collection.class.isAssignableFrom(cl)) {
      if (this._collectionSerializer == null) {
        this._collectionSerializer = new CollectionSerializer();
      }
      
      serializer = this._collectionSerializer;
    } else if (cl.isArray()) {
      serializer = new ArraySerializer();
    } else if (Throwable.class.isAssignableFrom(cl)) {
      serializer = new ThrowableSerializer(cl, getClassLoader());
    } else if (InputStream.class.isAssignableFrom(cl)) {
      serializer = new InputStreamSerializer();
    } else if (Iterator.class.isAssignableFrom(cl)) {
      serializer = IteratorSerializer.create();
    } else if (Enumeration.class.isAssignableFrom(cl)) {
      serializer = EnumerationSerializer.create();
    } else if (Calendar.class.isAssignableFrom(cl)) {
      serializer = CalendarSerializer.create();
    } else if (Locale.class.isAssignableFrom(cl)) {
      serializer = LocaleSerializer.create();
    } else if (Enum.class.isAssignableFrom(cl)) {
      serializer = new EnumSerializer(cl);
    } 
  }
  if (serializer == null) {
    serializer = getDefaultSerializer(cl);
  }
  
  if (this._cachedSerializerMap == null) {
    this._cachedSerializerMap = new ConcurrentHashMap<Object, Object>(8);
  }
  
  this._cachedSerializerMap.put(cl, serializer);
  
  return serializer;
}

DeserializerCache

private ConcurrentHashMap _cachedDeserializerMap;

public Deserializer getDeserializer(Class<?> cl) throws HessianProtocolException {
  Deserializer deserializer = (Deserializer)_staticDeserializerMap.get(cl);
  if (deserializer != null) {
    return deserializer;
  }
  if (this._cachedDeserializerMap != null) {
    deserializer = (Deserializer)this._cachedDeserializerMap.get(cl);
    if (deserializer != null) {
      return deserializer;
    }
  } 
  
  int i = 0;
  for (; deserializer == null && this._factories != null && i < this._factories.size(); 
    i++) {
    
    AbstractSerializerFactory factory = this._factories.get(i);
    
    deserializer = factory.getDeserializer(cl);
  } 
  
  if (deserializer == null)
    if (Collection.class.isAssignableFrom(cl)) {
      deserializer = new CollectionDeserializer(cl);
    }
    else if (Map.class.isAssignableFrom(cl)) {
      deserializer = new MapDeserializer(cl);
    }
    else if (cl.isInterface()) {
      deserializer = new ObjectDeserializer(cl);
    }
    else if (cl.isArray()) {
      deserializer = new ArrayDeserializer(cl.getComponentType());
    }
    else if (Enumeration.class.isAssignableFrom(cl)) {
      deserializer = EnumerationDeserializer.create();
    }
    else if (Enum.class.isAssignableFrom(cl)) {
      deserializer = new EnumDeserializer(cl);
    }
    else if (Class.class.equals(cl)) {
      deserializer = new ClassDeserializer(this._loader);
    } else {
      
      deserializer = getDefaultDeserializer(cl);
    }  
  if (this._cachedDeserializerMap == null) {
    this._cachedDeserializerMap = new ConcurrentHashMap<Object, Object>(8);
  }
  this._cachedDeserializerMap.put(cl, deserializer);
  
  return deserializer;
}

如上述源码所示,我们找到了四个本地缓存,遗憾的是,这四个本地缓存都是私有的,我们并不能直接对其进行初始化。

但是我们还是从源码中找到了可以间接对这四个本地缓存进行初始化预热的方法,代码如下:

DESC_CLASS_CACHE、NAME_CLASS_CACHE预热代码

// DESC_CLASS_CACHE预热
ReflectUtils.desc2classArray(ReflectUtils.getDesc(Class.forName("cn.jdl.oms.express.model.CreateExpressOrderRequest")));
// NAME_CLASS_CACHE预热
ReflectUtils.name2class("cn.jdl.oms.express.model.CreateExpressOrderRequest");

SerializerCache、DeserializerCache预热代码

public class JsfSerializerFactoryPreheat extends HessianSerializerFactory {

    public static void doPreheat(String className) {
        try {
            // 序列化
            JsfSerializerFactoryPreheat.SERIALIZER_FACTORY.getSerializer(Class.forName("cn.jdl.oms.express.model.CreateExpressOrderRequest"));
            // 反序列化
            JsfSerializerFactoryPreheat.SERIALIZER_FACTORY.getDeserializer(Class.forName(className));
        } catch (Exception e) {
            // do nothing
            log.error("JsfSerializerFactoryPreheat failed:", e);
        }
    }
}

由JSF源码对于接口出入参编码、解码、序列化、反序列化操作,我们又想到应用接口内部有对出入参进行Fastjson序列化的操作,而且Fastjson序列化时需要初始化SerializeConfig,对性能会有一定影响。

我们可以通过以下代码对Fastjson进行初始化预热:

JSON.parseObject(JSON.toJSONString(Class.forName("cn.jdl.oms.express.model.CreateExpressOrderRequest").newInstance()), Class.forName("cn.jdl.oms.express.model.CreateExpressOrderRequest"));

到目前为止,我们针对应用启动预热做了以下工作:

  • JSF延迟发布
  • JSF-BZ线程池预热
  • JSF-SEV-WORKER线程预热
  • JSF编码、解码、序列化、反序列化缓存预热
  • Fastjson初始化预热

经过以上预热操作,应用部署引起服务抖动的现象得到了明显改善,由治理前的10000ms-20000ms降低到了 2000ms-3000ms(略高于日常流量抖动幅度)。

解决方案

基于以上分析,将JSF线程池预热、本地缓存预热、Fastjson预热整合打包,提供了一个简单可用的预热小工具,Jar包已上传私服,如有意向请参考使用说明:应用启动预热工具使用说明。

应用部署导致服务抖动属于一个共性问题,针对此问题目前有如下可选方案:

1、JSF官方提供的预热方案

原理:利用JSF1.7.6的预热策略动态下发,通过服务器负载均衡能力,对于上线需要预热的接口进行流量权重调整,小流量试跑,达到预热目的。

优点:平台配置即可,接入成本低。

缺点:按权重预热,资源预热不充分;需要服务调用方JSF版本升级到1.7.6,对于上游调用方较多的情况下推动版本升级困难。

2、流量录制回放预热方案

原理:录制线上真实流量,然后通过压测的方式将流量回放到新部署机器达到预热目的。

优点:结合了行云部署编排,下线、部署、预热、上线,以压测的方式可以使得预热更加充分。

缺点:使用流程较繁琐;仅对读接口友好,写接口需要关注数据是否对线上有影响。

3、本文方案

原理:通过对服务提供方JSF线程池、本地缓存、Fastjson进行初始化的方式进行系统预热。

优点:资源预热充分;使用简单,支持自定义扩展。

缺点:对除JSF以外的其他中间件如Redis、ES等暂不支持,但可以通过自定义扩展实现。

预热效果

预热前:

预热后:

使用本文提供的预热工具,预热前后对比效果明显,如上图所示,调用方方法性能MAX值从原来的10000ms-20000ms降低到了2000ms-3000ms,已经基本接近日常MAX抖点。

总结

应用部署引起上游服务抖动是一个常见问题,如果上游系统对服务抖动比较敏感,或会因此造成业务影响的话,这个问题还是需要引起我们足够的重视与关注。本文涉及的百川分流系统,单纯对外提供JSF服务,且无其他中间件的引入,特点是接口多,调用量大。

此问题在系统运行前期并不明显,上线部署上游基本无感,但随着调用量的增长,问题才逐渐凸显出来,如果单纯通过扩容也是可以缓解这个问题,但是这样会带来很大的资源浪费,违背“降本”的原则。为此,从已有线索出发,逐步深挖JSF源码,对线程池、本地缓存等在系统启动时进行充分初始化预热操作,从而有效降低JSF上线瞬间的服务抖动。

好了,本文的技术部分就到这里啦。

下面这个环节叫做[荒腔走板],技术文章后面我偶尔会记录、分享点生活相关的事情,和技术毫无关系。我知道看起来很突兀,但是我喜欢,因为这是一个普通博主的生活气息。

荒腔走板

周末的时候去了一趟成都自然博物馆。

本来上个周末就想去逛逛的,结果发现微信上的预约还挺抢手,周末的票分分钟就没有了。于是这个周末学聪明了,放票当天早上定好闹钟,顺利约到了两张票。

结果到现场之后发现其实不用预约也可以进馆,不过稍微麻烦一点。

我主要是奔着看恐龙化石去的,当一个长达 20 多米的恐龙化石真真实实的摆在你面前的时候,那种第一眼的直观感受确实是非常震撼,比看《侏罗纪世界》之类的电影震撼的多。

自然博物馆有三件镇馆之宝。第一件是隆昌铁陨石,是四川到目前为止出土的铁陨石中密度最大的,重量可以达到 158.5 公斤。第二件是重庆合川马门溪龙,是到目前为止中国、乃至亚洲保存最完整的恐龙化石中的最大者,总长度是 24 米。第三件就是大竹重庆鱼,此标本保存非常完整,地质时代为晚侏罗世,距今 1.5 亿年前。

逛博物馆的时候,特别是在化石区,都是以上万年为时间单位的地方,确实能感受到一丝丝的人类之渺小。

整体还是推荐的,有兴趣的可以去看看。而且挨着成都理工大学,附近的好吃的也是数不胜数。

当天下午,我们去了一个“破破烂烂”的地方吃了一碗雪豆海带蹄花汤,味道巴适得板哦。

··············  END  ··············

推荐👍扯下@EventListener这个注解的神秘面纱。

推荐👍我试图通过这篇文章,教会你一种阅读源码的方式。

推荐👍

推荐👍不过是享受了互联网的十年红利期而已。

推荐👍一个普通程序员磕磕绊绊,又闪闪发光的十年。

你好呀,我是歪歪。我没进过一线大厂,没创过业,也没写过书,更不是技术专家,所以也没有什么亮眼的title。

当年高考,随缘调剂到了某二本院校计算机专业。纯属误打误撞,进入程序员的行列,之后开始了运气爆棚的程序员之路。

说起程序员之路还是有点意思,可以点击蓝字,查看我的程序员之路

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存