likes
comments
collection
share

Celery 源码分析(九): Timer & Eventloop

作者站长头像
站长
· 阅读数 7

补充 Timer & Eventloop

前面我们详细的讲了一个任务的执行周期,并且还讲到了一个任务在执行的过程中与其他组件的联系。但是有两个组件我们只是一笔带过了,那就是EventloopTimer组件。

之所以把这篇文章作为补充,而不是在Consumer子组件那一章一起写了,最主要的原因是,timer的一个比较经典的应用场景是延时任务的执行,而Eventloop是实现延时任务的一个重要的部分。如果我们对于Task的消费流程一知半解的话,要理解这快的内容是有一点点麻烦的。直接进入正题:

前面我们分析了一个完整Task任务的生命周期,从delay开始,从task_message_handler结束,但是对celery比较熟悉的人可能会敏感的发现,在那一篇文章中,我们实验使用的task都是立即执行的,但是问题是,celery也同样提高了一个额外的参数countdown, 允许我们指定这个任务在一段时间后才会执行。而这样的任务显然Worker收到之后显然是不能直接就消费的,不然延时的效果何在? 也不能直接在主进程slepp等待这个任务到时间了才执行。

现在对于实现这样一种特殊的任务,我们从生产者和消费者的角度各自出发,会产生两种截然不同的方案:

  • 从生产者的角度出发: 此刻Worker的处理逻辑不变,收到消息立即消费。但是生产者的逻辑就有一定的变化了。当生产者发现一个带countdown参数的延时任务产生时,我并不直接就往任务队列里面扔,而是我维护一个延时任务队列 ,我先扔延时队列里面去,然后时间到了扔到任务队列里面去。确实也可以。
  • 从消费者也就是worker到角度出发,生产者的处理逻辑不变,生产任务直接丢到任务队列里面。但是worker的处理逻辑变了,当worker收到一个延时任务时,注意,我并不直接消费它。而是把它扔到worker维护的一个延时任务队列里面,当任务时间到的时候,再去执行。

本质上实现的思路是一样的,只是实现的位置不同。

第二个问题就是,定时任务,那不就是Beat的工作吗,能这么想的朋友肯定是把延时任务定时任务搞混了,定时任务是每隔一段时间,这个任务就会被重复执行一次。而延时任务,本质上是和普通的任务没什么区别,都是由用户发起的,只是时间上稍微往后延长了那么一点点,但是不管延期多久,这个任务只会执行一次,它并不是一个周期性质的东西。也就是我们没有办法用Beat去实现这样的逻辑。

搞清楚这个概念之后,我们只需要确认Celery的延时任务是在消费者端实现的还是在生产者端实现的就行了。如何判别, 排除法,我们先看看生产者相关的逻辑:

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
  
  	"""省略部分代码逻辑"""

    app = self._get_app()
    if app.conf.task_always_eager:
        with app.producer_or_acquire(producer) as eager_producer:
            serializer = options.get('serializer')
            if serializer is None:
                if eager_producer.serializer:
                    serializer = eager_producer.serializer
                else:
                    serializer = app.conf.task_serializer
            body = args, kwargs
            content_type, content_encoding, data = serialization.dumps(
                body, serializer,
            )
            args, kwargs = serialization.loads(
                data, content_type, content_encoding,
                accept=[content_type]
            )
        with denied_join_result():
            return self.apply(args, kwargs, task_id=task_id or uuid(),
                              link=link, link_error=link_error, **options)
    else:
        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )
