查看原文
其他

Python定时任务框架:APScheduler源码剖析(三)

Python猫 2020-09-13

The following article is from 懒编程 Author ayuliao

前言

距离上一篇APScheduler源码分析已经间隔了一段时间,趁现在有点闲暇,赶紧写上一篇。

这篇来分析APScheduler执行器相关的代码。

回顾

先回忆一下APScheduler是怎么运行起来的?回顾一下example的代码。

  1. scheduler = BackgroundScheduler()

  2. scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行

  3. scheduler.start()

简单而言,实例化BackgroundScheduler,然后调用add_job方法添加任务,最后调用start方法启动。

add_job方法通过前面文章的分析已经知道了,就是将方法存到内存dict中,interval指定触发器为间隔触发器,间隔时间为3秒。

现在看一下start方法。

start方法

BackgroundScheduler的start方法调用了BaseScheduler类的start方法,其代码如下。

  1. # apscheduler/schedulers/base.py/BaseScheduler


  2. def start(self, paused=False):

  3. if self.state != STATE_STOPPED:

  4. raise SchedulerAlreadyRunningError

  5. # 检查:如果我们在uWSGI线程禁用状态下运行时就返回相应的错误警报

  6. self._check_uwsgi()


  7. with self._executors_lock:

  8. # Create a default executor if nothing else is configured

  9. # 创建默认执行器

  10. if 'default' not in self._executors:

  11. self.add_executor(self._create_default_executor(), 'default')


  12. # Start all the executors

  13. for alias, executor in self._executors.items():

  14. executor.start(self, alias)


  15. with self._jobstores_lock:

  16. # Create a default job store if nothing else is configured

  17. # 创建默认的存储器

  18. if 'default' not in self._jobstores:

  19. self.add_jobstore(self._create_default_jobstore(), 'default')


  20. # Start all the job stores

  21. for alias, store in self._jobstores.items():

  22. store.start(self, alias)


  23. # Schedule all pending jobs

  24. for job, jobstore_alias, replace_existing in self._pending_jobs:

  25. self._real_add_job(job, jobstore_alias, replace_existing)

  26. del self._pending_jobs[:]


  27. self.state = STATE_PAUSED if paused else STATE_RUNNING

  28. self._logger.info('Scheduler started')

  29. self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START))


  30. if not paused:

  31. self.wakeup()

start方法代码含义直观,就是创建默认执行器以及默认的存储器,此外还调用了想要的start方法,执行器的start方法传入了self(调度器本身)与alias,执行器的start方法做了什么?默认执行器的start方法BaseExecutor类中,其代码如下。

  1. # apscheduler/executors/base.py/BaseExecutor


  2. def start(self, scheduler, alias):

  3. self._scheduler = scheduler

  4. self._lock = scheduler._create_lock()

  5. self._logger = logging.getLogger('apscheduler.executors.%s' % alias)

可以发现,start方法其实没做什么。

APScheduler默认的执行器就是线程执行器

  1. # apscheduler/schedulers/base.py/BaseScheduler


  2. def _create_default_executor(self):

  3. """Creates a default executor store, specific to the particular scheduler type."""

  4. return ThreadPoolExecutor()

本质就是使用ThreadPoolExecutor,但要注意其继承了BasePoolExecutor,而BasePoolExecutor又继承了BaseExecutor。

  1. # apscheduler/executores/pool.py


  2. class ThreadPoolExecutor(BasePoolExecutor):

  3. def __init__(self, max_workers=10):

  4. pool = concurrent.futures.ThreadPoolExecutor(int(max_workers))

  5. super().__init__(pool)

如何调用执行器?

这就要说回 _process_jobs方法了,该方法详细分析在「Python定时任务框架:APScheduler源码剖析(二)」中,这里截取部分相关代码

  1. for job in due_jobs:

  2. # Look up the job's executor

  3. # 搜索当前任务对象的执行器

  4. try:

  5. executor = self._lookup_executor(job.executor)

  6. except BaseException:

  7. #...省略

  8. # 获得运行时间

  9. run_times = job._get_run_times(now)

  10. run_times = run_times[-1:] if run_times and job.coalesce else run_times

  11. if run_times:

  12. try:

  13. # 提交这个任务给执行器

  14. executor.submit_job(job, run_times)

  15. except MaxInstancesReachedError:

  16. #...省略

大致逻辑就是从jobstore获取job任务对象,然后通过submitjob方法将job任务对象提交到执行器中,submitjob方法的具体实现在BaseExecutor类中,其逻辑如下。

  1. # apscheduler/executors/base.py/BaseExecutor


  2. def submit_job(self, job, run_times):

  3. # self._lock 为 RLock

  4. assert self._lock is not None, 'This executor has not been started yet'

  5. with self._lock:

  6. if self._instances[job.id] >= job.max_instances:

  7. raise MaxInstancesReachedError(job)


  8. self._do_submit_job(job, run_times)

  9. self._instances[job.id] += 1

