查看原文
其他

Python 定时任务框架:APScheduler 源码剖析 (二)

Python猫 2020-09-13

The following article is from 懒编程 Author ayuliao

👆 Python猫” ,一个值得加星标的公众号

花下猫语:继续分享“二两”同学的源码分析系列。阅读源码是个很好的学习方式,一起学起来吧。

来源:懒编程  ||   作者:ayuliao

剧照 | 《鬼灭之刃》

前言

上一篇文章中,简单的捋了一遍使用 BackgroundScheduler 调度器做定时任务的流程,本篇接着上一篇文章,分析一下_real_add_job 方法和_main_loop 方法。

虽然 APScheduler 有多种不同的调度器以及多种不同的使用方式,但其核心都是类似的,一通百通。

对了,文中分析的 APScheduler 是当前最新版 3.6.1。

剖析_real_add_job

回顾一下上篇文章添加任务对象的大致逻辑。

实例化 BackgroundScheduler --> 调用 add_job 方法 --> 最终调用_real_add_job

_real_add_job 源码如下,其代码的大致逻辑以给出相应的注释。

# apscheduler/schedulers/base.py/BaseScheduler

    def _real_add_job(self, job, jobstore_alias, replace_existing):
        """
        将任务对象添加到指定的存储后端中(默认就是内存中-->dict)
        """


        # 使用默认值填写未定义的值
        replacements = {}
        for key, value in self._job_defaults.items():
            if not hasattr(job, key):
                replacements[key] = value

        # 如果未定义下次运行时间,则计算下次运行时间
        if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

        # 应用任何替换
        job._modify(**replacements)

        # 将作业添加到给定的作业库
        store = self._lookup_jobstore(jobstore_alias)
        try:
            store.add_job(job)
        except ConflictingIdError:
            if replace_existing:
                store.update_job(job)
            else:
                raise

        # 将任务对象标记为非待定
        job._jobstore_alias = jobstore_alias

        # 通知监听器已添加新任务
        event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
        self._dispatch_event(event)

        self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)

        # 通知调度程序有关新工作的信息
        if self.state == STATE_RUNNING:
            self.wakeup()

代码中已有大致的注释,这里再简单分析一下。

一开始定义 replacements 字典,然后循环处理_job_defaults 字典,判断该字典中的 key 是否是 job 任务对象的属性,如果是,则添加到 replacements 字典中。

接着同样的方式,判断 job 任务对象是否存在 next_run_time 属性,如果不存在,则需要调用当前任务对象中触发器 (trigger) 的 get_next_fire_time 方法计算出当前任务对象下一次要运行的时间。

随后调用 job 任务对象_modify 方法,该方法的作用就是修改 job 任务对象的属性。

怎么实现属性的修改?没想象的那样使用了什么高深的技巧,看该方法的源码,就定义了 approved 字典,然后将 replacements 字典中的值获取并判断是否替换,替换就放到 approved 字典中,最后遍历 approved 字典,利用 setattr 方法将该字典中的值设置为 job 对象的属性,实现 job 对象属性的修改。

接着调用_lookup_jobstore 方法找到用于存储当前任务对象的 job stores (任务存储器),jobstore_alias 是_real_add_job 方法的参数,该方法是 add_job 方法调用的,往回看,可知 jobstore_alias 默认为 default 字符串,则选择内存作为 job stores,然后调用 job sotres 的 add_job 方法将任务对象加入其中。

为啥说 job stores 为 default 就是使用内存作为 job store 呢?

看到 BaseScheduler 类的 start 方法,该方法部分逻辑如下。

# apscheduler/scheduleers/base.py/BaseScheduler

 with self._jobstores_lock:
            # Create a default job store if nothing else is configured
            if 'default' not in self._jobstores:
                self.add_jobstore(self._create_default_jobstore(), 'default')

其调用了_create_default_jobstore 方法创建默认 jobstore,该方法代码如下。

# apscheduler/scheduleers/base.py/BaseScheduler

def _create_default_jobstore(self):
        """Creates a default job store, specific to the particular scheduler type."""
        return MemoryJobStore() # 创建内存作为jobstore

看 MemoryJobStore 类的__init__方法,就可以知道它使用 dict 来作为最终的存储对象。

# apscheduler/jobstores/memory.py

class MemoryJobStore(BaseJobStore):
    """
    Stores jobs in an array in RAM. Provides no persistence support.

    Plugin alias: ``memory``
    """


    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

