Celery 源码分析(四) - Consumer 组件
Celery组件- Consumer
在上一章Worker启动的时候,我们最终以Consumer作为例子去说明了Worker是如何与自己的子组件进行绑定的。
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
task_events=w.task_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app, # 注意这里,又传递到了更深层了组件中
controller=w, # 注意这里,又传递到了更深层了组件中
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
在这里我们大概知道是生成了一个consumer对象,但此刻,我们并不能知道生成了一个什么对象,通过调试我们发现w.consumer_cls 对应的类是:
celery.worker.consumer:Consumer
点进去一探究竟:
好家伙,有意思的地方来了,Consumer类也有自己的Blueprint
:
class Blueprint(bootsteps.Blueprint):
"""Consumer blueprint."""
name = 'Consumer'
default_steps = [
'celery.worker.consumer.connection:Connection',
'celery.worker.consumer.mingle:Mingle',
'celery.worker.consumer.events:Events',
'celery.worker.consumer.gossip:Gossip',
'celery.worker.consumer.heart:Heart',
'celery.worker.consumer.control:Control',
'celery.worker.consumer.tasks:Tasks',
'celery.worker.consumer.consumer:Evloop',
'celery.worker.consumer.agent:Agent',
]
真的就是经典的俄罗斯套娃了,在说明Consumer
这个类的作用之前,我们先花一些篇幅去简单解释一下这些组件的作用:
- Connection: 管理和 broker 的 Connection 连接
- Mingle: 不同Worker之间状态的同步
- Events: 管理任务事件,一般用来监控,比如flower就是利用了celery的Events机制
- Gossip: 消费来自其他Worker的事件,主要用于leader选举
- Heart: 发送心跳事件
- Control: 远程命令管理服务
- Tasks: 启动任务相关的Consumer 重点
- Agent:
cell
actor, 目前具体的作用未知。
接下来我们看下Consumer这个类有没有什么一眼就能看出来干啥的方法或者属性:
发现一个:
def create_task_handler(self, promise=promise):
strategies = self.strategies
on_unknown_message = self.on_unknown_message
on_unknown_task = self.on_unknown_task
on_invalid_task = self.on_invalid_task
callbacks = self.on_task_message
call_soon = self.call_soon
def on_task_received(message):
# payload will only be set for v1 protocol, since v2
# will defer deserializing the message body to the pool.
payload = None
try:
type_ = message.headers['task'] # protocol v2
except TypeError:
return on_unknown_message(None, message)
except KeyError:
try:
payload = message.decode()
except Exception as exc: # pylint: disable=broad-except
return self.on_decode_error(message, exc)
try:
type_, payload = payload['task'], payload # protocol v1
except (TypeError, KeyError):
return on_unknown_message(payload, message)
try:
strategy = strategies[type_]
except KeyError as exc:
return on_unknown_task(None, message, exc)
else:
try:
strategy(
message, payload,
promise(call_soon, (message.ack_log_error,)),
promise(call_soon, (message.reject_log_error,)),
callbacks,
)
except (InvalidTaskError, ContentDisallowed) as exc:
return on_invalid_task(payload, message, exc)
except DecodeError as exc:
return self.on_decode_error(message, exc)
return on_task_received
为什么会说这个方法?主要是看到一个on_task_received
, 看起来这个方法会在收到消息的时候调用,然后有一个策略什么相关系的封装。
还记得我们在第上一章说到的那个Celery
类吗,我记得那个类里面有一个send_task
方法,忘了的话可以去看看,里面调用了这样一行代码,点进去一看呢, 发现了as_task_v2
这个方法,因为我们用的新版本的celery,所以就直接看v2的协议了
message = amqp.create_task_message(
task_id, name, args, kwargs, countdown, eta, group_id,
expires, retries, chord,
maybe_list(link), maybe_list(link_error),
reply_to or self.oid, time_limit, soft_time_limit,
self.conf.task_send_sent_event,
root_id, parent_id, shadow, chain,
argsrepr=options.get('argsrepr'),
kwargsrepr=options.get('kwargsrepr'),
)
# 在 as_task_v2这个 方法中,我们发现,最终celery封装了一个task的协议,用于在消息队列中传输,实际上消息队列里面存放的内容就是这样的:
return task_message(
headers={
'lang': 'py',
'task': name,
'id': task_id,
'shadow': shadow,
'eta': eta,
'expires': expires,
'group': group_id,
'retries': retries,
'timelimit': [time_limit, soft_time_limit],
'root_id': root_id,
'parent_id': parent_id,
'argsrepr': argsrepr,
'kwargsrepr': kwargsrepr,
'origin': origin or anon_nodename()
},
properties={
'correlation_id': task_id,
'reply_to': reply_to or '',
},
body=(
args, kwargs, {
'callbacks': callbacks,
'errbacks': errbacks,
'chain': chain,
'chord': chord,
},
),
sent_event={
'uuid': task_id,
'root_id': root_id,
'parent_id': parent_id,
'name': name,
'args': argsrepr,
'kwargs': kwargsrepr,
'retries': retries,
'eta': eta,
'expires': expires,
} if create_sent_event else None,
)
再结合on_task_received
中从headers获取的task,基本上可以肯定on_task_received
就是celery worker收到消息之后的回调函数了。然后通过查看这个协议,我们也发现了一些其他的信息,比如headers中存储的就是task的源信息,参数什么的则存储在了 body里面。
继续往下看,发现Consumber这个类没什么好看的,通过前面我们对Worker类的初始化来看,大概有一部分的初始化逻辑被下放到了子组件里面去实现了。所以下一章我们重点去看Consumber的子组件。
转载自:https://juejin.cn/post/7352814673223303195