likes
comments
collection
share

Celery 源码分析(八): Celery Beat 定时任务实现方式

作者站长头像
站长
· 阅读数 7

Celery Beat 定时任务实现方式

到这里,我们可以来解答我们在Celery基础架构中提出来的最后一个问题了:

celery beat 是如何轮询任务的?

Celery Beat本身的实现逻辑并不复杂,本质上就是通过不断的轮询实现的。在分析Celery Beat之前,我们仍然有一个非常重要的工作,那就是找到入口。不过有了Worker的经验,Beat找起来就没有那么复杂了。

直接定位到celery.bin.beat.beat

class beat(Command):

    doc = HELP
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, workdir=None, **kwargs):
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        kwargs.pop('app', None)
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run()
        else:
            return beat().run()

喔嚯,本来以为Beat是一个特殊的Worker,没想到不是. 现在我们知道beat是通过Celery中的Beat属性开启的。那么我们点进去Celery里面看看Beat是什么。

@cached_property
def Beat(self, **kwargs):
    """:program:`celery beat` scheduler application.

    See Also:
        :class:`~@Beat`.
    """
    return self.subclass_with_self('celery.apps.beat:Beat')

发现最终指向的是celery.apps.beat模块下的 Beat对象。也就是说beat的启动实际上是拿到了Beat对象的实例,并且调用了它的run方法,也就是下面这个:

def run(self):
    print(str(self.colored.cyan(
        'celery beat v{0} is starting.'.format(VERSION_BANNER))))
    self.init_loader()
    self.set_process_title()
    self.start_scheduler()

然后在Beat启动时的控制台输出中,我们确实发现了celery beat v. is starting这行日志:

celery beat v4.4.5 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2022-02-07 16:27:17
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2022-02-07 16:27:17,133: INFO/MainProcess] beat: Starting...

这也从侧面验证了我们前面的分析都是正确的,Beat的启动入口确实是这里。

我们发现run并没有特别明显的业务逻辑,主要是执行了一些初始化操作。接下来我们点进去看看具体都初始化了什么:

def init_loader(self):
    # Run the worker init handler.
    # (Usually imports task modules and such.)
    self.app.loader.init_worker()
    # 收集所有的tasks
    self.app.finalize()

发现把worker初始化了一下,注意,这里的初始化和真正启动Worker的初始化过程是不一样的,这里我们在启动Beat时的loader指向的是, AppLoader

class AppLoader(BaseLoader):
    """Default loader used when an app is specified."""
    

class BaseLoader(object):
		"""省略部分代码"""
    def on_worker_init(self):
        """Called when the worker (:program:`celery worker`) starts."""

    def import_default_modules(self):
        responses = signals.import_modules.send(sender=self.app)
        # Prior to this point loggers are not yet set up properly, need to
        #   check responses manually and reraised exceptions if any, otherwise
        #   they'll be silenced, making it incredibly difficult to debug.
        for _, response in responses:
            if isinstance(response, Exception):
                raise response
        return [self.import_task_module(m) for m in self.default_modules]

    def init_worker(self):
        if not self.worker_initialized:
            self.worker_initialized = True
            self.import_default_modules()
            self.on_worker_init()

我们发现on_worker_init根本没有具体的实现逻辑,所以这里的Worker的初始化就只是导入模块,并没有做复杂的初始化工作。和我们实际上去启动Worker的时候还是非常不一样的。

接下来就到这里了:self.set_process_title()

def set_process_title(self):
    arg_start = 'manage' in sys.argv[0] and 2 or 1
    platforms.set_process_title(
        'celery beat', info=' '.join(sys.argv[arg_start:]),
    )

看样子是设置了下进程的名称,不是非常核心的逻辑,这里就不展开了。

我们接着往下看start_scheduler, 光看方法名就感觉非常的核心了。

def start_scheduler(self):
    if self.pidfile:
        platforms.create_pidlock(self.pidfile)
    service = self.Service(
        app=self.app,
        max_interval=self.max_interval,
        scheduler_cls=self.scheduler_cls,
        schedule_filename=self.schedule,
    )

    print(self.banner(service))

    self.setup_logging()
    if self.socket_timeout:
        logger.debug('Setting default socket timeout to %r',
                     self.socket_timeout)
        socket.setdefaulttimeout(self.socket_timeout)
    try:
        self.install_sync_handler(service)
        service.start()
    except Exception as exc:
        logger.critical('beat raised exception %s: %r',
                        exc.__class__, exc,
                        exc_info=True)
        raise

果然不出我们所料,还真的是这样,在beat的概念中,我们可以把Service理解为驱动,它的作用实际上是链接beat和对应的scheduler。我们看到最终是由Servicestart启动对应的scheduler的。至于max_interval这些参数我们一会儿到地方了再讲为什么要传一个最大的执行间隔过去。而scheduler_cls则是我们自己配的。

在django里面,一般是用的这个:

django_celery_beat.schedulers.DatabaseScheduler

