likes
comments
collection
share

celery 创建有优先级的队列

作者站长头像
站长
· 阅读数 18

某次投递临时任务时,投递数量太多,导致日常任务全部积压了,于是研究了下celery队列的优先级。但是网上搜到的信息比较杂,很多都不太直观,这里详细说明一下。

示例代码

from celery import Celery
from kombu import Exchange, Queue


app = Celery(
    'default',
    broker=REDIS_URI	#这里选择redis做任务存储
)


app.conf.update(
    worker_pool_restarts=True,
    timezone='Asia/Shanghai',
    enable_utc = True,
    worker_prefetch_multiplier=1,	#当任务比较耗时的时候,需要关注这个参数
    task_queues = (
        Queue('q1', routing_key='q1', queue_arguments={'x-max-priority': 10}),
        Queue('q2', routing_key='q2', queue_arguments={'x-max-priority': 10}),
    ),
    task_routes = {     #默认的路由,实际投递任务时可以手动指定队列
        'celery_task.task1': {'queue': 'q1'},
        'celery_task.task2': {'queue': 'q2'}
    }
)

解析

要创建优先级的队列,主要关注config里面的task_queues,需要在定义的队列里面加上参数queue_arguments={'x-max-priority': 10},这个x-max-priority表示此队列最大的优先级数字,一般10够用。配置了此参数还没完,需要你在投递任务的时候手动加上优先级字段。

from celery_task import task1

task1.apply_async(
	kwargs={'key':value},
	priority=5		#这个字段表示此任务具体是多少优先级
)

task1.apply_async(
	kwargs={'key':value}, 
	priority=9		#这个字段表示此任务具体是多少优先级
)

task1.apply_async(
	kwargs={'key':value}, 
)

重要:不同优先级的任务执行顺序与broker相关,rabbitMQ是数字越大,越先执行,而redis是数字越小越先执行

比如上面的代码,同时投递了三个task1,根据默认路由,都进入了q1队列,我选用的是redis,所以priority=5会优先执行,其次是priority=9。最后一个没有设置优先级,会最后执行(根据网上其他人的结果来的,我没试过,既然设置了优先级队列,投递任务时就要都加上优先级字段,提高代码可读性)。

到这里还没结束,上面的配置虽然给你了10个级别可以选择,但实际上只有4个,参考:stackoverflow.com/questions/1…

实际的4个级别分别是 ['celery0', 'celery3', 'celery6', 'celery9'],其他级别会分配到相近的级别里面,比如给两个任务分别赋予优先级6和7,最终实际上两个任务都会被分配到优先级6里面去,此时实际执行时无法确保赋6的比赋7的先执行

所以默认情况下,最好直接给任务分配0、3、6、9这四种优先级,这样可以确保任务先后执行不出问题。

其他问题

有时配置好了之后,还是可能出现高优先级的任务迟迟不执行的情况,此时大概率是任务耗时较长,同时worker_prefetch_multiplier没有配置的原因(示例代码中已经配置好了),这个字段表示预取任务的数量,默认值为4,当前worker如果并发数为10,那么这个worker实际会取50个任务,其中10个正在执行,剩下40个任务是预取的,处于待机状态,此时如果插入一个更高优先级的任务,至少要41个任务完成后才能真正的开始执行(10个正在跑的+31个待机的,之后还剩9个任务,空余出了一个线程可以跑新的高优先级任务了)。

所以要保证先后执行的效果,这个值越小越好,按官网描述,4表示一次取4*并发数个任务,但是我实际测试下来,应该是取了(1+4)*并发数个任务,实际设为1够用了,放一段官方文档的描述:

How many messages to prefetch at a time multiplied by the number of concurrent processes. The default is 4 (four messages for each process). The default setting is usually a good choice, however – if you have very long running tasks waiting in the queue and you have to start the workers, note that the first worker to start will receive four times the number of messages initially. Thus the tasks may not be fairly distributed to the workers.

To disable prefetching, set worker_prefetch_multiplier to 1. Changing that setting to 0 will allow the worker to keep consuming as many messages as it wants.

参考

celery官方文档:celeryproject.readthedocs.io/zh_CN/lates…

参考文章1:segmentfault.com/a/119000003…

参考文章2:blog.csdn.net/wyh1618/art…