分布式场景下使用APScheduler
The following article is from 懒编程 Author ayuliao
剧照 | 《爱尔兰人》
简介
APScheduler是一个定时任务框架,其主要功能就是方便控制不同类型的定时任务,本身并没有考虑分布式多实例情况下的一些问题,本篇文章就来简单谈谈APScheduler在简单分布式场景下的使用。
分布式带来的问题
比如有个服务A,服务A中有使用APScheduler添加任务的逻辑,每次添加的任务会在随后固定的某个时间点被APScheduler调用执行。
在单节点情况下,这并没有什么问题,但随着业务加大,你可能要开启多个服务A来做负载时,此时APScheduler就会出现重复执行任务的问题。
为了方便说明,这里使用MongoDB作为APScheduler的jobstore,使用线程池作为它的执行器。(如果你不明白我在说啥,建议看看此前APScheduler的文章)
scheduler = BlockingScheduler(
jobstores={"default": mongostore},
executors={"default": ThreadPoolExecutor(10)},
job_defaults={"coalesce": True, "max_instances": 3},
timezone='Asia/Shanghai',
)
如果开启了多个服务A,服务A中都使用了相同配置的scheduler,此时就会出现任务重复执行的问题。
为何会有这个问题?一起来阅读一下相关源码,一探究竟。
因为使用了BlockingScheduler作为调度器,所以直接看到该类的代码
# apscheduler/schedulers/blocking.py
class BlockingScheduler(BaseScheduler):
"""
A scheduler that runs in the foreground
(:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).
"""
_event = None
# ... 省略部分代码
def _main_loop(self):
wait_seconds = TIMEOUT_MAX
while self.state != STATE_STOPPED:
# 等待事件通知,wait_seconds为等待事件通知的超时时间
# wait()方法会阻塞线程,直到事件标志状态为true。
self._event.wait(wait_seconds)
# clear()方法将事件标志状态设置为false
self._event.clear()
wait_seconds = self._process_jobs()
_main_loop
方法会构成主循环,其具体的执行逻辑在 _process_jobs
方法中, _process_jobs
方法部分代码如下。
# apscheduler/schedulers/base.py/BaseScheduler
def _process_jobs(self):
"""
Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
to wait for the next round.
If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
``jobstore_retry_interval`` seconds.
"""
if self.state == STATE_PAUSED:
self._logger.debug('Scheduler is paused -- not processing jobs')
return None
self._logger.debug('Looking for jobs to run')
now = datetime.now(self.timezone) # 当前时间
next_wakeup_time = None
events = []
with self._jobstores_lock:
# 从_jobstores中获取当前要处理的任务
for jobstore_alias, jobstore in self._jobstores.items():
try:
# 以当前时间为基准,判断是否到了执行时间
due_jobs = jobstore.get_due_jobs(now)
except Exception as e:
# Schedule a wakeup at least in jobstore_retry_interval seconds
# 在 jobstore 重试间隔时间(秒)内唤醒
self._logger.warning('Error getting due jobs from job store %r: %s',
jobstore_alias, e)
# 唤醒时间
retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
next_wakeup_time = retry_wakeup_time
continue
# ... 省略部分代码
_process_jobs
方法通过 due_jobs=jobstore.get_due_jobs(now)
获取jobstore中的任务对象,通过前面的配置可知,mongodb是这里的jobstore。
看到mongodb对应jobstore的代码。
# apscheduler/jobstores/mongodb.py/MongoDBJobStore
def get_due_jobs(self, now):
timestamp = datetime_to_utc_timestamp(now)
return self._get_jobs({'next_run_time': {'$lte': timestamp}})
getduejobs方法主要调用 _get_jobs
方法去获取任务对象,要关注的重点是,它使用了lte以及时间戳作为参数,简单用过mongodb的朋友都知道lte其实就是小于等于的意思,简单而言,只要小于或等于timestamp这个时间戳的任务都会被获取。
都看到这了,顺便看一下 _get_jobs
方法的代码吧。
def _reconstitute_job(self, job_state):
# 反序列化,获取任务对象参数
job_state = pickle.loads(job_state)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
return job
def _get_jobs(self, conditions):
jobs = []
failed_job_ids = []
for document in self.collection.find(conditions, ['_id', 'job_state'],
sort=[('next_run_time', ASCENDING)]):
try:
jobs.append(self._reconstitute_job(document['job_state']))
except BaseException:
self._logger.exception('Unable to restore job "%s" -- removing it',
document['_id'])
failed_job_ids.append(document['_id'])
# Remove all the jobs we failed to restore
if failed_job_ids:
self.collection.remove({'_id': {'$in': failed_job_ids}})
return jobs # 返回所有小于等于某一时间戳的任务对象
到这里就很清楚APScheduler会出现重复执行任务问题的原因。
启动多个服务A,相当于运行同一份代码多次,此时APSCheduler的配置都是相同的,即多个APScheduler实例连接同一个mongodb,此时mongodb中存在一个任务就有可能被APScheduler消费多次。
使用分布式锁
要解决APScheduler多实例重复执行任务的问题,最常见的解决方案就是使用分布式锁,而分布式锁中最常见的就是基于Redis构建的字段锁。
Redis字段锁很容易理解,就是通过set命令在redis中设置一个字段,如果字段存在,则是加锁状态,而字段不存在,则是解锁状态。
设计Redis锁时,需要考虑操作原子性,避免同时去获取Redis字段的情况出现,还需要考虑字段超时,避免因逻辑错误出现的长时间死锁,所以设计Redis字段锁还是需要一些tick的,这里分享一种写法,如下。
@contextmanager
def redis_lock(name, timeout=(24 + 2) * 60 * 60):
try:
today_string = datetime.datetime.now().strftime("%Y-%m-%d")
key = f"servername.lock.{name}.{today_string}"
log.info(f"<Redis Lock> {key}")
# 原子性的锁: 不存在,创建锁,返回1,相当于获取锁;存在,创建锁失败,返回0,相当于获取锁失败;过一段时间超时,避免死锁
# nx: 不存在,key值设置为value,返回1,存在,不操作,返回0
# ex: 设置超时
lock = bonus_redis.set(key, value=1, nx=True, ex=timeout)
yield lock
finally:
bonus_redis.delete(key) # 释放锁
通过上面方法设置的锁与常用的锁不同。
如果程序没有获得常用的锁,则会阻塞等待锁,而这里涉及的锁并不会等待,它的作用只是保证被锁方法在特定时间段内只执行一次。
此外还要考虑的是加锁位置,因为APScheduler会获取小于某个时间戳下的所有任务,那为了避免任务被重复执行,最直观的做法就是在任务函数中加上锁,例子如下。
# 要被执行的任务函数
def tick():
with redislock() as lock:
if lock:
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BackgroundScheduler()
# 添加到相应的jobstore中
scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
结尾
至此分布式场景下使用APScheduler的方法就介绍完了,核心思想就是确保多个APScheduler实例对同一任务只会执行一次,感谢你的阅读。
如果文章对你有所帮助,点击「在看」支持二两,叩谢豪恩。
优质文章,推荐阅读:
Python 定时任务框架:APScheduler 源码剖析 (一)