likes
comments
collection
share

Celery 源码分析(四) - Consumer 组件

作者站长头像
站长
· 阅读数 15

Celery组件- Consumer

在上一章Worker启动的时候,我们最终以Consumer作为例子去说明了Worker是如何与自己的子组件进行绑定的。

c = w.consumer = self.instantiate(
    w.consumer_cls, w.process_task,
    hostname=w.hostname,
    task_events=w.task_events,
    init_callback=w.ready_callback,
    initial_prefetch_count=prefetch_count,
    pool=w.pool,
    timer=w.timer,
    app=w.app, # 注意这里,又传递到了更深层了组件中
    controller=w, # 注意这里,又传递到了更深层了组件中
    hub=w.hub,
    worker_options=w.options,
    disable_rate_limits=w.disable_rate_limits,
    prefetch_multiplier=w.prefetch_multiplier,
)

在这里我们大概知道是生成了一个consumer对象,但此刻,我们并不能知道生成了一个什么对象,通过调试我们发现w.consumer_cls 对应的类是:

celery.worker.consumer:Consumer

点进去一探究竟:

好家伙,有意思的地方来了,Consumer类也有自己的Blueprint

class Blueprint(bootsteps.Blueprint):
  """Consumer blueprint."""

    name = 'Consumer'
    default_steps = [
      'celery.worker.consumer.connection:Connection',
      'celery.worker.consumer.mingle:Mingle',
      'celery.worker.consumer.events:Events',
      'celery.worker.consumer.gossip:Gossip',
      'celery.worker.consumer.heart:Heart',
      'celery.worker.consumer.control:Control',
      'celery.worker.consumer.tasks:Tasks',
      'celery.worker.consumer.consumer:Evloop',
      'celery.worker.consumer.agent:Agent',
    ]

真的就是经典的俄罗斯套娃了,在说明Consumer这个类的作用之前,我们先花一些篇幅去简单解释一下这些组件的作用:

  • Connection: 管理和 broker 的 Connection 连接
  • Mingle: 不同Worker之间状态的同步
  • Events: 管理任务事件,一般用来监控,比如flower就是利用了celery的Events机制
  • Gossip: 消费来自其他Worker的事件,主要用于leader选举
  • Heart: 发送心跳事件
  • Control: 远程命令管理服务
  • Tasks: 启动任务相关的Consumer 重点
  • Agent: cell actor, 目前具体的作用未知。

接下来我们看下Consumer这个类有没有什么一眼就能看出来干啥的方法或者属性:

发现一个:

    def create_task_handler(self, promise=promise):
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon

        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except (InvalidTaskError, ContentDisallowed) as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)

        return on_task_received

为什么会说这个方法?主要是看到一个on_task_received, 看起来这个方法会在收到消息的时候调用,然后有一个策略什么相关系的封装。

还记得我们在第上一章说到的那个Celery类吗,我记得那个类里面有一个send_task方法,忘了的话可以去看看,里面调用了这样一行代码,点进去一看呢, 发现了as_task_v2这个方法,因为我们用的新版本的celery,所以就直接看v2的协议了

message = amqp.create_task_message(
  task_id, name, args, kwargs, countdown, eta, group_id,
  expires, retries, chord,
  maybe_list(link), maybe_list(link_error),
  reply_to or self.oid, time_limit, soft_time_limit,
  self.conf.task_send_sent_event,
  root_id, parent_id, shadow, chain,
  argsrepr=options.get('argsrepr'),
  kwargsrepr=options.get('kwargsrepr'),
)

# 在 as_task_v2这个 方法中,我们发现,最终celery封装了一个task的协议,用于在消息队列中传输,实际上消息队列里面存放的内容就是这样的:
return task_message(
  headers={
    'lang': 'py',
    'task': name,
    'id': task_id,
    'shadow': shadow,
    'eta': eta,
    'expires': expires,
    'group': group_id,
    'retries': retries,
    'timelimit': [time_limit, soft_time_limit],
    'root_id': root_id,
    'parent_id': parent_id,
    'argsrepr': argsrepr,
    'kwargsrepr': kwargsrepr,
    'origin': origin or anon_nodename()
  },
  properties={
    'correlation_id': task_id,
    'reply_to': reply_to or '',
  },
  body=(
    args, kwargs, {
      'callbacks': callbacks,
      'errbacks': errbacks,
      'chain': chain,
      'chord': chord,
    },
  ),
  sent_event={
    'uuid': task_id,
    'root_id': root_id,
    'parent_id': parent_id,
    'name': name,
    'args': argsrepr,
    'kwargs': kwargsrepr,
    'retries': retries,
    'eta': eta,
    'expires': expires,
  } if create_sent_event else None,
)

再结合on_task_received中从headers获取的task,基本上可以肯定on_task_received就是celery worker收到消息之后的回调函数了。然后通过查看这个协议,我们也发现了一些其他的信息,比如headers中存储的就是task的源信息,参数什么的则存储在了 body里面。

继续往下看,发现Consumber这个类没什么好看的,通过前面我们对Worker类的初始化来看,大概有一部分的初始化逻辑被下放到了子组件里面去实现了。所以下一章我们重点去看Consumber的子组件。

转载自:https://juejin.cn/post/7352814673223303195
评论
请登录