回到_real_add_job 方法,通过_lookup_jobstore 方法寻找 job store 并通过 add_job 方法将其添加到相应的 job store 后,修改 job 任务对象_jobstore_alias,将任务对象标记为非待定状态。

随后实例化 JobEvent 类,然后调用_dispatch_event 方法实现事件消息通知,这里的细节暂时不看,简单而言就是当调度器因某个任务被触发了,而自身自定义了一些相关的监听器,这些监听器此时就会被调用。

做完这些操作后,就会唤醒调度器线程,通过 wakeup 方法,该方法在 BaseScheduler 中是被 abstractmethod 装饰器装饰的空方法,因为 abstractmethod,所以其子类必须重写 wakeup 方法。该方法的代码在 BlockingScheduler 中,作用就是调用 set () 方法唤醒线程。

# apscheduler/schedulers/blocking.py/BlockingScheduler

def wakeup(self):
   # set()方法会将事件标志状态设置为true。
   self._event.set() # 唤醒,避免过长的休眠

剖析 get_next_fire_time

回顾一下_real_add_job,其中一段代码如下。

if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

get_next_fire_time 方法挺关键的,上述代码中传入了 None 和当前地区的时间戳,该方法计算方式如下。

# apscheduler/triggers/interval.py/IntervalTrigger

    def get_next_fire_time(self, previous_fire_time, now):
        if previous_fire_time:
            # 上次运行时间加上间隔时间获得下一次要运行的时间
            next_fire_time = previous_fire_time + self.interval
        elif self.start_date > now:
            next_fire_time = self.start_date
        else:
            timediff_seconds = timedelta_seconds(now - self.start_date)
            next_interval_num = int(ceil(timediff_seconds / self.interval_length))
            next_fire_time = self.start_date + self.interval * next_interval_num

        if self.jitter is not None:
            next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)

        if not self.end_date or next_fire_time <= self.end_date:
            return self.timezone.normalize(next_fire_time)

get_next_fire_time 方法主要就是计算任务对象下次要运行的时间。

不同的触发器,其 get_next_fire_time 方法不同。

剖析_main_loop

在剖析_main_loop 前, 先回顾一下 BackgroundScheduler 类的继承。

BackgroundScheduler --> BlockingScheduler --> BaseScheduler

BaseScheduler 是元类。

BlockingScheduler 适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。

BackgroundScheduler 基于 BlockingScheduler,相比于 BlockingScheduler,其实就创建一下线程来实现后台处理的效果,其余整体与 BlockingScheduler 没有什么区别。

在 BackgroundScheduler 的 start 方法中,创建另一个线程来启动_main_loop 方法,该方法的逻辑在 BlockingScheduler 类中,BlockingScheduler 类完整代码如下。

class BlockingScheduler(BaseScheduler):
    """
    A scheduler that runs in the foreground
    (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).
    """

    _event = None

    def start(self, *args, **kwargs):
        self._event = Event()
        super().start(*args, **kwargs)
        self._main_loop()

    def shutdown(self, wait=True):
        super().shutdown(wait)
        self._event.set()

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            # 等待事件通知,wait_seconds为等待事件通知的超时时间
            # wait()方法会阻塞线程,直到事件标志状态为true。
            self._event.wait(wait_seconds)
            # clear()方法将事件标志状态设置为false
            self._event.clear()
            wait_seconds = self._process_jobs()

    def wakeup(self):
        # set()方法会将事件标志状态设置为true。唤醒线程
        self._event.set()

先只需关注其_main_loop 方法,该方法一开始定义了 wait_seconds 等待时间,定位 threaing.TIMEOUT_MAX,然后判断调度器当前的状态,如果不是停止状态,则通过线程事件的 wait 方法阻塞线程,要解开阻塞,要么_real_add_job 方法调用了 wakeup 方法将线程唤醒,如果调度器线程是被唤醒的,说明有任务对象需要它处理,另种解开阻塞的方式就是等待其超时。

当线程被唤醒后,立刻又会调用 clear 方法,将线程事件状态设置为 false,当再次循环到 wait 时会再次被阻塞。

_main_loop 方法中最关键的其实是_process_jobs 方法,该方法会遍历所有的 job store,调用执行器执行到时间的 job 并计算出每个 job store 中的 job 下次的执行时间,将其中最短的时间返回,调度器会阻塞等待相应的时间。

_process_jobs 方法源码如下。

