心法利器[8] | 模型热更新小记
【前沿重器】
全新栏目,本栏目主要和大家一起讨论近期自己学习的心得和体会,与大家一起成长。具体介绍:仓颉专项:飞机大炮我都会,利器心法我还有。
往期回顾
在工作中,我们完成一个模型的最终目标就是这玩意要能上线来跑,好不容易有了第一版后,我们需要做迭代,或者是针对一些实时的情况,我们需要对模型或者模型内的参数进行热更新,然而我们知道的是,一旦服务上线了,就不能随意停止或者是重启服务,服务终止所带来的损失在互联网时代看来是非常难以支撑的,所以要保证尽可能小的损失的话,就需要有特定的热更新机制,让我们热更新的东西能够快速切换。
今天我就来给大家分享一个非常简单的模型热更新方案。代码在文章偏后的位置,前面主要和大家说明白这个设计的原理。
热更新的需求分析
所谓的热更新,来看一个百度的解释:
热更新就是动态下发代码,它可以使开发者在不发布新版本的情况下,修复 BUG 和发布功能。
简单地说,就是不需要停止服务,也能更新到最新版本,对于算法而言,就是更新这个模型不需要重启服务,不可影响服务的正常运行,拆解开来,其实有这几个细节的需求:
原子化。简单地说就是一件事,要么做要么不做,中间不可以有插入其他任务的机会。这个是高并发所提出的要求,一般而言我们的模型会同时被多个线程请求计算,此时我们在更新模型的时候,如果不能保证原子化,那么进行预测的进程就可能读到被改到一半的模型从而导致计算错误。 模型加载更新过程服务不可终止。加载模型本身是一个非常耗时的任务,如果和常规的推理任务放在一个进程里,必然会导致大量的请求被堵塞。 新模型并非100%成功,失败的时候需要一定的回滚保护机制。
为了这个热更新的需求,double buffer机制应运而生。
double buffer
中文叫做双备份机制,这个机制的原理并不复杂。就是对于要更新的部分,如模型、配置,在运行过程中用两个变量来保存,一个变量用于正常运行,给预测的请求用,另一个则用来更新模型,模型更新完以后,把带着新模型的变量用来进行预测。
这样做能完美满足上面提到的3个细节需求:
原子化。真正更换模型的过程其实是把调用的指针从指着带有原模型的变量转为带有新模型的变量,这个指针转化的过程就是原子化的。 模型加载更新过程服务不可终止。加载模型过程是另一个线程在做的,可最终更新模型只是一个指针的更换,所以影响面压缩的非常小,终止情况大大减少。 两块内容进行,本身就能够保证回滚。
道理应该不难懂,我们来举个栗子看看,网上没找到python版本的,我这里尝试写写,也不是非常正规的double buffer,就是抛砖引玉吧。
上代码
原始模型
我先给出一个最简单的模型类吧,为了问题简化,我把这个模型整成一个输入一个数,返回这个数的被,这个就是模型的参数了。
class Model:
def __init__(self, config):
self.config = config
self.model = self.load_model()
def load_model(self):
with open(self.config["model_path"]) as f:
for line in f:
model = float(line.strip())
return model
def predict(self,x):
return self.model * x
可以看到这应该是一个再简单不过的模型了。
装饰器实现单例模式
为了保证模型本身在整个任务里面的唯一性,全局都只使用一个模型,我这里画蛇添足了一波,本身和双buffer的关系不会太大,就是用装饰器实现了一个单例模式:
def singleton(cls):
_instance = {}
def inner(*a):
if cls not in _instance:
_instance[cls] = cls(*a)
return _instance[cls]
return inner
于是模型可以加上这个装饰器,这样就能保证一任务下,所以调用这个Model的对象,都是用的一个模型,这样能有效减少内存消耗:
@singleton
class Model:
具体的原理可以参考这个:https://www.cnblogs.com/dingjiaoyang/p/11004793.html
双buffer
既然要双buffer,一方面我们设计好模型的更新方式,另一方面我们要把这个双buff设计好。来看代码:
@singleton
class Model:
def __init__(self, config):
self.config = config
init_model = self.load_model()
self.model = [init_model, init_model]
self.cur_run_model_idx = 0
def load_model(self):
with open(self.config["model_path"]) as f:
for line in f:
model = float(line.strip())
return model
def update_model(self):
new_model = self.load_model()
self.model[(self.cur_run_model_idx + 1) % 2] = new_model
new_cur_run_model_idx = (self.cur_run_model_idx + 1) % 2
self.cur_run_model_idx = new_cur_run_model_idx
def predict(self,x):
return self.model[self.cur_run_model_idx] * x
来看看变化点:
原来的模型是直接一个变量存着,现在的模型变成了一个数组来存。 用一个简单的变量存着现在正是在使用的模型是谁,就是 cur_run_model_idx
。update_model
就是更新模型的核心了,首先用一个临时局部变量存着新模型,当然新模型在文件上需要覆盖原模型;然后把家再好的模型赋值给没有在使用的那个模型里面,最后再修改cur_run_model_idx
为新模型。
整个机制已经完成了,那么问题来了,如何去执行这个update_model
呢,这里给出两个方案,一个是定时任务的方式,另一个是手动的方式。
模型更新机制
定时任务方法
定时任务可以理解为新开辟一个进程,监听模型文件,或者定时重新加载模型文件更新模型。
这里要用到python的定时任务工具,apscheduler。
pin install apscheduler
具体他是啥不多说了,教程和文档放这里:
教程:https://www.cnblogs.com/dingjiaoyang/p/11004793.html 非官方的官方文档翻译:https://www.jianshu.com/p/4f5305e220f0 官方文档:https://apscheduler.readthedocs.io/en/latest/
另外这里的使用参考了这两个链接:
https://blog.csdn.net/qq_38368548/article/details/109483589 https://blog.csdn.net/somezz/article/details/83104368
我们这里用到的是后台更细任务器,在python中就是额外启动一个进程来执行定时任务。
from apscheduler.schedulers.background import BackgroundScheduler
def __init_scueduler(self):
self.scheduler = BackgroundScheduler()
trigger = TriggerManager.interval_trigger(conf={"timeInterval": 10, "timeUnit": 's'})
self.scheduler.add_job(self.update_model, trigger,
kwargs={},
id="update_model")
self.scheduler.start()
这里的TriggerManager其实是一个触发管理器,可以设置特定的触发时。
from apscheduler.triggers.interval import IntervalTrigger
class TriggerManager(object):
"""
触发管理器
"""
def __init__(self):
pass
@classmethod
def interval_trigger(cls, conf):
"""
间隔触发方法
:param conf: 配置计划时间
:return:
"""
time_args = {"s": 0, "m": 0, "h": 0, "d": 0, "w": 0}
time_unit = conf['timeUnit']
time_interval = conf['timeInterval']
time_args[time_unit] = time_interval
return IntervalTrigger(seconds=time_args['s'],
minutes=time_args['m'],
hours=time_args['h'],
days=time_args['d'],
weeks=time_args['w']
)
因此融合起来,整块代码就是这样的:
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
def singleton(cls):
_instance = {}
def inner(*a):
if cls not in _instance:
_instance[cls] = cls(*a)
return _instance[cls]
return inner
class TriggerManager(object):
"""
触发管理器
"""
def __init__(self):
pass
@classmethod
def interval_trigger(cls, conf):
"""
间隔触发方法
:param conf: 配置计划时间
:return:
"""
time_args = {"s": 0, "m": 0, "h": 0, "d": 0, "w": 0}
time_unit = conf['timeUnit']
time_interval = conf['timeInterval']
time_args[time_unit] = time_interval
return IntervalTrigger(seconds=time_args['s'],
minutes=time_args['m'],
hours=time_args['h'],
days=time_args['d'],
weeks=time_args['w']
)
@singleton
class Model:
def __init__(self, config):
self.config = config
init_model = self.load_model()
self.model = [init_model, init_model]
self.cur_run_model_idx = 0
self.__init_scueduler()
def __init_scueduler(self):
self.scheduler = BackgroundScheduler()
trigger = TriggerManager.interval_trigger(conf={"timeInterval": 10, "timeUnit": 's'})
self.scheduler.add_job(self.update_model, trigger,
kwargs={},
id="update_model")
self.scheduler.start()
def load_model(self):
with open(self.config["model_path"]) as f:
for line in f:
model = float(line.strip())
return model
def update_model(self):
new_model = self.load_model()
self.model[(self.cur_run_model_idx + 1) % 2] = new_model
new_cur_run_model_idx = (self.cur_run_model_idx + 1) % 2
self.cur_run_model_idx = new_cur_run_model_idx
def predict(self,x):
return self.model[self.cur_run_model_idx] * x
手动法
这也是我用了singleton的原因,在所有任务里面所指的模型,都是同一个,因此我们可以在这个模型里面给出外面更加简单的接口。
def init_model():
global model
model = Model({"model_path":"./model.txt"})
def predict(x):
return model.predict(x)
def update_model():
return model.update_model()
当我们要紧急更新模型的时候,只需要执行一个这么简单的脚本:
from model import update_model
update_model()
更新模型完事了。
小结
除了研究算法,偶尔研究研究这些开发技能上的东西,还是挺好用的。虽然这里谈的是热更新,但实际上里面的技术点还是挺多的:
singleton,单例模式。 装饰器。 apscheduler。 等等。