Celery 源码分析(三): Worker启动
Worker启动
讲完了celery的基础架构之后,我们明白,别管其他有的没的组件,Worker那是妥妥的C位, 而我们对于Celery的使用也正是从Worker开始的。Worker启动有两种不同的方法,一种是之前Django-celery模块封装的 python manager celery worker
的启动方式,不过呢,这种随着celery4的到来已经退出了历史的舞台,现在对于celery4以上的版本,通常来说都是使用celery -A proj worker -l INFO
这种方式直接启动的。
入口:
Celery的文档并没有直接给出来Clelery究竟是从哪个文件进行启动的。如果是在找不着的话,那就通过日志的蛛丝马迹去找线索,我们发现,每次我们启动worker的时候,控制台最先打印的都是这一大坨东西:
- ** ---------- .> app: proj:0x7f9f8254c9b0
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 2 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
这说明什么,这说明即使这一段不是最最一开始执行的,那基本上也大概率是最早一批被执行的代码直接,直接搜索,最终定位到了celery.apps.worker.Worker.on_start
方法。
按照我们的惯例,肯定还是继续找这个类有没有调用on_start
方法的。发现Worker类并没有,于是顺藤摸瓜找到Worker类的父类,啥也没找着,只找到个start
方法,思路再一次卡住了,卡住怎么办呢?
当然是去谷歌搜了,不然自己找那得找到猴年马月。
经过一番简单的搜索,我们非常轻松的就找到了celery worker的启动入口,就是这里:celery.bin.worker
,发现有个run
方法,二话不说,直接去看:
def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
loglevel=None, logfile=None, pidfile=None, statedb=None,
**kwargs):
# 代码省略一部分
worker = self.app.Worker(
hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
logfile=logfile, # node format handled by celery.app.log.setup
pidfile=self.node_format(pidfile, hostname),
statedb=self.node_format(statedb, hostname),
**kwargs)
worker.start()
return worker.exitcode
到这里,我们仍然无法去十分肯定,这里的self.app.Worker
就一定是我们上面搜到的那个Worker
这个时候怎么办呢,直接莽进去,一看果然是:
@cached_property
def Worker(self):
"""Worker application.
See Also:
:class:`~@Worker`.
"""
return self.subclass_with_self('celery.apps.worker:Worker')
接着我们顺藤摸瓜,最后发现, 这样一条调用路径:
# WorkController
def start(self):
try:
self.blueprint.start(self)
|
v
# WorkController
self.blueprint = self.Blueprint(
steps=self.app.steps['worker'],
on_start=self.on_start,
on_close=self.on_close,
on_stopped=self.on_stopped,
)
|
v
# Blueprint
def start(self, parent):
self.state = RUN
if self.on_start:
self.on_start()
# .....
最终调用了Worker的on_start
方法,日志就打出来了。
因为我本地启动是blueapps封装了celery4的python manager 启动方式,但不管是哪种启动方式,最终都落到了self.app.Worker
这个位置,所以,等等,app是什么时候初始化的?查看开发框架的源码,发现是从celery的_get_current_object方法拿的。
from blueapps.contrib.bk_commands.management.app import app
base = celery.CeleryCommand(app=app)
###
from celery import current_app
#: The Django-Celery app instance.
app = current_app._get_current_object()
如果常规的启动方法,入口应该是celery.app.base.Celery.start
:
def start(self, argv=None):
"""Run :program:`celery` using `argv`.
Uses :data:`sys.argv` if `argv` is not specified.
"""
return instantiate(
'celery.bin.celery:CeleryCommand', app=self
).execute_from_commandline(argv)
详细的细节就不概述了,感兴趣的朋友可以自己调试找一下,但是我们也发现了一个非常有趣的现象,app这个成员变量往往是第一个就被初始化的,那么Celery这个类究竟有什么奇特之处,让它第一个进行实例化?
Celery 类
简单的整理了下,Celery的这个成员变量大概有这些:
-
Worker
, 对应celery.apps.worker:Worker
-
WorkController
对应celery.worker:WorkController
-
Beat
对应celery.apps.beat:Beat
-
Task
对应celery.app.task:Task
也可以由用户指定自己定义的Task类。 -
AsyncResult
对应celery.result:AsyncResult
主要用来封装异步执行的返回结果 -
ResultSet
对应celery.result:ResultSet
-
GroupResult
对应celery.result:GroupResult
-
pool
:kombu.connection.ConnectionPool
应该是个ambq连接池 -
tasks
: 所有的task集合。是个字典,key是name,value是对应的task对象。
方法的话,主要重点关注这几个,关于这些方法的作用,会在后面的部分详细说明。
task
:我们自定义的task实际上是在这里注册的。send_task
:任务实际上是在这里被包装发送到broker的。_connection
:链接实际上是在这里完成的。
Worker的初始化:
现在让我们继续回到Worker的初始化里面去。前面实际上是生成了一个Worker对象,然后呢我们发现Worker对象并没有重写父类的__init__
方法,也就是说实际上大多数初始化的步骤还是父类再控制的,点进去看看WorkController
这个类具体都干了啥:
def __init__(self, app=None, hostname=None, **kwargs):
self.app = app or self.app # 设置app
self.hostname = default_nodename(hostname) #生成该worker的hostname
self.startup_time = datetime.utcnow() # 设置启动的时间,这里用的是UTC时区
self.app.loader.init_worker() # init worker相关的一些操作,引入task模块什么的
self.on_before_init(**kwargs) # 调用钩子
self.setup_defaults(**kwargs) # 初始化默认的配置
self.on_after_init(**kwargs) # 初始化执行之后的操作
self.setup_instance(**self.prepare_args(**kwargs)) #建立相关的实例,blueprint,use_eventloop什么的
比较重要的方法有setup_defaults
和setup_instance
这两个:
setup_defaults
加载了默认配置,这里的意思是,如果你没有指定那就用默认的。如果用户自定义了,那就用用户自己的。
def setup_defaults(self, concurrency=None, loglevel='WARN', logfile=None,
task_events=None, pool=None, consumer_cls=None,
timer_cls=None, timer_precision=None,
autoscaler_cls=None,
pool_putlocks=None,
pool_restarts=None,
optimization=None, O=None, # O maps to -O=fair
statedb=None,
time_limit=None,
soft_time_limit=None,
scheduler=None,
pool_cls=None, # XXX use pool
state_db=None, # XXX use statedb
task_time_limit=None, # XXX use time_limit
task_soft_time_limit=None, # XXX use soft_time_limit
scheduler_cls=None, # XXX use scheduler
schedule_filename=None,
max_tasks_per_child=None,
prefetch_multiplier=None, disable_rate_limits=None,
worker_lost_wait=None,
max_memory_per_child=None, **_kw):
either = self.app.either
self.loglevel = loglevel
self.logfile = logfile
self.concurrency = either('worker_concurrency', concurrency)
self.task_events = either('worker_send_task_events', task_events)
self.pool_cls = either('worker_pool', pool, pool_cls)
self.consumer_cls = either('worker_consumer', consumer_cls)
self.timer_cls = either('worker_timer', timer_cls)
self.timer_precision = either(
'worker_timer_precision', timer_precision,
)
self.optimization = optimization or O
self.autoscaler_cls = either('worker_autoscaler', autoscaler_cls)
self.pool_putlocks = either('worker_pool_putlocks', pool_putlocks)
self.pool_restarts = either('worker_pool_restarts', pool_restarts)
self.statedb = either('worker_state_db', statedb, state_db)
self.schedule_filename = either(
'beat_schedule_filename', schedule_filename,
)
self.scheduler = either('beat_scheduler', scheduler, scheduler_cls)
self.time_limit = either(
'task_time_limit', time_limit, task_time_limit)
self.soft_time_limit = either(
'task_soft_time_limit', soft_time_limit, task_soft_time_limit,
)
self.max_tasks_per_child = either(
'worker_max_tasks_per_child', max_tasks_per_child,
)
self.max_memory_per_child = either(
'worker_max_memory_per_child', max_memory_per_child,
)
self.prefetch_multiplier = int(either(
'worker_prefetch_multiplier', prefetch_multiplier,
))
self.disable_rate_limits = either(
'worker_disable_rate_limits', disable_rate_limits,
)
self.worker_lost_wait = either('worker_lost_wait', worker_lost_wait)
整理出来一份简单的表格。
配置名 | 描述 |
---|---|
worker_concurrency | worker的进程数量 |
worker_send_task_events | 发送与任务相关的事件,通过自定义这个,可以统计一些指标 |
worker_pool | worker内部的执行池 |
worker_consumer | 对应的consumer类 |
worker_timer | 对应的timmer类 |
worker_timer_precision | 设置ETA调度程序可以在重新检查计划之间睡眠时的最长时间。将此值设置为1秒表示调度器精度为1秒钟。如果您需要接近毫秒精度,则可以将其设置为0.1。默认是1s |
worker_state_db | 用于持久化工作状态的db文件的路径。 |
beat_schedule_filename | PersistentsCheduler使用的文件的名称来存储周期性任务的上次运行时间。可以是一个相对或绝对路径, |
beat_scheduler | beat 调度器 |
task_time_limit | hard模式,如果配置了10s, 10s内 task 没有执行结束,则处理这个task会被杀掉,并继续执行其他新的 task |
task_soft_time_limit | soft 模式,如果配置了10s, 10s内 task 没有执行结束,可以在 task 内捕获这个异常,并处理。也可以在全局处理,进行重试或扔死信队列等操作。 |
worker_max_tasks_per_child | 池工作进程在用新工作进程替换之前可以执行的最大任务数, 默认是没有限制的 |
worker_max_memory_per_child | 在新worker替换之前,worker可能消耗的最大驻留内存量, 如果单个任务导致worker超过此限制, 则任务将完成,worker将被替换 |
worker_prefetch_multiplier | 控制一个工作节点所能预取的任务倍数。这里倍数的意思是:该节点可同时执行的任务数量的N 倍(默认设置是4 倍)。即,一个并行100 个任务的节点,它默认能够预取的任务数量是400 |
worker_disable_rate_limits | 即使任务具有明确的速率限制,禁用所有速率限制。 |
worker_lost_wait | 在杀死某个worker前,等待worker返回结果的时间,这里的worker指的是worker内部的子进程,并不是整个大的worker进程。 |
是不是非常的不熟悉,实际上日常正常的使用过程中大概率是完全使用不到这些自定义的配置项的,这也侧面验证了为什么我们觉得celery的功能看起来简单,但是工程却很大的原因。因为实际上我们使用的只是celey的一小部分。
setup_instance
这个方法初始化了一些Worker可能需要的一些实例出来:
def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
include=None, use_eventloop=None, exclude_queues=None,
**kwargs):
self.pidfile = pidfile
self.setup_queues(queues, exclude_queues) # 指定相关的消费与不消费队列
self.setup_includes(str_to_list(include)) # 获取所有的task任务
# Set default concurrency
if not self.concurrency:
try:
self.concurrency = cpu_count() # 设置进程数与cpu的个数相同
except NotImplementedError:
self.concurrency = 2 # 出现异常则默认两个子进程
# Options
self.loglevel = mlevel(self.loglevel) # 设置日志级别
self.ready_callback = ready_callback or self.on_consumer_ready # 就绪的回掉韩数
# this connection won't establish, only used for params
self._conninfo = self.app.connection_for_read() #
self.use_eventloop = ( #获取eventloop类型
self.should_use_eventloop() if use_eventloop is None
else use_eventloop
)
self.options = kwargs
signals.worker_init.send(sender=self) # 发送信号
# Initialize bootsteps
self.pool_cls = _concurrency.get_implementation(self.pool_cls) # 获取缓冲池类
self.steps = [] # v初始化需要执行的初始化步骤
self.on_init_blueprint() # 初始化blueprint
self.blueprint = self.Blueprint(
steps=self.app.steps['worker'],
on_start=self.on_start,
on_close=self.on_close,
on_stopped=self.on_stopped,
)
self.blueprint.apply(self, **kwargs) # 应用blueprint,
apply的作用是这里的作用是将所有的step根据依赖关系排好序之后,调用并初始化这些组件:
def apply(self, parent, **kwargs):
self._debug('Preparing bootsteps.')
order = self.order = []
steps = self.steps = self.claim_steps()
self._debug('Building graph...')
for S in self._finalize_steps(steps):
step = S(parent, **kwargs)
steps[step.name] = step
order.append(step)
self._debug('New boot order: {%s}',
', '.join(s.alias for s in self.order))
for step in order:
step.include(parent)
return self
class Blueprint(bootsteps.Blueprint):
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
注意,在apply
方法中,只是生成了这些组件的对象实例,并没有真正开始启动这些组件.
什么时候才真正的开始执行这些组件实际的启动逻辑呢? 先回到我们现在的思路,我们现在Worker,诶,实例什么的初始化工作做完了,然后生成实例之后呢,紧接着做了一个什么操作? 调用worker实例的 start
方法。
worker = self.app.Worker(
hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
logfile=logfile, # node format handled by celery.app.log.setup
pidfile=self.node_format(pidfile, hostname),
statedb=self.node_format(statedb, hostname),
**kwargs)
worker.start()
点进去这个start方法,看看他都干了啥:
def start(self):
try:
self.blueprint.start(self)
except WorkerTerminate:
self.terminate()
except Exception as exc:
logger.critical('Unrecoverable error: %r', exc, exc_info=True)
self.stop(exitcode=EX_FAILURE)
except SystemExit as exc:
self.stop(exitcode=exc.code)
except KeyboardInterrupt:
self.stop(exitcode=EX_FAILURE)
发现这个时候才真正的启动之前初始化好的哪些组件,并回调了Worker
类的on_start
方法。
def start(self, parent):
self.state = RUN
if self.on_start:
self.on_start()
for i, step in enumerate(s for s in parent.steps if s is not None):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
logger.debug('^-- substep ok')
千万千万要注意的是,这个parent不是别的,正是我们的worker实例。千万要注意,我们点进去一个简单的组件,看看他的start方法里面有什么:
完蛋了,翻车了,点击去一看啥也没有,,发现实现create实现了主要的初始化逻辑。
诶,create是什么时候被调用的呢?顺着源码看:
for step in order:
step.include(parent) # 注意这行代码
我们找一个简单的组件来看, 就拿Consumer
为例. 对应的源码地址是:celery.worker.components.Consumer
:
class Consumer(bootsteps.StartStopStep):
"""Bootstep starting the Consumer blueprint."""
last = True
def create(self, w):
if w.max_concurrency:
prefetch_count = max(w.max_concurrency, 1) * w.prefetch_multiplier
else:
prefetch_count = w.concurrency * w.prefetch_multiplier
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
task_events=w.task_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app,
controller=w,
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
return c
注意这个w
参数,实际上就是我们的worker对象是实例. 子组件创建完成之后,就把对应的实例赋值给了 worker的consumer变量,这样以来,Consumer便和Worker 完成了绑定。这样做的好处是Worker不再承担子组件的初始化的职责,Worker的代码更精简了,坏处嘛,就是worker会一直往下传,传递到自己的孙组件甚至重孙组件里面,感觉代码可读性不是很好。因为worker中的一个变量的初始化不太好找是在哪里初始化出来的。特别是Python还是个动态语言,Celery也没用一个类型注释,基本上全靠蒙。
·
转载自:https://juejin.cn/post/7352814673223254043