# apscheduler/schedulers/base.py/BaseScheduler

       def _process_jobs(self):
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone) # 当前时间
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            # 从_jobstores中获取当前要处理的任务
            for jobstore_alias, jobstore in self._jobstores.items():
                try:
                    # 以当前时间为基准,判断是否到了执行时间
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    # 唤醒时间
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue

                for job in due_jobs:
                    # 搜索当前任务对象的执行器
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue
                    # 获得运行时间
                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            # 提交这个任务给执行器
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # 计算任务对象下一次执行时间
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            # 修改job数学
                            job._modify(next_run_time=job_next_run)
                            # 将修改后的job更新到jobstore中
                            jobstore.update_job(job)
                        else:
                            # 没有下次运行时间了,则说明这个任务对象结束了,将其移除jobstore
                            self.remove_job(job.id, jobstore_alias)

                # 设置新的下次唤醒时间
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds

_process_jobs 方法似乎比较长,但逻辑其实很直观。

先计算出当前时间 now,然后获取_jobstores_lock 可重入锁,然后遍历所有的 jobstores,调用 get_due_jobs 方法获取当前时间要执行的任务,memory 内存任务存储器其 get_due_jobs 方法代码如下。

# apscheduler/jobstores/memory.py/MemoryJobStore

    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

    def get_due_jobs(self, now):
        now_timestamp = datetime_to_utc_timestamp(now)
        pending = []
        for job, timestamp in self._jobs:
            if timestamp is None or timestamp > now_timestamp:
                break
            pending.append(job)

        return pending

MemoryJobStore.get_due_jobs 方法的逻辑就是遍历当前所有任务对象的下次执行时间,如果执行时间小于或等于当前时间,则加入 pending 列表中返回。

获得到当然要执行的任务对象列表后,遍历这个列表,然后调用_lookup_executor 方法从中取出当前任务对象的执行器,如果没有执行器,则通过 remove_job 方法将该任务从相应的 job store 移除。

随后调用_get_run_times 方法获取运行时间,该方法代码如下。

# apscheduler/job.py/Job

    def _get_run_times(self, now):
        run_times = []
        next_run_time = self.next_run_time
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

_get_run_times 方法运行逻辑就是获取 self.next_run_time,然后判断是否小于等于当前时间,如果是,则加入 run_times 列表,并调用 get_next_fire_time 方法计算出下一次要运行的时间。

回到_process_jobs 方法,获得 run_times 后,调用执行器的 submit_job 方法将当前任务对象、运行时间作为参数提交给执行器,执行器会去执行任务对象具体的逻辑。

随后调用 get_next_fire_time 方法计算下次任务的执行时间,计算出的结果通过_modify 方法修改到 job 任务对象上,此外还通过 update_job 方法将任务更新到相应的 job store 中。

最后调用 job store 的 get_next_run_time 方法,计算 job store 下次要执行的时间,这里依旧看一下内存 job store 中该方法的逻辑。

#apscheduler/jobstores/memory.py/MemoryJobStore

   def get_next_run_time(self):
        return self._jobs[0][0].next_run_time if self._jobs else None

结尾

本文展示了比较多的代码细节,当然还有很多细节没有谈及,你看到这里说明挺感兴趣的,这里简单总结一下前面的内容,理一理完整的逻辑。

添加 job 到相应 job store 的逻辑链条为。

BaseScheduler.add_job 方法添加任务对象 --> BaseScheduler._real_add_job 方法才是具体的添加任务对象堕落街 --> BaseScheduler._lookup_jobstore 方法查到当前任务对象对应的 job store --> store.add_job (job) 方法将任务对象添加到 job store 中 --> BlockingScheduler.wakeup 方法唤醒调度器线程。

_main_loop 方法主循环的逻辑链条为。

BlockingScheduler._main_loop 方法为主循环最上层方法 --> _process_jobs 方法进行具体的调度执行器执行任务以及计算任务对象等下次执行事情的逻辑。

APScheduler 源码还没有读完,下篇来看看执行器是怎么执行任务对象的。

如果文章对你有所帮助,点击「在看」支持二两(文章作者),下篇文章见。

优质文章,推荐阅读:

2019 年 10 大顶级 Python 支持库

Python 命令行之旅:深入 click 之选项篇

辟谣错误的爬虫说法,使用正确的爬虫姿势

Python 之父撰文回忆:为什么要创造 pgen 解析器?

感谢创作者的好文

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存