Celery Source Code Analysis (5): Consumer Sub-components
Consumer Sub-components
Tasks component
Let's go straight to the source, located at celery.worker.consumer.tasks.Tasks. For step-related sub-components, reading the create or start method is enough:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super(Tasks, self).__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )

        # Note this line: this is where task_consumer is created
        # (actual consumption starts later, inside the event loop)
        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
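The way Tasks.start wraps set_prefetch_count in a QoS object can be illustrated with a toy version. This is only a minimal sketch under assumptions: SimpleQoS is a hypothetical class, not kombu's QoS, and the callback here just records values instead of calling c.task_consumer.qos. It shows the idea that the desired prefetch count is tracked locally and pushed through the callback only when it has changed.

```python
class SimpleQoS:
    """Hypothetical stand-in for a QoS helper (not kombu's implementation).

    Tracks the desired prefetch count and calls the callback only when
    the value differs from the one that was last sent.
    """

    def __init__(self, callback, initial_value):
        self.callback = callback    # e.g. set_prefetch_count from Tasks.start
        self.value = initial_value  # desired prefetch count
        self.prev = None            # last value actually sent

    def increment(self, n=1):
        self.value += n
        return self.value

    def update(self):
        if self.prev != self.value:
            self.callback(prefetch_count=self.value)
            self.prev = self.value
        return self.value


sent = []


def set_prefetch_count(prefetch_count):
    # A real callback would talk to the broker; this one only records.
    sent.append(prefetch_count)


qos = SimpleQoS(set_prefetch_count, initial_value=4)
qos.update()      # first update pushes the initial value
qos.update()      # unchanged, so nothing is sent
qos.increment(2)
qos.update()      # value changed, so it is pushed again
```

After these calls, sent holds only the two values that were actually pushed, which is the same "compare prev with value, then update" pattern the blocking loop later relies on.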
Connection component
The code shows that Connection itself does practically nothing:
class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())
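Here c is actually an instance of our Consumer class, so what the Connection component really does is call Consumer's connect method to initialize the Consumer attribute connection. Click through and you will find that Consumer does nothing much either: it calls a method on Celery to create the connection. So the nesting dolls are now being stacked from the bottom up.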
def connect(self):
    """Establish the broker connection used for consuming tasks.

    Retries establishing the connection if the
    :setting:`broker_connection_retry` setting is enabled
    """
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
    if self.hub:
        conn.transport.register_with_event_loop(conn.connection, self.hub)
    return conn

def connection_for_read(self, heartbeat=None):
    return self.ensure_connected(
        self.app.connection_for_read(heartbeat=heartbeat))
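Note, to stress it once more: app is the Celery instance, the first layer we initialized. The hierarchy so far is:
app(Celery)->w(Worker)->c(Consumer)
One interesting detail here: for a task consumer such as Consumer, the connection is obtained through connection_for_read, so it is a connection meant for reading only. In a word: rigorous.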
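The delegation chain described above (step -> Consumer -> app) can be sketched with three toy classes. Everything here is hypothetical: ToyApp, ToyConsumer and ToyConnectionStep are illustrative names, and the "connection" is just a dict, so this only mirrors the call order, not the real Celery objects.

```python
class ToyApp:
    """Stands in for the Celery app: the layer that really builds the connection."""

    def connection_for_read(self, heartbeat=None):
        return {'purpose': 'read', 'heartbeat': heartbeat}


class ToyConsumer:
    """Stands in for Consumer: it only forwards the request to the app."""

    def __init__(self, app, amqheartbeat=None):
        self.app = app
        self.amqheartbeat = amqheartbeat
        self.connection = None

    def connect(self):
        return self.app.connection_for_read(heartbeat=self.amqheartbeat)


class ToyConnectionStep:
    """Stands in for the Connection step: it stores the result on the consumer."""

    def start(self, c):
        c.connection = c.connect()


c = ToyConsumer(ToyApp(), amqheartbeat=10)
ToyConnectionStep().start(c)
```

The step never creates anything itself; it asks c, and c asks app, which is the bottom-up nesting the text points out.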
Events component:
class Events(bootsteps.StartStopStep):
    """Service used for sending monitoring events."""

    requires = (Connection,)

    def start(self, c):
        # flush events sent while connection was down.
        prev = self._close(c)
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection_for_write(),
            hostname=c.hostname,
            enabled=self.send_events,
            groups=self.groups,
            # we currently only buffer events when the event loop is enabled
            # XXX This excludes eventlet/gevent, which should actually buffer.
            buffer_group=['task'] if c.hub else None,
            on_send_buffered=c.on_send_event_buffered if c.hub else None,
        )
        if prev:
            dis.extend_buffer(prev)
            dis.flush()
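The Events component plays a role similar to the previous two. The difference is that inside Events the app is called directly to create an event dispatcher, which is then assigned to c.event_dispatcher.
What the Events component is actually for is a topic for another day (wink).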
Gossip component
The Gossip component is something else. It flips the usual relationship around:
def start(self, c):
    super(Gossip, self).start(c)
    self.dispatcher = c.event_dispatcher
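This is the first component that initializes its own attribute from the Consumer.
Two points about this component were mentioned earlier. The first is that it only consumes worker-related events, which Gossip's code confirms: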
def get_consumers(self, channel):
    self.register_timer()
    # routing_key='worker.#' confirms that Gossip only consumes events from workers
    ev = self.Receiver(channel, routing_key='worker.#',
                       queue_ttl=self.heartbeat_interval)
    return [Consumer(
        channel,
        queues=[ev.queue],
        on_message=partial(self.on_message, ev.event_from_message),
        no_ack=True
    )]
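To see why a routing key of 'worker.#' filters out everything except worker events, here is a small sketch of AMQP-style topic matching, where '*' matches exactly one dot-separated word and '#' matches zero or more. topic_matches is a hypothetical helper written for illustration (the broker does this matching itself), and the routing keys used below are example values.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP-style topic pattern.

    '*' matches exactly one dot-separated word; '#' matches zero or more.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may swallow anywhere from 0 to len(k) words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split('.'), routing_key.split('.'))


worker_event = topic_matches('worker.#', 'worker.heartbeat')
task_event = topic_matches('worker.#', 'task.received')
```

With this pattern a key starting with the word worker matches, while a key starting with task does not, so task events never reach the Gossip queue.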
The second is that Gossip is mainly used for leader election, which this function confirms:
def on_elect_ack(self, event):
    id = event['id']
    try:
        replies = self.consensus_replies[id]
    except KeyError:
        return  # not for us
    alive_workers = set(self.state.alive_workers())
    replies.append(event['hostname'])

    if len(replies) >= len(alive_workers):
        _, leader, topic, action = self.clock.sort_heap(
            self.consensus_requests[id],
        )
        if leader == self.full_hostname:
            info('I won the election %r', id)
            try:
                handler = self.election_handlers[topic]
            except KeyError:
                logger.exception('Unknown election topic %r', topic)
            else:
                handler(action)
        else:
            info('node %s elected for %r', leader, id)
        self.consensus_requests.pop(id, None)
        self.consensus_replies.pop(id, None)
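Note this log message: I won the election. Judging from the code, the winner appears to be decided by comparing clock values, because the collected requests go through a heap sort step, self.clock.sort_heap.
For everything else about leader election, you can read the relevant source yourself; I won't expand on it here. A web search should turn up plenty of material.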
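The election step can be sketched as a deterministic choice over the collected requests. The rule used below (lowest logical clock wins, ties broken by hostname) is an assumption made for illustration, and pick_leader is a hypothetical function; check kombu's sort_heap for the exact ordering it applies. The point is only that every worker sees the same request list and applies the same ordering, so they all agree on one leader without further messages.

```python
def pick_leader(requests):
    """Pick the winning election request.

    Each request is a (clock, hostname, topic, action) tuple.
    Assumed rule: lowest clock wins, hostname breaks ties.
    """
    return min(requests, key=lambda r: (r[0], r[1]))


requests = [
    (12, 'worker2@host', 'rebalance', 'go'),
    (10, 'worker3@host', 'rebalance', 'go'),
    (10, 'worker1@host', 'rebalance', 'go'),
]

# Same unpacking shape as in on_elect_ack
_, leader, topic, action = pick_leader(requests)
```

Each node would then compare leader with its own hostname, just as on_elect_ack compares it with self.full_hostname.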
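Evloop component
This component is a bit special: every other step lives in its own file, yet Evloop sits in the same file as Consumer. I could not think of any reason it has to share a module, so I can only chalk it up to legacy baggage.
Wake up, back row, this is the key point. Remember on_task_received from the previous chapter, the callback Celery runs when a message arrives? That raises a question:
Where is it registered with Kombu as a callback?
If it is a callback, then somewhere something must have told Kombu: hey bro, call me back when a message arrives.
But thanks to Python's dynamic nature, I simply could not find where the binding happens. I expected Evloop to be a small, minor component, but the deeper I read, the more a startling secret surfaced: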
class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()
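At first glance, unremarkable; at second glance, still unremarkable. Indeed, the start method alone tells us nothing useful. Let's see what the loop attribute actually is.
In Consumer I finally found this code, the only place where it is set: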
if not hasattr(self, 'loop'):
    self.loop = loops.asynloop if hub else loops.synloop
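It doesn't matter whether it is asynloop or synloop, because whichever loop it is must bind the callback somewhere.
How can you be so sure?
Obviously, because I read the code before writing this.
Let's click through and look: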
def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that doesn't support AIO."""
    RUN = bootsteps.RUN
    # Get our on_task_received handler.
    on_task_received = obj.create_task_handler()
    perform_pending_operations = obj.perform_pending_operations
    if getattr(obj.pool, 'is_green', False):
        _enable_amqheartbeats(obj.timer, connection, rate=hbrate)
    # Pay close attention to this line:
    consumer.on_message = on_task_received
    # Consumption starts here
    consumer.consume()

    obj.on_ready()

    while blueprint.state == RUN and obj.connection:
        state.maybe_shutdown()
        if qos.prev != qos.value:
            qos.update()
        try:
            perform_pending_operations()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            pass
        except socket.error:
            if blueprint.state == RUN:
                raise
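But now another question arises: consumer and obj confuse me a little. If obj is the Consumer instance, then what is consumer?
With Python, the parameter names alone tell us nothing, so we have to go back to Evloop and see what is passed in. When I rushed to the scene, all I found was this code: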
def loop_args(self):
    return (self, self.connection, self.task_consumer,
            self.blueprint, self.hub, self.qos, self.amqheartbeat,
            self.app.clock, self.amqheartbeat_rate)
Case closed. The consumer above turns out to be the task_consumer created in the Tasks component. Neat.
Celery binds consumer.on_message to the Consumer's on_task_received, so whenever task_consumer receives a message, it calls back into our on_task_received method.
# Pay close attention to this line:
consumer.on_message = on_task_received
# Consumption starts here
consumer.consume()
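The binding can be reproduced in miniature with stand-in classes. This is a sketch under assumptions: FakeConsumer, FakeWorkerConsumer and toy_loop are hypothetical names, and deliver() simulates the transport handing over a message. It follows the same path as the real code: loop_args supplies (obj, consumer), the loop assigns the handler to consumer.on_message, then calls consume().

```python
class FakeConsumer:
    """Hypothetical message consumer with an on_message hook."""

    def __init__(self):
        self.on_message = None
        self.consuming = False

    def consume(self):
        self.consuming = True

    def deliver(self, message):
        # Simulates the transport passing a message to the consumer.
        if self.consuming and self.on_message is not None:
            self.on_message(message)


class FakeWorkerConsumer:
    """Hypothetical stand-in for the worker's Consumer object."""

    def __init__(self):
        self.received = []
        self.task_consumer = FakeConsumer()

    def create_task_handler(self):
        def on_task_received(message):
            self.received.append(message)
        return on_task_received

    def loop_args(self):
        return (self, self.task_consumer)


def toy_loop(obj, consumer):
    # Same two steps as in the loop: bind the callback, then consume.
    consumer.on_message = obj.create_task_handler()
    consumer.consume()


c = FakeWorkerConsumer()
toy_loop(*c.loop_args())
c.task_consumer.deliver('add(2, 2)')
```

Because the handler is attached by plain attribute assignment at runtime, searching the code for a registration call finds nothing, which is why the binding was hard to locate.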
Heart component:
class Heart(bootsteps.StartStopStep):

    requires = (Events,)

    def __init__(self, c,
                 without_heartbeat=False, heartbeat_interval=None, **kwargs):
        self.enabled = not without_heartbeat
        self.heartbeat_interval = heartbeat_interval
        c.heart = None
        super(Heart, self).__init__(c, **kwargs)

    def start(self, c):
        c.heart = heartbeat.Heart(
            c.timer, c.event_dispatcher, self.heartbeat_interval,
        )
        c.heart.start()

    def stop(self, c):
        c.heart = c.heart and c.heart.stop()
    shutdown = stop
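In essence, it sends a heartbeat periodically through the timer. The default interval is 2 s.
TODO: who is it sent to? Since Heart is handed c.event_dispatcher, it appears to be published as an event through the AMQP broker.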
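The periodic heartbeat can be sketched with a manually driven timer, which keeps the example deterministic. FakeTimer and ToyHeart are hypothetical classes, the event names are used as illustrative values, and the 2.0 fallback only mirrors the default interval mentioned above; the real worker uses its own timer and event dispatcher.

```python
class FakeTimer:
    """Hypothetical timer advanced by hand instead of by real time."""

    def __init__(self):
        self.now = 0.0
        self.entries = []  # each entry: [next_due, interval, fun]

    def call_repeatedly(self, secs, fun):
        self.entries.append([self.now + secs, secs, fun])

    def advance(self, secs):
        self.now += secs
        for entry in self.entries:
            # Fire every occurrence that became due in this step.
            while entry[0] <= self.now:
                entry[2]()
                entry[0] += entry[1]


class ToyHeart:
    """Hypothetical heart: one event at start, then one per interval."""

    def __init__(self, timer, send, interval=None):
        self.timer = timer
        self.send = send
        self.interval = float(interval or 2.0)

    def start(self):
        self.send('worker-online')
        self.timer.call_repeatedly(
            self.interval, lambda: self.send('worker-heartbeat'))


events = []
timer = FakeTimer()
ToyHeart(timer, events.append).start()
timer.advance(5.0)  # two intervals (at 2 s and 4 s) fall inside 5 s
```

Here events.append plays the role of the dispatcher's send method, which is where the "who receives it" question from the TODO would be answered in the real code.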
Reposted from: https://juejin.cn/post/7352814673223319579