分布式队列神器 Celery,你了解多少?|Python 主题月
我们在web开发中会经常遇到异步任务,对于一些消耗资源和时间的操作,如果不从应用中单独抽出来的话,体验是非常不好的,例如:一个手机验证码登录的过程,当用户输入手机号点击发送后,如果如果直接扔给后端应用去执行的话,就会引起网络IO的阻塞,那整个应用就非常不友好了,那如何优雅的解决这个问题呢?
我们可以使用异步任务,当接收到请求后,我们可以在业务逻辑的处理时触发一个异步任务,前端立即返回读秒让用户接收验证码,同时由于是异步执行的任务,后端也可以处理其他的请求,这就非常的完美了。
实现异步任务的工具有很多,其原理也都是去实现一个消息队列,这里我们主要来了解一下Celery。
Celery 是什么?
Celery简介
Celery是一个由Python编写的简单,灵活且可靠的分布式系统,它可以处理大量消息,同时也提供了操作、维护该分布式系统所需的工具。
说白点就是,Celery 是一个异步任务的调度工具,它专注于实时任务处理,支持任务调度。有了Celery,我们可以快速建立一个分布式任务队列并能够简单的管理。Celery虽然是由python编写, 但协议可以用任何语言实现。迄今,已有 Ruby 实现的RCelery、node.js 实现的node-celery以及一个PHP 客户端 。
Celery架构
此处借鉴一张图片,这张图非常明了把Celery的组成以及工作方式描述出来了。
Celery的架构由下面三个部分组成:
Brokers
意为中间件/中间人,在这里指的是任务队列,我们要注意Celery本身不是任务队列,它是管理分布式任务队列的工具,换一句话说,用Celery可以快速进行任务队列的使用与管理, Celery可以方便的和第三方提供的任务队列集成,例如RabbitMQ, Redis等。
Worker
任务执行单元,是Celery提供的任务执行的单元,简单来说,它就是Celery的工人。就是一打工人,一心只有工作哈哈哈。
类似于消费者,它实时监控着任务队列,当有新的任务入队时,它会从任务队列中取出任务并执行。
backend/Task result store
任务结构存储,顾名思义,它就是用来存储Worker执行的任务的结果的地方,Celery支持以不同方式存储任务的结果,有redis,Memcached等。
简单来说,当用户、或者我们的应用中的触发器将任务入Brokers队列之后,Celery的Worker就会取出任务并执行,然后将结构保存到Task result store中。
使用Celery
简单实现
Celery及消息队列(redis/RabbitMQ)的安装过程在这里就不再赘述了,出于方便,我们这里使用redis,点击这里查看官网给出的更多的Brokers和backend支持。
首先,我们新建一个tasks.py文件。
import time
from celery import Celery
brokers = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('tasks', broker=brokers, backend=backend)
@app.task
def add(x, y):
time.sleep(2)
return x + y
上述代码,我们导入了celery库,新建了一个celery实例,传入了broker和backend,然后创建了任务函数add,我们用time.sleep(2)来模拟耗时操作。
接下来我们要启动Celery服务,在当前命令行终端运行:
celery -A tasks worker --loglevel=info
注意:如果在Windows中要运行如下命令:
celery -A celery_app worker --loglevel=info -P eventlet
不然会报错。。。。。。
我们会看到下面的输出结果:
D:\use_Celery>celery -A tasks worker --loglevel=info -P eventlet
-------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-07-13 15:12:19
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x35a2270
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-07-13 15:12:19,097: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-07-13 15:12:19,134: INFO/MainProcess] mingle: searching for neighbors
[2021-07-13 15:12:20,161: INFO/MainProcess] mingle: all alone
[2021-07-13 15:12:20,271: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2021-07-13 15:12:20,273: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
这些输出包括指定的启动Celer应用的一些信息,还有注册的任务等等。
此时worker已经处于待命状态,而 broker中还没有任务 ,我们需要触发任务进入broker中,worker才能去取出任务执行。
我们新建一个add_task.py文件:
from tasks import add
result = add.delay(5, 6) # 使用celery提供的接口delay进行调用任务函数
while not result.ready():
pass
print("完成:", result.get())
我们可以看到命令窗口的输出的celery执行的日志 :
[2021-07-13 15:14:55,615: INFO/MainProcess] Received task: tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e]
[2021-07-13 15:14:57,615: INFO/MainProcess] Task tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e] succeeded in 2.0s: 11
当然我们在backend的redis中也可以看到执行任务的相关信息。
至此,一个简单的 celery 应用就完成啦。
周期/定时任务
Celery 也可以实现定时或者周期性任务,实现也很简单,只需要配置好周期任务,然后再启动要启动一个 beat 服务即可。
新建Celery配置文件celery_conf.py:
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'tasks.add',
'schedule': timedelta(seconds=3),
'args': (16, 16)
}
}
然后在 tasks.py 中通过app.config_from_object('celery_config')
读取Celery配置:
# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
然后重新运行 worker,接着再运行 beat:
celery -A tasks beat
我们可以看到以下信息:
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2021-07-15 09:05:32
Configuration ->
. broker -> redis://127.0.0.1:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
然后我们就可以看到启动worker的命令行在周期性的执行任务:
可以看出每3秒就有一个任务被加入队列中去执行。
那定时任务又怎样去实现呢?
也很简单,我们只需要更改一下配置文件即可:
# crontab任务
# 每周四7:30调用task.add
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'add-crontab-func': {
'task': 'tasks.add',
'schedule': crontab(hour=9, minute=20, day_of_week=4),
'args': (30, 20),
},
}
CELERY_TIMEZONE = 'Asia/Shanghai' # 配置时区信息
其中crontab(hour=9, minute=20, day_of_week=4)代表的是每周四的9点20执行一次,只要我们的Celery服务一直开着,定时任务就会按时执行;在这里我也在配置里加入了时区信息。
我在这里是9点17启动的Celery服务、运行的beat,从下面的输出可以看出,20的时候我们的定时任务就执行了。
总结
由此我们可以看出,利用 Celery 进行分布式队列管理将会大大的提高我们的开发效率,我这里也仅仅是关于Celery的简单介绍和使用,如果大家感兴趣,可以去官方文档 学习更高级更系统的用法。
最后,感谢女朋友在工作和生活中的包容、理解与支持 !
转载自:https://juejin.cn/post/6984986745456721950