避免 FastAPI 多进程环境下 ApScheduler 定时任务重复触发的方法
前言
问题还原
我们先写一个简单demo看看,定时任务到底是不是重复执行了。
......
app = createApp()
def job():
import time
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")
scheduler = BackgroundScheduler()
trigger = CronTrigger.from_crontab('* * * * *')
scheduler.add_job(job, trigger=trigger)
@app.on_event("startup")
async def startup_event():
scheduler.start()
@app.on_event("shutdown")
async def shutdown_event():
scheduler.shutdown()
代码中,我们写了一个简单定时任务,每分钟执行一次输出"定时任务执行中...",定时任务的启动被放在了应用的启动事件中,这样可以确保定时任务只会被启动一次。同时,还添加了应用的关闭事件,用于在应用关闭时停止定时任务。
好了,我们来启动FastApi服务,执行如下命令
uvicorn main:app --host=0.0.0.0 --port=7777 --workers=1
这行命令中,我们先指定了workers为1,看看定时任务是否会重复执行呢?
通过输出可以看到,任务并不会重复执行,我们再来测试一下多个workers的情况,执行如下命令
uvicorn main:app --host=0.0.0.0 --port=7777 --workers=3
这行命令中,我们指定了workers为3,再看看定时任务会不会重复执行呢?
通过输出可以看到,定时任务确实被执行了多次,接下来就是想办法解决这个问题。
解决方案
1、根据问题还原模块中的测试,我们知道启动一个workers是没有问题的,但这个解决方案可能会显得有点笨拙。
2、查询网上资料,可以考虑使用一个外部的任务调度系统来管理定时任务,例如 Celery。Celery 可以让你在多个 worker 中协调任务的执行,避免重复执行的情况发生。(暂未接入使用)
3、可以考虑使用一些分布式锁来保证在多个 worker 中只有一个执行定时任务的实例,例如 Redis 分布式锁。(采用此方案来解决)
前置条件:安装redis库
pip install redis-py-cluster
我们改造上面的代码,来实现Redis分布式锁,看改造后的代码
app = FastAPI()
# 连接到 Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def job():
# 获取 Redis 分布式锁
lock = redis_client.lock("my_lock", timeout=60)
if lock.acquire(blocking=False):
try:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")
finally:
lock.release()
scheduler = BackgroundScheduler()
trigger = CronTrigger.from_crontab('* * * * *')
scheduler.add_job(job, trigger=trigger)
@app.on_event("startup")
async def startup_event():
scheduler.start()
@app.on_event("shutdown")
async def shutdown_event():
scheduler.shutdown()
这段代码中,我们首先连接到了 Redis,并在定时任务中使用 Redis 分布式锁来控制任务的执行。当一个 worker 获取到锁后,才会执行任务,其他 worker 将会被阻塞直到锁被释放。这里使用的是简单的分布式锁实现。
装饰器实现:基于redlock来实现
安装redlock:pip install redlock
核心代码:
def lock(key):
"""
redis分布式锁,基于redlock
:param key: 唯一key,确保所有任务一致,但不与其他任务冲突
:return:
"""
def decorator(func):
if asyncio.iscoroutinefunction(func):
logging.info(f"执行了")
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
with RedLock(
f"distributed_lock:{func.__name__}:{key}:{str(args)}",
connection_details=settings.REDIS_NODES,
ttl=30000, # 锁释放时间为30s
):
return await func(*args, **kwargs)
except RedLockError:
print(
f"进程: {os.getpid()}获取任务失败"
)
else:
logging.info(f"else执行了")
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
lock_key = f"distributed_lock:{func.__name__}:{key}:{str(args)}"
logging.info(f"Trying to acquire lock for key: {lock_key}")
with RedLock(
f"distributed_lock:{func.__name__}:{key}:{str(args)}",
connection_details=settings.REDIS_NODES,
ttl=30000, # 锁释放时间为30s
):
logging.info(f"Lock acquired for key: {lock_key}")
return func(*args, **kwargs)
except RedLockError:
logging.error(
f"Failed to acquire lock for key: {lock_key}"
)
print(
f"进程: {os.getpid()}获取任务失败"
)
return wrapper
return decorator
这段代码定义了一个装饰器函数 lock
,用于实现基于 RedLock 的分布式锁。简单介绍一下实现原理:
lock
函数是一个装饰器工厂函数,它接受一个参数key
,该参数用于唯一标识任务的锁。- 在
lock
函数内部,定义了一个嵌套的装饰器函数decorator
,该函数接受被装饰的函数func
作为参数。 - 在
decorator
函数中,首先通过asyncio.iscoroutinefunction(func)
判断被装饰的函数是否是异步函数,如果是异步函数,则创建一个异步的wrapper
函数,否则创建一个同步的wrapper
函数。 wrapper
函数内部使用 RedLock 实例来获取分布式锁,并在获取锁成功后执行被装饰的函数func
,然后释放锁。如果获取锁失败,则打印错误信息。- 最后,根据被装饰的函数
func
是异步函数还是同步函数,返回相应类型的wrapper
函数。
这个装饰器的作用是确保同一时刻只有一个任务能够获取特定 key
对应的锁,以防止并发执行时出现数据竞争或者重复执行的情况。
有了装饰器,使用就简单多了,只需要函数增加装饰器就可以了,像这样
@lock("my_lock")
def job():
import time
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")
最后
还的折腾啊,学到的东西要运用到项目中,即使是自己的demo项目,这样你会发现问题,然后解决问题,这样知识积累更快,印象更深。
转载自:https://juejin.cn/post/7350143919788916762