Celery 源码分析(二): 基础架构
Celery 基础架构
基本组件
Celery 有五大组件,至于为什么是五大,这个是我从网上搜到的,大家都说是五个, 查了查官方文档,也没说,那就姑且认为它是五个吧。
worker
,不用多说,提到celery的组件,worker毕竟拥有一席之地,也是我们最为熟悉的巨核心的组件之一。worker是celery中的核心组件,主要承担的职责是任务执行者,说人话就是监听消息队列,从broker中拉取任务,然后分配给worker内部维护的子进程去执行。worker可以是分布式的,可以部署在多台服务器,一个任务最终被决定到分配到哪个worker上去执行是通过Gossip协议选出来的。broker
: celery中的消息中间件,主要的任务是,接受任务消息,将消息存进队列中按顺序分发给指定的任务消费方,broker支持多种不同的任务消费方式,比较常用的有rabbitmq
,redis
和数据库。
beat
: 任务调度器,一般有定时任务
的时候会开启beat,beat的作用主要是周期性的沦陷定时任务,看到哪个时间到了就放到任务队列里面交给worker去执行。beat提供了接口,允许开发者自定义自己的scheduler
。producer
: 有消费者,自然就有生产者,producer
是celery中的任务生产者,生产任务的方式有很多种,通常我们通过函数、装饰器和调用celery api来生产任务。result backend
:任务结果存储,任务处理完后保存状态信息和结果,以供查询。存储的位置主要在于你指定来什么样的媒介来存储任务的执行结果,celery支持多种backend,常用的有rabbitmq。
对于上面的五大组件,具体又是如何协同完成一个任务完整的生命周期的呢?
下面我们以一个简单的任务执行模型为例:
@task
def add(x, y):
return x+y
add.delay((2, 3))
上面的代码很简单,执行这段代码并不会返回给我5
,而是会返回我一个AsyncResult
对象。假设我们用的broker
是rabbitmq
, 通常来说. 一个任务的生命周期大致是这样的。
add.delay((2, 3))
这行代码实际上就对应着我们上面组件的使用celery api 方式生产任务的producer
,到这步,任务生产出来了- 任务就像商品一样,生产出来得有地儿卖啊,讲究一个渠道,于是任务生产者生产出来任务之后,就扔给了
broker
,注意,生产者并不直接和消息队列打交道,就像格力空调总部也会直接和各个乡镇的专卖店直接接触一样,这里的broker
就和格力各个省份的总代理是一个意思,生产者把任务给broker之后,broker
就把生产出来的任务给丢给他代理的消息队列RabbitMq里面去了。 - 这个时候消费者可没闲着,
worker
可一直在监听队列,队列里面一旦发现有了任务,就急忙消费,注意,任务的消费实际上是由worker
的子进程去消费的。为什么?大家想一想,一般上面有什么销售任务,都是格力专卖店的老板去干的?当然是给服务员去做哇。因为服务员可以有多个,老板只能有一个哇。使用子进程的方式主要是为了提升worker
本身的执行效率。 worker
执行完事儿之后,把结果再怼回去到RabbitMQ中。
流程图大致长这样:
这个时候有人就有疑惑了,不对,Exchange你玩意你也没说啊?
Exchange
我们以rabbitmq
为例,exchange
的作用其实就是一个路由器:
你告诉Exhange
说, 给我把消息送到这个队列里面去,这个是地址(route_key)。Exhange就会帮你把消息扔进指定的队列里面去。别忘记我们的Worker
启动的时候可是可以指定监听哪个队列的。那你就可以选择生产任务的时候把任务扔到指定的队列里面交给指定的worker去消费。
我们可以通过配置celery task_routes的方式指定某个task的队列:
{
"test.add":{
"queue":"addtask",
"routing_key":"test"
}
}
然后启动worker
的时候指定这个队列就好了:
celery -A proj worker --loglevel=info -Q addtask
kombu
celery
本身也并不直接操作消息队列,实际上,celery
操作消息队列是通过Kombu
来实现的。Kombu
的基本概念如下:
Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。其特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ、Qpid,还支持虚拟的消息队列如redis、
关于AMQP协议和kombu的核心概念等到以后再讲,具体的原因非常真实,我现在也不是非常的熟悉这一块的内容。
Worker之间的交互
在前面那一章我们讲到,Celery是一个分布式的任务队列框架,这里的分布式我们通常讲的是Worker, Beat不是分布式的。既然是分布式,那就有一点点分布式的样子,各个Worker之间是如何交互的呢? 最简单的一个例子就是,那么多的worker,我往队列里面发送一个消息,交给哪个Worker去消费是个问题,让他们自己去抢吧,听起来不太优雅,现在毕竟是法治社会,我们要和谐,要民主。
文章开头我们说过,Worker之间的选举本身是通过Gossip协议实现的,Celery在Worker的层面并没有所畏的Master存在去协调所有Worker的运行。所以最终交由哪个Worker去执行,是由Worker互相之间的民主商议决定的,并不是由一个Master进程自己拍板决定的。
关于Gossip协议,大家可以看这篇文章:
cloud.tencent.com/developer/a…
具体在Celery源码中的位置是:
celery.worker.consumer.gossip.Gossip
在此之前我们需要明确这两点:
- 每个Worker都维护一个逻辑时钟clock,逻辑时钟的值就是当前Worker本身的排序
- 每次选举会有三个基本信息:
- id 标明这是哪一次的选举
- topic: 主题,标示action的类型
- action: 本次选举的目的,选举成功的worker将会执行这个 action
大致的选举顺序如下:
- 当control意识到需要一次选举的时候,就会调用本worker的gossip发送一个
worker-event
给所有其他的Worker。 - 其他的Worker收到之后,就将自己的Clock给回复过去。
- 一段时间之后(通常很快),每个worker都能收到其他所有Worker的clock,然后在自己本地进行对比,看看自己是不是最大的那一个。如果是的话,自己处理这个任务,如果不是的话,那就不管它。因为与此同时,其他的worker也在重复相同的过程,clock最大的那个worker迟早会发现自己是最大的,然后消费者个任务,所以不用担心任务无法被消费的问题。除非消息传送的过程中丢了。
除了选举之外,worker之间几乎谁也不搭理谁,互不干涉,谁拿到任务谁就去执行就好了。
听起来非常的简单,但实际上Gossip和其他的协议一样,也存在不可避免的缺陷:
- 消息延迟:节点随机向少数几个节点发送消息,消息最终是通过多个轮次的散播而到达全网,不可避免的造成消息延迟。这对于及时性要求非常高的系统是不适用的。
- 消息冗余:节点定期随机选择周围节点发送消息,而收到消息的节点也会重复该步骤,因此不可避免地引起同一节点多次接收同一消息,增加消息处理的压力。一次通信会对网路带宽、CUP资源造成很大的负载,而这些负载又受限于 通信频率,该频率又影响着算法收敛的速度。
- 拜占庭问题:如果有一个恶意传播消息的节点,Gossip协议的分布式系统就会出问题。
Worker的组成部分
在celery中,如何快速知道某个组件的组成部分非常的有规律,我们只需要找到对应的核心组件,然后找到对应的steps就行, 比如在Worker中,我们先是找到了Worker这个类,然后发现Worker继承了WorkController这个类, 点进去一看,Blueprint
这个子类映入眼帘。
class Blueprint(bootsteps.Blueprint):
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
-
Hub: Eventloop 的封装对象,
-
Pool: Worker维护的池子(进程池,线程池,协程), 实际上任务的执行是交由这些子进程or线程完成的。
-
Beat: 只有再加 -beat 的时候才会开启。
-
Timer: 用于执行定时任务的 Timer, 当我们设置一个任务五秒后执行的时候,这个任务并不会直接进入任务队列,而是先进入Timer中维护的队列,时间到了在扔到任务执行队列中去。
-
StateDB: 用户持久化Worker重启的时候的数据。
-
Consumer: 任务消费者,负责从broker那里接受消息,然后封装消息交由对应的逻辑去处理。
-
WorkerComponent: 在线调节进程池大小。
问题:
- celery 是如何启动的?
- 我们定义的task celery 是如何扫描并注册到celery的?
- worker 是如何消费任务的?
- celery beat 轮询任务的?
后面的篇幅我们将在翻看源码的时候,去解答这些问题。
转载自:https://juejin.cn/post/7352681668551786546