def send_task(self, name, args=None, kwargs=None, countdown=None,
              eta=None, task_id=None, producer=None, connection=None,
              router=None, result_cls=None, expires=None,
              publisher=None, link=None, link_error=None,
              add_to_parent=True, group_id=None, retries=0, chord=None,
              reply_to=None, time_limit=None, soft_time_limit=None,
              root_id=None, parent_id=None, route_name=None,
   
    parent = have_parent = None
    amqp = self.amqp
    task_id = task_id or uuid()
    producer = producer or publisher  # XXX compat
    router = router or amqp.router
    conf = self.conf
    if conf.task_always_eager:  # pragma: no cover
        warnings.warn(AlwaysEagerIgnored(
            'task_always_eager has no effect on send_task',
        ), stacklevel=2)

    ignored_result = options.pop('ignore_result', False)
    options = router.route(
        options, route_name or name, args, kwargs, task_type)

    if not root_id or not parent_id:
        parent = self.current_worker_task
        if parent:
            if not root_id:
                root_id = parent.request.root_id or parent.request.id
            if not parent_id:
                parent_id = parent.request.id

            if conf.task_inherit_parent_priority:
                options.setdefault('priority',
                                   parent.request.delivery_info.get('priority'))

    message = amqp.create_task_message(
        task_id, name, args, kwargs, countdown, eta, group_id,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
        argsrepr=options.get('argsrepr'),
        kwargsrepr=options.get('kwargsrepr'),
    )
              
		"""省略部分代码逻辑"""
   
    return result

我们发现生产者貌似啥也没干,拿到直接就send_task发送出去了, 事实上经过调试确实发现,延时任务发出去worker很快就收到了, 所以这也就说明了,延时任务的方案是在worker内部实现的。

现在我们需要准备一个延时任务, 并让延时五分钟执行,这样我们比较容易观察。

add.apply_async(args=[1,2], countdown=300)

然后继续回到我们Worker对于Task的处理逻辑上去:

def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
  
  	"""省略部分代码逻辑"""

    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )
    if _does_info:
        info('Received task: %s', req)
    if (req.expires or req.id in revoked_tasks) and req.revoked():
        return

    signals.task_received.send(sender=consumer, request=req)

    if task_sends_events:
        send_event(
            'task-received',
            uuid=req.id, name=req.name,
            args=req.argsrepr, kwargs=req.kwargsrepr,
            root_id=req.root_id, parent_id=req.parent_id,
            retries=req.request_dict.get('retries', 0),
            eta=req.eta and req.eta.isoformat(),
            expires=req.expires and req.expires.isoformat(),
        )
		
    # 注意这里,千万要注意这里
    bucket = None
    eta = None
    if req.eta:
        try:
            if req.utc:
                eta = to_timestamp(to_system_tz(req.eta))
            else:
                eta = to_timestamp(req.eta, app.timezone)
        except (OverflowError, ValueError) as exc:
            error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                  req.eta, exc, req.info(safe=True), exc_info=True)
            req.reject(requeue=False)
    if rate_limits_enabled:
        bucket = get_bucket(task.name)

    if eta and bucket:
        consumer.qos.increment_eventually()
        return call_at(eta, limit_post_eta, (req, bucket, 1),
                       priority=6)
    # 常规的定时任务会走这快的逻辑
    if eta:
        consumer.qos.increment_eventually()
        call_at(eta, apply_eta_task, (req,), priority=6)
        return task_message_handler
    if bucket:
        return limit_task(req, bucket, 1)

    task_reserved(req)
    if callbacks:
        [callback(req) for callback in callbacks]
    handle(req)
return task_message_handler

经过调试我们发现,req eta:==> 2022-02-08 20:19:28.130377+08:00 刚好就是五分钟以后的时间。然后我们发现它把时间转成时间戳以后,就调用了call_at(eta, apply_eta_task, (req,), priority=6)这个函数。

通过看它的上下文,我们发现call_at 指向的是: call_at = consumer.timer.call_at

所以,当woker收到一个五分钟之后执行的任务的时候,并没有选择直接处理(handle(req)),而是直接丢给timer了。

到这里,我们终于把timer的概念正式的引入了我们任务的消费中。

下一步就很简单,我们需要知道这个timer具体的指向是哪个类。根据经验,这个需要看Timer组件的初始化逻辑。

class Timer(bootsteps.Step):
    """Timer bootstep."""

    def create(self, w):
        if w.use_eventloop:
            # does not use dedicated timer thread.
            w.timer = _Timer(max_interval=10.0)
        else:
            if not w.timer_cls:
                # Default Timer is set by the pool, as for example, the
                # eventlet pool needs a custom timer implementation.
                w.timer_cls = w.pool_cls.Timer
            w.timer = self.instantiate(w.timer_cls,
                                       max_interval=w.timer_precision,
                                       on_error=self.on_timer_error,
                                       on_tick=self.on_timer_tick)

我们发现,如果开启了 use_eventloop, 就使用 from kombu.asynchronous.timer import Timer as _Timer 如果没开启,用户也没指定,则使用w.pool_cls.Timer, 通过前面的知识,我们很容易就可以定位到celery.utils.timer2.Timer 默认的话是开 use_eventloop的,所以使用的是kombu.asynchronous.timer.Timer

注意这个参数,max_interval, 是不是非常的熟悉? 是吧,随着代码的深入你会越来越感到熟悉的。

点进到kombu.asynchronous.timer.Timercall_at函数:

def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
    kwargs = {} if not kwargs else kwargs
    return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)
    