当然,django_celery_beat只是其中一种实现,只需要实现celery的Scheduler协议,自己也可以自定义。我们点进去Service的start方法,看看里面是啥

位置: celery.beat.Service

def start(self, embedded_process=False):
    info('beat: Starting...')
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))

    signals.beat_init.send(sender=self)
    if embedded_process:
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')

    try:
        while not self._is_shutdown.is_set():
            interval = self.scheduler.tick()
            if interval and interval > 0.0:
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                time.sleep(interval)
                if self.scheduler.should_sync():
                    self.scheduler._do_sync()
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()

非常的简单啊,其实就是一个while死循环,然后定期sleep一下,这是因为,对于进程而言,如果不停的While True,可能会因为占用计算机CPU资源或者其他的原因被操作系统给杀掉,所以我们需要定期sleep一下,确保进程可以一直平稳的运行下去。

然后我们发现,对于整个start方法,最核心的其实就只有一行代码self.scheduler.tick()

对于使用了django_celery_beat这个实现的人来说,这里的scheduler显然就是DatabaseScheduler, 我们去看下它的tick实现是怎样的。

# pylint disable=redefined-outer-name
def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
         heappush=heapq.heappush):
    """Run a tick - one iteration of the scheduler.

    Executes one due task per call.

    Returns:
        float: preferred delay in seconds for next call.
    """
    adjust = self.adjust
    max_interval = self.max_interval

    if (self._heap is None or
            not self.schedules_equal(self.old_schedulers, self.schedule)):
        self.old_schedulers = copy.copy(self.schedule)
        self.populate_heap()

    H = self._heap
    if not H:
        return max_interval

    event = H[0]
    entry = event[2]
    is_due, next_time_to_run = self.is_due(entry)
    if is_due:
        verify = heappop(H)
        if verify is event:
            next_entry = self.reserve(entry)
            self.apply_entry(entry, producer=self.producer)
            heappush(H, event_t(self._when(next_entry, next_time_to_run),
                                event[1], next_entry))
            return 0
        else:
            heappush(H, verify)
            return min(verify[0], max_interval)
    return min(adjust(next_time_to_run) or max_interval, max_interval)

这里其实是维护了一个堆的数据结构,堆顶是最近要执行的任务,当这个任务到时间了,就把取出来执行,并且把它下一次要执行的执行对象放到堆里面。如果发现堆顶的任务到点了,就调用self.apply_entry(entry, producer=self.producer)去执行。

def apply_entry(self, entry, producer=None):
    info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
    try:
        result = self.apply_async(entry, producer=producer, advance=False)
    except Exception as exc:  # pylint: disable=broad-except
        error('Message Error: %s\n%s',
              exc, traceback.format_stack(), exc_info=True)
    else:
        debug('%s sent. id->%s', entry.task, result.id)
def apply_async(self, entry, producer=None, advance=True, **kwargs):
    # Update time-stamps and run counts before we actually execute,
    # so we have that done if an exception is raised (doesn't schedule
    # forever.)
    entry = self.reserve(entry) if advance else entry
    task = self.app.tasks.get(entry.task)

    try:
        entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])]
        entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()}
        if task:
            return task.apply_async(entry_args, entry_kwargs,
                                    producer=producer,
                                    **entry.options)
        else:
            return self.send_task(entry.task, entry_args, entry_kwargs,
                                  producer=producer,
                                  **entry.options)
    except Exception as exc:  # pylint: disable=broad-except
        reraise(SchedulingError, SchedulingError(
            "Couldn't apply scheduled task {0.name}: {exc}".format(
                entry, exc=exc)), sys.exc_info()[2])
    finally:
        self._tasks_since_sync += 1
        if self.should_sync():
            self._do_sync()

这个大概就是Beat的实现逻辑了,当然,还有很多我们没有写进去,一个是当我们DB里面定时任务加了一条记录的时候,DatabaseScheduler会执行同步逻辑,把我们最新的定时任务加到调度里面。如果需要具体的逻辑大家去看相关的代码就好。

看了Beat的实现逻辑,优缺点其实都是显而易见的,先说优点,Beat的实现逻辑非常的简洁,明了,用堆巧妙的解决了定时任务排序的问题(事实上是从4.x版本之后才引入这样一种实现方式)。缺点也比较多:

  1. beat 无法实现分布式,也就是beat只能有一个,无法进行水平扩展
  2. 不管怎么说,beat都是存在单个节点的性能瓶颈的,特别是当DB里面某个时间点有大量的定时任务的时候,会超过单个Beat的最大处理能力。所以Beat并不适用于大规模的定时任务场景。同时维护堆的数据结构对于大量定时任务的场景对内存也是非常不友好的。
  3. 精度问题,Beat的特性基本上就说明了它对定时任务的精度是无法保证的,所以celery的定时任务只支持到分钟级。进程休眠的方式,会导致Beat难免会出现一定时间的延时。