APScheduler原理分析
「这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战」
前记
最近由于账单提交和脚本过多不好控制的原因,一直在寻找解决方案,发现APScheduler比较轻量级以及适合我的账单提交,和脚本运行控制(如果脚本运行需要依赖的话就可以上AirFlow).为了弄清楚原理,以及更好的使用APScheduler,所以阅读了APScheduler代码.其实是APScheduler代码量比较少的分析起来才容易(逃)
不过apscheduler有一个致命的缺点, 除,由于apscheduler的实现比较简单, 在初始化时, 能达到分布式work的效果外, 在运行时增加任务时, 并不会同步到每个work.
注: 为了节省篇幅,下面分析代码时大多数是只贴github的源码链接,并加以说明,源码是APscheduler第三版
最新修订见原文, 关注公众号<博海拾贝diary>可以及时收到新推文通知
1.主体逻辑
1.1代码结构
首先看看APScheduler的代码结构,除了job,event,util这几个简单的封装外,APScheduler中的组件都各自一个文件夹
├── executors 执行器,用于执行任务
├── jobstores 储存器,用于存放任务
├── schedulers 调度器,用于调度任务实例,由执行器,存储器,触发器三个组件构成
├── triggers 触发器,用于设定触发任务的条件
├── __init__.py
├── events.py 事件,调度器触发时的事件封装
├── job.py job,对添加的任务进行封装,方便调度器调用
└── util.py 工具包,apscheduler一些常用函数封装
1.2简单的例子
看完了代码结构,会觉得APScheduler代码并不复杂,但是APScheduler大量的用到了Python动态语言的特性,一个一个看可能比较懵,所以需要找一个切入点开始进入APScheduler的代码世界,而这个切入点就是从一个简单的例子开始.先看APScheduler的Hello World
级别的入门代码:
from datetime import datetime,timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler() # 1
def so1n_job(text): # 2
print(text)
scheduler.add_job(so1n_job, 'date', run_date=datetime.now() + timeelta(hours=1), args=['test']) # 3
scheduler.start() # 4
这个简单的代码如要做如下步骤:
- 1.实例化一个scheduler,这里使用的是BlockingScheduler,它在运行时会阻塞代码
- 2.为了演示而创建的简单job函数,只执行print功能
- 3.通过scheduler的add_job方式添加job, 同时定义了date触发器和触发时间以及运行job时的参数, 这里定义的是一小时后执行任务.
- 4.开始运行scheduler,检查和执行调度.
1.3 初始化scheduler
在实例化scheduler时,会先把其他三个组件加载到自己的父属性[源码]:(github.com/agronholm/a…)
_trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
_trigger_classes = {}
_executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
_executor_classes = {}
_jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
_jobstore_classes = {}
在实例化后,__init__
会创建一些锁相关的属性以及调用configure方法初始化一些数据(在scheduler还没start前,我们也可以直接调用configure方法修改数据)
-
加载配置,把所有数据加载到一个叫config的dict里面
-
通过调用_configure初始化常用配置, 对于一些特定的scheduler, 还会初始化一些属性, 如
background
会初始化deamon,asyncio
会初始化loop等:- logger,APScheduler运行时打日志的logger,默认为apscheduler.scheduler
- timezone, 设置时区, 默认为本地时区. 对于调度系统来说时区是一个非常关键的参数, 特别是对于有冬夏令时的国家, 如果有做多国家业务的, 必须要用时区.
- jobstore_retry_interval, 重试时间, 如果get_due_jobs()调用引发异常,则至少在设置n秒内进行一次新的唤醒
-
同时也会创建创建job的默认配置:
- misfire_grace_time,在指定的运行时之后几秒钟,仍允许运行该作业. 如果有个业务是指定一分钟后运行, 但apscheduler需要在两分钟后才有空闲运行该业务, 那么可以把
misfire_grace_time
的值设置为120+. - coalesce,为True时,即使调度程序确定该job可以运行多次,也只运行一次
- max_instances, apscheduler同时最大运行实例数.
- misfire_grace_time,在指定的运行时之后几秒钟,仍允许运行该作业. 如果有个业务是指定一分钟后运行, 但apscheduler需要在两分钟后才有空闲运行该业务, 那么可以把
-
配置执行器executors以及它的插件,并启动执行器
-
配置任务存储器jobstores以及它的插件,并启动任务存储器
1.4添加job
scheduler初始化完就可以开始添加job了,对于APScheduler来说,每个job的本体都是一个Python函数,在添加job本体的同时,顺便添加执行器,以及其他信息,如触发器,执行器,函数的参数,job的名称和id等,构成一个可以给scheduler调用的job.
不过在添加job的时候还有一个参数叫replace_existing,他不属于job的属性,当它为True时,scheduler会用相同的id替换已经存在的job,同时保留job的运行次数.还有存储器也不属于job的属性,只是让scheduler知道可以从该存储器可以获取到刚才添加的job.
例子中的job添加时,scheduler还未运行,所以会把job, jobstore, replace_existing拼成一个元祖,并存放到一个叫_pending_jobs
的等待队列中.
如果job添加时scheduler还在运行,那就会进行如下一些处理(_real_add_job函数),把job真正的添加到调度系统中:
- 1.如果此时的job没有下次运行时间,则为其创建下次运行时间
- 2.调用job的_modify方法,当配置有效时更新job的配置
- 3.把job添加到对应的store,或更新已经存在store里的job, 这里会调用store的_get_job_index通过二分法查找job需要插入对应的index中.
- 4.如果scheduler正在运行, 则唤醒scheduler,看看新添加的job是否可以被调度(注意,此次唤醒会替换原本已经安排的唤醒计划)
注意, 上面第4步只会唤醒本身的apscheduler, 如果是多个worker,那么其他的apscheduler并不会被唤醒.
1.5开始运行scheduler
添加完job后,scheduler就可以开始运行了,通过调用scheduler的start开始处理任务. 在开始运行前,scheduler会先去检查是否在使用禁用线程的uWSGI环境下运行,只有检查通过后才能继续运行.
检查完毕后,scheduler会去进行一些初始化,首先scheduler会激活所有添加到scheduler的执行器,以便待会可以使用,同样激活所有添加到scheduler的储存器,以便待会可以使用.这里会把scheduler初始化时的_pending_jobs
通过1.4添加job里面说到的_real_add_job
函数,把job真正的添加到调度系统中.
初始化完成了,scheduler可以真正的开始去检查和调度job了,这一切都发生在scheduler的_process_jobs函数里,他会遍历每个作业存储器中的job,然后执行可以被调度的job,最后检查下次运行时间,apscheduler会休眠到下次运行时间在启动,防止一直运行导致浪费计算机资源,具体操作如下:
- 1.遍历存储器,并从存储器的get_due_jobs方法找出比目前时间早的job列表,如果处理失败则会根据scheduler的
jobstore_retry_interval
生成下一次唤醒scheduler的时间. - 2.遍历并处理从步骤1拿到的job列表
- 2.1.遍历步骤1的job列表,提取job的执行器,如果提取失败则从存储器中删除掉job.
- 2.2.从job的_get_run_times方法获取介于现在时间到job的下一次运行时间中触发器可以触发的时间,并放在run_times列表中.
- 2.3.调用执行器的submit_job方法,首先检查目前该job的执行实例数会不会大于或等于定义的max_instances,只有没超过定义的max_instances时,才会继续执行执行器的
_do_submit_job
方法,执行job. - 2.4.运行完毕job后,[计算job是否还有next_run_time,如果有更新job以及对应的存储器,如果没有则把job从存储器中移除
- 3.执行完job后,算出存储器中最早的下次运行时间,并与next_wakeup_time比对,如果早于next_wakeup_time则把next_wakeup_time设置为存储器中最早的下次运行时间(992-997)
- 4.处理队列中的event,并算出距离下次运行时间与现在时间的时间差wait_seconds,并让scheduler睡眠wait_seconds,防止cpu空转(999-1004).
- 5.等待了wait_seconds后scheduler从步骤1继续开始执行操作.
则此,根据例子的主体逻辑代码分析已经分析完毕了,但是还有一些scheduler的方法,event,APScheduler的util以及APScheduler支持的gevent,async等库代码还没有分析.
2.源码分析
上一部分主要说的是APScheduler中的主要逻辑,简单的了解到APScheduler是如何运行的,以及运行时要做哪些操作,在说到主要函数时只说了是哪个模块下的哪个函数以及这个函数做了什么,对于一些细节并没有披露出来.而这一节不再跟着APScheduler的运行顺序进行分析,而是根据APScheduler的代码结构逐一分析里面的代码,从代码中了解APScheduler的原理,以及从APScheduler中吸收一些比较棒的idea.
2.1 executors
executors是apscheduler中的执行器,apscheduler为python中各种类型封装了executors,但核心的方法就只有几个,比较简单.
-
从scheduler获取资源和部分数据初始化,其余由其他封装实现
-
由其他封装实现,停用执行器并删除部分资源,如果任务并未完成,则会取消或清空任务
-
用于运行前的初始化和检查,主要用于检查当前job有多少实例正在运行,如果超出限制则抛出异常,未超出限制则执行_do_submit_job
-
由其他封装实现,负责运行job,并检查运行结果,成功则调用_run_job_success, 失败则调用_run_job_error
-
_run_job_success 运行成功的后续操作
-
_run_job_error 运行失败的后续操作
除此之外executors文件中还有一个叫run_job的函数,它才是正真用于执行job的函数,它除了调用job的func和处理异常外,还对任务是否错过运行窗口进行检查,比如任务应该在9.00-9.10间运行,然而直到9.15程序没有运行,那么apscheduler会抛出对应的错误event和job.
def run_job(job, jobstore_alias, run_times, logger_name):
"""
Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
scheduler.
"""
events = []
logger = logging.getLogger(logger_name)
for run_time in run_times:
# 如果设置了,misfire_grace_time,且时间差在misfire_grace_time外,则超出了任务执行时间的时间窗口,放弃运行
if job.misfire_grace_time is not None:
difference = datetime.now(utc) - run_time
grace_time = timedelta(seconds=job.misfire_grace_time)
if difference > grace_time:
events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
run_time))
logger.warning('Run time of job "%s" was missed by %s', job, difference)
continue
logger.info('Running job "%s" (scheduled at %s)', job, run_time)
try:
# 执行job
retval = job.func(*job.args, **job.kwargs)
except BaseException:
exc, tb = sys.exc_info()[1:]
formatted_tb = ''.join(format_tb(tb))
events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
exception=exc, traceback=formatted_tb))
logger.exception('Job "%s" raised an exception', job)
# 回收对象
if six.PY2:
sys.exc_clear()
del tb
else:
import traceback
traceback.clear_frames(tb)
del tb
else:
events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
retval=retval))
logger.info('Job "%s" executed successfully', job)
return events
2.2 JobStore
JobStore是apscheduler中的存储器,apscheduler为各种存储器做了封装,核心的JobStore比较简单,各个封装的功能都一样,只是具体逻辑跟对应的客户端相关.这里先以MemoryJobStore
和BaseJobStore
对JobStore的所有功能函数进行分析.
2.2.1 MemoryJobStore和BaseJobStore
- start 当apscheduler开始执行start或者job被添加到jobstores时,开始执行start,初始化scheduler以及jobstores的别名.
- shutdown 关闭对应stores的客户端链接或者清理内存回收空间.
- _fix_paused_jobs_sorting 返回没有
next_run_time
属性的任务(或者说暂停的任务) - lookup_job 获取指定任务id的任务
- get_due_jobs 返回早于
next_run_time
或等于now
的任务列表,返回的任务必须按next_run_time
(升序)进行排序 - get_next_run_time 从存储器的所有job中获取最早运行的一个,由于在
MemoryJobStore
中对保存job的_jobs队列进行了排序维护,不管添加和删除都确保他是有序的,所以MemoryJobStore
的get_next_run_time
只要从_jobs[0]获取的job就是即将最早运行的job - get_all_jobs 从存储器中获取所有任务
- add_job 向存储器添加任务
- update_job 更新已经存储在存储器中的任务
- remove_job 从存储器中删除指定的任务(根据job id)
- remove_all_jobs 从存储器中删除所有任务
- _get_job_index(
MemoryJobStore
特有方法) 通过二分法查找快速查找job的索引,或者如果找不到索引,则根据给定的时间戳记将job插入的索引。
2.2.2 其他JobStore
其他JobStore提供的功能也是与MemoryJobStore一样,但是由于各个存储容器/数据库不同,实现的逻辑都是不同,但原理还是一样的.Apschedulers除了MemoryJobStore外,通过把Job序列化存到JobStore中,使得job可以与add_job的进程分离,达到分布式调用的效果,但是由于每个Apschedulers并不会互相通信,所以可能存在多个Apschedulers获得到相同的Job,所以我们需要添加一个锁来解决该问题.
2.3 schedulers
schedulers是Apschedulers的调度器,负责Apschedulers的核心功能,所以在上面的主体逻辑
中基本都说了,这里只说一些上面没提到的功能函数.
-
状态
Apschedulers中提供以下三种状态,通过状态机切换状态使Apschedulers可以正确的启动停止,以及在
_process_jobs
中通过判断当前状态是否为STATE_PAUSED
来实现暂停的功能STATE_STOPPED = 0 # 停止 STATE_RUNNING = 1 # 运行 STATE_PAUSED = 2 #暂停
-
_process_jobs
process_jobs函数为了处理job,函数比较长, 这里看实际代码def _process_jobs(self): # 如果是暂停状态,则暂停运行 if self.state == STATE_PAUSED: self._logger.debug('Scheduler is paused -- not processing jobs') return None self._logger.debug('Looking for jobs to run') now = datetime.now(self.timezone) next_wakeup_time = None events = [] with self._jobstores_lock: # 获取存储job的jobstore for jobstore_alias, jobstore in six.iteritems(self._jobstores): try: # 从jobstore中获取满足条件的job list due_jobs = jobstore.get_due_jobs(now) except Exception as e: # 计算该jobstore至少需要n秒内唤醒一次 self._logger.warning('Error getting due jobs from job store %r: %s', jobstore_alias, e) retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) if not next_wakeup_time or next_wakeup_time > retry_wakeup_time: next_wakeup_time = retry_wakeup_time continue for job in due_jobs: try: # 获取job的存储器 executor = self._lookup_executor(job.executor) except BaseException: self._logger.error( 'Executor lookup ("%s") failed for job "%s" -- removing it from the ' 'job store', job.executor, job) # 从存储器中移除掉job self.remove_job(job.id, jobstore_alias) continue # 获取job的运行时间 run_times = job._get_run_times(now) run_times = run_times[-1:] if run_times and job.coalesce else run_times if run_times: try: # 运行job executor.submit_job(job, run_times) except MaxInstancesReachedError: self._logger.warning( 'Execution of job "%s" skipped: maximum number of running ' 'instances reached (%d)', job, job.max_instances) # 提交job运行实例过大的event event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id, jobstore_alias, run_times) events.append(event) except BaseException: self._logger.exception('Error submitting job "%s" to executor "%s"', job, job.executor) else: # 提交job运行成功的event event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias, run_times) events.append(event) # 如果有下一个执行时间,则更新job,否则将job从jobstore中删除。 job_next_run = job.trigger.get_next_fire_time(run_times[-1], now) if job_next_run: job._modify(next_run_time=job_next_run) jobstore.update_job(job) else: self.remove_job(job.id, jobstore_alias) # 计算jobstore下次唤醒时间 jobstore_next_run_time = jobstore.get_next_run_time() if jobstore_next_run_time and (next_wakeup_time is None or jobstore_next_run_time < next_wakeup_time): next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone) # 触发所有event for event in events: self._dispatch_event(event) # 计算下次运行时间 if self.state == STATE_PAUSED: wait_seconds = None self._logger.debug('Scheduler is paused; waiting until resume() is called') elif next_wakeup_time is None: wait_seconds = None self._logger.debug('No jobs; waiting until a job is added') else: wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX) self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) return wait_seconds
2.4 triggers
首先BaseTrigger
提供一个触发器的基本方法,用于给get_next_fire_time
添加抖动时间,防止大量任务在同一时间运行.
def _apply_jitter(self, next_fire_time, jitter, now):
if next_fire_time is None or not jitter:
return next_fire_time
# 主要的代码,通过随机选择+-jitter值与next_fire_time进行和运算
next_fire_time_with_jitter = next_fire_time + timedelta(
seconds=random.uniform(-jitter, jitter))
if next_fire_time_with_jitter < now:
# 如果新的时间值小于当前时间,则返回旧时间
return next_fire_time
return next_fire_time_with_jitter
而其他细节比较简单,各种触发器都是按照设置的时间进行运行,如果能算出下次运行时间,则在运行后按照下次运行时间继续运行,算不出下次运行时间该job将停止运行
2.5 job.py
job比较简单,主要是提供一些给scheduler调用的方法.
job的方法分为两大类,一类是类似于代理,通过调用scheduler的方法来修改自己本身,如modify
,reschedule
,pause
,resume
,remove
.
这类方法比较简单,而且主要逻辑在于scheduler,不在于job,另外的就是主要逻辑在job的方法.如__getstate__
以及__setstate__
的序列化相关方法,同时还有一个_modify
用来接受更新job的方法,虽然该方法很长,但主要逻辑也是各种判断再更新指
3.总结
表面上看APScheduler的代码会比较复杂,但经过拆解后,Apscheduler的代码除了scheduler的代码是非常简单的,主要是针对各种不同的运行环境而封装的代码比较多,导致在分析代码时,觉得这些代码经不起分析,但是APScheduler的核心设计还是很不错的.
转载自:https://juejin.cn/post/7036044818467782692