def enter_at(self, entry, eta=None, priority=0, time=monotonic):
    """Enter function into the scheduler.

    Arguments:
        entry (~kombu.asynchronous.timer.Entry): Item to enter.
        eta (datetime.datetime): Scheduled time.
        priority (int): Unused.
    """
    if eta is None:
        eta = time()
    if isinstance(eta, datetime):
        try:
            eta = to_timestamp(eta)
        except Exception as exc:
            if not self.handle_error(exc):
                raise
            return
    return self._enter(eta, priority, entry)
    
def _enter(self, eta, priority, entry, push=heapq.heappush):
    push(self._queue, scheduled(eta, priority, entry))
    return entry

我说什么来着,我说什么来着,果然诶,扔进去了一个队列里面。看来我们在开头的猜测是初步吻合的,看下有没有执行任务相关的函数。

def apply_entry(self, entry):
  	"""
  	任务就是在这里执行的
  	"""
    try:
        entry()
    except Exception as exc:
        if not self.handle_error(exc):
            logger.error('Error in timer: %r', exc, exc_info=True)

def __iter__(self, min=min, nowfun=monotonic,
                pop=heapq.heappop, push=heapq.heappush):
    
    # 看到重写了一个迭代器的函数
    max_interval = self.max_interval
    queue = self._queue

    while 1:
        if queue:
            eventA = queue[0]
            now, eta = nowfun(), eventA[0]

            if now < eta:
                yield min(eta - now, max_interval), None
            else:
                eventB = pop(queue)

                if eventB is eventA:
                    entry = eventA[2]
                    if not entry.canceled:
                        yield None, entry
                    continue
                else:
                    push(queue, eventB)
        else:
            yield None, None

大致看了下,这快的逻辑其实非常的简单,yield 后面第一个参数是下次执行的时间的间隔,最大为max_interval, 第二个是 任务的实例. 当返回时间间隔的时候,说明任务还没到执行时间呢,等着吧,所以任务实例是None, 当到时间的时候,直接就返回任务执行了,这个时候时间间隔是None,任务是entry

妙哇,这不就是celery beat的逻辑吗,实际上差不多, 因为这里的队列实际上是一个有序队列pop=heapq.heappop

但是我们别忘了一件事, celery的Schedule是由Service驱动的,但是到现在我们仍然不知道,谁在外层调用这个Timer, 答案就是EventloopEventloop在这个场景中,正是充当了Service的作用。

接下来我们去看Eventloop相关的初始化逻辑, 别找Eventloop了,还记得我们在Celery基础架构那一章的一句介绍吗:

Hub: Eventloop 的封装对象,

所以我们应该去找Hub的初始化逻辑,不找不知道,一找吓一跳。果然和timer有关系

class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""

    requires = (Timer,)
    """
    忽略部分代码
    """

    def create(self, w):
        w.hub = get_event_loop()
        if w.hub is None:
            required_hub = getattr(w._conninfo, 'requires_hub', None)
            #. 重点关注这一句
            w.hub = set_event_loop((
                required_hub if required_hub else _Hub)(w.timer))
        self._patch_thread_primitives(w)
        return self

点进去_Hub对象,看看里面有没有什么具体的逻辑: 位置: kombu.asynchronous.hub.Hub:

有意思的地方来了,发现了一个叫create_loop的函数:

def create_loop(self,
                generator=generator, sleep=sleep, min=min, next=next,
                Empty=Empty, StopIteration=StopIteration,
                KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
    traceback.print_stack()
    readers, writers = self.readers, self.writers
    poll = self.poller.poll
    fire_timers = self.fire_timers
    hub_remove = self.remove
    scheduled = self.timer._queue
    consolidate = self.consolidate
    consolidate_callback = self.consolidate_callback
    on_tick = self.on_tick
    propagate = self.propagate_errors

    while 1:
        todo = self._ready
        self._ready = set()

        for tick_callback in on_tick:
            tick_callback()

        for item in todo:
            if item:
                item()

        poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
        #  print('[[[HUB]]]: %s' % (self.repr_active(),))
        if readers or writers:
            to_consolidate = []
            try:
                events = poll(poll_timeout)
                #  print('[EVENTS]: %s' % (self.repr_events(events),))
            except ValueError:  # Issue celery/#882
                return
              
        """
        忽略部分代码
        """
        else:
          	# 没有链接之后的重试逻辑
            # no sockets yet, startup is probably not done.
            sleep(min(poll_timeout, 0.1))
        yield

大家只需要注意一个地方 fire_timers,

fire_timers里面则是执行了用户的任务.

def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
                propagate=()):
    timer = self.timer
    delay = None
    if timer and timer._queue:
        # max_timers 每次消费几个
        for i in range(max_timers):
            delay, entry = next(self.scheduler)
            if entry is None:
                break
            try:
                # 这里执行了任务
                entry()
            except propagate:
                raise
            except (MemoryError, AssertionError):
                raise
            except OSError as exc:
                if exc.errno == errno.ENOMEM:
                    raise
                logger.error('Error in timer: %r', exc, exc_info=1)
            except Exception as exc:
                logger.error('Error in timer: %r', exc, exc_info=1)
    return min(delay or min_delay, max_delay)