submit_job方法先判断可重入锁是否存在,存在则在加锁的情况下使用 _do_submit_job方法执行job任务对象。

因为默认使用是线程执行器,其dosubmit_job方法就简单的将job任务对象提交给线程池,对应代码如下

  1. # apscheduler/executors/pool.py/BasePoolExecutor


  2. def _do_submit_job(self, job, run_times):

  3. def callback(f):

  4. exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else

  5. (f.exception(), getattr(f.exception(), '__traceback__', None)))

  6. if exc:

  7. self._run_job_error(job.id, exc, tb)

  8. else:

  9. self._run_job_success(job.id, f.result())


  10. f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)

  11. f.add_done_callback(callback)

_do_submit_job方法中,一开始定义了回调函数,用于接收线程池执行任务的结果,如果成功了,则调用 _run_job_success方法,失败了则调用 _run_job_error方法,这两个方法都在BaseExecutor中。

_run_job_success方法代码如下。

  1. # apscheduler/executors/base.py/BaseExecutor


  2. def _run_job_success(self, job_id, events):

  3. """

  4. Called by the executor with the list of generated events when :func:`run_job` has been

  5. successfully called.


  6. """

  7. with self._lock:

  8. self._instances[job_id] -= 1

  9. if self._instances[job_id] == 0:

  10. del self._instances[job_id]


  11. for event in events:

  12. self._scheduler._dispatch_event(event)

该方法会调用事件相关的机制,将线程池执行job任务对象的结果通APScheduler事件机制分发出去。

APScheduler的事件机制下次再聊,回过头看 f=self._pool.submit(run_job,job,job._jobstore_alias,run_times,self._logger.name),job任务对象作为runjob方法的参数,所以执行job的其实是runjob方法。

run_job方法

run_job方法代码如下。

  1. # apscheduler/executors/base.py


  2. def run_job(job, jobstore_alias, run_times, logger_name):


  3. events = []

  4. logger = logging.getLogger(logger_name)

  5. for run_time in run_times:


  6. # misfire_grace_time:在指定运行时间的之后几秒仍运行该作业运行

  7. if job.misfire_grace_time is not None:

  8. difference = datetime.now(utc) - run_time

  9. grace_time = timedelta(seconds=job.misfire_grace_time)

  10. # 判断是否超时

  11. if difference > grace_time:

  12. # 超时,则将 EVENT_JOB_MISSED 事件记录到 events 这个 list 中

  13. events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,

  14. run_time))

  15. logger.warning('Run time of job "%s" was missed by %s', job, difference)

  16. continue


  17. logger.info('Running job "%s" (scheduled at %s)', job, run_time)

  18. try:

  19. # 执行job任务对象

  20. retval = job.func(*job.args, **job.kwargs)

  21. except BaseException:

  22. exc, tb = sys.exc_info()[1:]

  23. formatted_tb = ''.join(format_tb(tb))

  24. # job任务对象执行报错,将 EVENT_JOB_ERROR 添加到

  25. events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,

  26. exception=exc, traceback=formatted_tb))

  27. logger.exception('Job "%s" raised an exception', job)


  28. # 为了防止循环引用,导致内存泄漏

  29. traceback.clear_frames(tb)

  30. del tb

  31. else:

  32. events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,

  33. retval=retval))

  34. logger.info('Job "%s" executed successfully', job)


  35. return events

run_job方法中,一开始先判断当前job任务对象的运行时间是否超过了 misfire_grace_time(在指定运行时间的之后几秒仍运行该作业运行),如果超时,则记录到events这个list中。

然后通过 retval=job.func(*job.args,**job.kwargs)真正的执行任务对象,如果执行过程中崩溃了,也会将job任务对象执行报错以事件的形式添加到events中。

这里出现了一个有趣的小技巧。

job任务对象执行崩溃后,通过 exc,tb=sys.exc_info()[1:]获取错误,而不是常见的将Exception中的值打印。

sys.exc_info方法会返回三个值:type(异常类别), value(异常说明,可带参数), traceback(traceback 对象,包含更丰富的信息),这里只取了value与traceback信息,然后通过 traceback.format_tb方法将其格式化,记录到日志中后,调用 traceback.clear_frames(tb)方法回溯清除所有堆栈帧中的局部变量tb,从APScheduler对该方法的注释是「为了防止循环引用,导致内存泄漏」。有点意思。

结尾

本文主要剖析了APScheduler中线程执行器它的源码,线程执行器代码简单,是APScheduler默认的执行器,APScheduler还有多个不同的执行器,各位有兴趣可以自行探究一下,有雅致可以联系我一同简单的讨论讨论。

APScheduler源码不同执行器、调度器、触发器其设计理念是类似的,这里就不一一去分析的,但还有个东西在前面一直出现却没有分析,那就是APSCheduler的「事件分发」机制,下一篇文章就来看看APScheduler的事件分发/监听等是怎么实现的。

如果文章对你有所帮助,点击「在看」支持二两,下篇文章见。

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存