而至于Hubcreate_loop什么时候被执行的,则是在 Consumer的子组件Eventloop的初始化逻辑里面。

ef asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    loop = hub.create_loop()

同样的,这个eventloop的最大作用更多的也就是帮我们把Worker阻塞住,别退出了。

这个系列写到这里,本身应该大多数的内容都已经讲清楚了,但貌似总感觉缺了点什么,我们知道CeleryKombu注册了一个回调函数,然后Kombu收到消息之后就扔给Celery处理。但是,回调是何时发生的? 在哪里执行的回调? 对此我们一无所知,而且为什么放到这一章节呢?和Eventloop 或者 Timer 有什么关系?

为了解开这些疑问,于是我继续往后看,让我们继续回到asynloop这个函数中,再一次看下这个函数的内容是什么。

def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
  
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler()

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    hub.propagate_errors = errors
    loop = hub.create_loop()
    try:
      	# 这个大循环就是为了防止 celery 进程退出
        while blueprint.state == RUN and obj.connection:
            state.maybe_shutdown()

            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)

现在我们已经非常清楚正是在asynloop这个函数里面,我们绑定了我们的回调方法。也正是在这里我们执行了create_loop的操作。之前我的猜测是,底层的kombu起了一个新的进程or线程去监听消息队列,有消息后进行回调,但是我发现并不是这样,我发现:

on_task_receivedasynloop 同属一个线程ID, 这说明他们是在同一个上下文中进行的。有了这个重要的线索,结合代码上下文,最终可以得出一个结论那就是回调函数实际上是在 create_loop中执行的。具体在哪里呢? 通过堆栈信息,我们最终定位到了:

def create_loop(self,
                generator=generator, sleep=sleep, min=min, next=next,
                Empty=Empty, StopIteration=StopIteration,
                KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
    readers, writers = self.readers, self.writers
    poll = self.poller.poll
    fire_timers = self.fire_timers
    hub_remove = self.remove
    scheduled = self.timer._queue
    consolidate = self.consolidate
    consolidate_callback = self.consolidate_callback
    on_tick = self.on_tick
    propagate = self.propagate_errors
    
    while 1:
        todo = self._ready
        self._ready = set()

        for tick_callback in on_tick:
            tick_callback()

        for item in todo:
            if item:
                item()

        poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
        #  print('[[[HUB]]]: %s' % (self.repr_active(),))
        if readers or writers:
            to_consolidate = []
            try:
                events = poll(poll_timeout)
                #  print('[EVENTS]: %s' % (self.repr_events(events),))
            except ValueError:  # Issue celery/#882
                return

            for fd, event in events or ():
                general_error = False
                if fd in consolidate and \
                        writers.get(fd) is None:
                    to_consolidate.append(fd)
                    continue
                cb = cbargs = None

                if event & READ:
                    try:
                        cb, cbargs = readers[fd]
                    except KeyError:
                        self.remove_reader(fd)
                        continue
                elif event & WRITE:
                    try:
                        cb, cbargs = writers[fd]
                    except KeyError:
                        self.remove_writer(fd)
                        continue
                elif event & ERR:
                    general_error = True
                else:
                    logger.info(W_UNKNOWN_EVENT, event, fd)
                    general_error = True

                if general_error:
                    try:
                        cb, cbargs = (readers.get(fd) or
                                      writers.get(fd))
                    except TypeError:
                        pass

                if cb is None:
                    self.remove(fd)
                    continue

                if isinstance(cb, generator):
                    try:
                        next(cb)
                    except OSError as exc:
                        if exc.errno != errno.EBADF:
                            raise
                        hub_remove(fd)
                    except StopIteration:
                        pass
                    except Exception:
                        hub_remove(fd)
                        raise
                else:
                    try:
                      	# 这个位置,注意这个位置
                        cb(*cbargs)
                    except Empty:
                        pass
            if to_consolidate:
                consolidate_callback(to_consolidate)
        else:
            # no sockets yet, startup is probably not done.
            sleep(min(poll_timeout, 0.1))
        yield

cb(*cbargs) 这行代码是最关键的,我们发现,在处理定时任务的同时,也把消息的接受回调在个函数完成了。就是: 收到消息,回调,发现是个延时任务,再回来 create_loop函数处理,真不愧是个loop。延时任务陷入循环了这是。

Kombu 消息回调过程:

Kombu 基本概念:

  1. Conection: kumbu 对 AMQP 连接到封装
  2. Chanel: 是 AMQP 对 MQ 的操作的封装;
  3. TransPort: Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行

我们简单梳理一下if readers or writers:这行代码之后的逻辑,大概就是获取 到我们对应的消息队列的 readerswriters。应该就是对应的读和写。然后呢拿到对应的文件句柄fb。当然这个文件句柄什么的我只是猜的,确实没有什么注释告诉我fd代表的含义和类型。然后根据事件类型去拿对应的cb。这个cb啥意思我是真的猜不出来了,应该不是callback的意思。然后执行这个cb对象。 大概就是这样,至于具体的类型,需要调试确认一下:

events: [(12, 1), (18, 1), (20, 1)]  其中: 1表示是一个read事件
cb: <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7fed6f9c5630>>
value: 12

进入on_readable, 注意,现在我们已经进入到另外一个框架了。

def on_readable(self, fileno):
    chan, type = self._fd_to_chan[fileno]
    if chan.qos.can_consume():
        chan.handlers[type]()
# 这里的 chan 指的就是我们对应的 Chanel了 `kombu.transport.redis.Channel`
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

这里呢,根据 type的不同讲操作路由到对应的函数里面,当有新的任务过来的时候,type的值等于BRPOP。我们进入到这个方法看看:

def _brpop_read(self, **options):
    try:
        try:
            dest__item = self.client.parse_response(self.client.connection,
                                                    'BRPOP',
                                                    **options)
           	# dest_item (b'default', b'{"body": "gAJLAUsChnEAfXEBfXECKFgJAAAAY2FsbGJhY2tzcQNOWAgAAABlcnJiYWNrc3EETlgFAAAAY2hhaW5xBU5YBQAAAGNob3JkcQZOdYdxBy4=", "content-encoding": "binary", "content-type": "application/x-python-serialize", "headers": {"lang": "py", "task": "itsm.ticket.tasks.add", "id": "41db8a03-6e46-4d72-847c-531b6950d02e", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "41db8a03-6e46-4d72-847c-531b6950d02e", "parent_id": null, "argsrepr": "(1, 2)", "kwargsrepr": "{}", "origin": "gen59658@MARKHAN-MB0"}, "properties": {"correlation_id": "41db8a03-6e46-4d72-847c-531b6950d02e", "reply_to": "8e55fb0b-6071-335b-ac59-c1e545ce3e7d", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "default"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "12d20e40-da7a-474d-9680-5b4e93882a20"}}')
        except self.connection_errors:
            self.client.connection.disconnect()
            raise
        if dest__item:
            dest, item = dest__item
            dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
            self._queue_cycle.rotate(dest)
            self.connection._deliver(loads(bytes_to_str(item)), dest)
            return True
        else:
            raise Empty()
    finally:
        self._in_poll = None

注意,self.connection._deliver(loads(bytes_to_str(item)), dest) 这一行代码,这行代码就是开始回调的入口了。

kombu.transport.virtual.base.Transport._deliver, 此刻我们的消息再次进入到了Transport, 我们之前说过Chanel是对MQ的操作的封装。此刻这里的callback则是对应的是``<function Channel.basic_consume.._callback>

def _deliver(self, message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(
                message))
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)

点进去callback函数看看:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()

我们发现最后手调用到了callback,当然这里的callback仍然不是我们之前注册的回调函数, 而是:

<bound method Consumer._receive_callback 。位置是, kombu.messaging.Consumer._receive_callback

def _receive_callback(self, message):
    accept = self.accept
    on_m, channel, decoded = self.on_message, self.channel, None
    try:
        m2p = getattr(channel, 'message_to_python', None)
        if m2p:
            message = m2p(message)
        if accept is not None:
            message.accept = accept
        if message.errors:
            return message._reraise_error(self.on_decode_error)
        decoded = None if on_m else message.decode()
    except Exception as exc:
        if not self.on_decode_error:
            raise
        self.on_decode_error(message, exc)
    else:
        return on_m(message) if on_m else self.receive(decoded, message)

现在让我们回到之前注册回调函数的位置:

consumer.on_message = on_task_received
obj.controller.register_with_event_loop(hub)
obj.register_with_event_loop(hub)
consumer.consume()
obj.on_ready()

注意到这个on_message了吗? 在最后的on_m(message)实际上就是回调了我们的on_task_received

到此,消息就完成了一个完美的闭环