likes
comments
collection
share

避免 FastAPI 多进程环境下 ApScheduler 定时任务重复触发的方法

作者站长头像
站长
· 阅读数 8

前言

问题还原

我们先写一个简单demo看看,定时任务到底是不是重复执行了。

......

app = createApp()

def job():
    import time
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")

scheduler = BackgroundScheduler()
trigger = CronTrigger.from_crontab('* * * * *')
scheduler.add_job(job, trigger=trigger)

@app.on_event("startup")
async def startup_event():
    scheduler.start()

@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()

代码中,我们写了一个简单定时任务,每分钟执行一次输出"定时任务执行中...",定时任务的启动被放在了应用的启动事件中,这样可以确保定时任务只会被启动一次。同时,还添加了应用的关闭事件,用于在应用关闭时停止定时任务。

好了,我们来启动FastApi服务,执行如下命令

uvicorn main:app --host=0.0.0.0 --port=7777 --workers=1

这行命令中,我们先指定了workers为1,看看定时任务是否会重复执行呢? 避免 FastAPI 多进程环境下 ApScheduler 定时任务重复触发的方法 通过输出可以看到,任务并不会重复执行,我们再来测试一下多个workers的情况,执行如下命令

uvicorn main:app --host=0.0.0.0 --port=7777 --workers=3

这行命令中,我们指定了workers为3,再看看定时任务会不会重复执行呢?

避免 FastAPI 多进程环境下 ApScheduler 定时任务重复触发的方法 通过输出可以看到,定时任务确实被执行了多次,接下来就是想办法解决这个问题。

解决方案

1、根据问题还原模块中的测试,我们知道启动一个workers是没有问题的,但这个解决方案可能会显得有点笨拙。

2、查询网上资料,可以考虑使用一个外部的任务调度系统来管理定时任务,例如 Celery。Celery 可以让你在多个 worker 中协调任务的执行,避免重复执行的情况发生。(暂未接入使用)

3、可以考虑使用一些分布式锁来保证在多个 worker 中只有一个执行定时任务的实例,例如 Redis 分布式锁。(采用此方案来解决)

前置条件:安装redis库

pip install redis-py-cluster

我们改造上面的代码,来实现Redis分布式锁,看改造后的代码


app = FastAPI()

# 连接到 Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def job():
    # 获取 Redis 分布式锁
    lock = redis_client.lock("my_lock", timeout=60)
    if lock.acquire(blocking=False):
        try:
            print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")
        finally:
            lock.release()

scheduler = BackgroundScheduler()
trigger = CronTrigger.from_crontab('* * * * *')
scheduler.add_job(job, trigger=trigger)

@app.on_event("startup")
async def startup_event():
    scheduler.start()

@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()

这段代码中,我们首先连接到了 Redis,并在定时任务中使用 Redis 分布式锁来控制任务的执行。当一个 worker 获取到锁后,才会执行任务,其他 worker 将会被阻塞直到锁被释放。这里使用的是简单的分布式锁实现。

装饰器实现:基于redlock来实现

安装redlock:pip install redlock

核心代码:

def lock(key):
    """
    redis分布式锁,基于redlock
    :param key: 唯一key,确保所有任务一致,但不与其他任务冲突
    :return:
    """

    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            logging.info(f"执行了")
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    with RedLock(
                        f"distributed_lock:{func.__name__}:{key}:{str(args)}",
                        connection_details=settings.REDIS_NODES,
                        ttl=30000,  # 锁释放时间为30s
                    ):
                        return await func(*args, **kwargs)
                except RedLockError:
                    print(
                        f"进程: {os.getpid()}获取任务失败"
                    )

        else:
            logging.info(f"else执行了")

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    lock_key = f"distributed_lock:{func.__name__}:{key}:{str(args)}"
                    logging.info(f"Trying to acquire lock for key: {lock_key}")
                    with RedLock(
                        f"distributed_lock:{func.__name__}:{key}:{str(args)}",
                        connection_details=settings.REDIS_NODES,
                        ttl=30000,  # 锁释放时间为30s
                    ):
                        logging.info(f"Lock acquired for key: {lock_key}")
                        return func(*args, **kwargs)
                except RedLockError:
                    logging.error(
                        f"Failed to acquire lock for key: {lock_key}"
                    )
                    print(
                        f"进程: {os.getpid()}获取任务失败"
                    )

        return wrapper

    return decorator

这段代码定义了一个装饰器函数 lock,用于实现基于 RedLock 的分布式锁。简单介绍一下实现原理:

  1. lock 函数是一个装饰器工厂函数,它接受一个参数 key,该参数用于唯一标识任务的锁。
  2. lock 函数内部,定义了一个嵌套的装饰器函数 decorator,该函数接受被装饰的函数 func 作为参数。
  3. decorator 函数中,首先通过 asyncio.iscoroutinefunction(func) 判断被装饰的函数是否是异步函数,如果是异步函数,则创建一个异步的 wrapper 函数,否则创建一个同步的 wrapper 函数。
  4. wrapper 函数内部使用 RedLock 实例来获取分布式锁,并在获取锁成功后执行被装饰的函数 func,然后释放锁。如果获取锁失败,则打印错误信息。
  5. 最后,根据被装饰的函数 func 是异步函数还是同步函数,返回相应类型的 wrapper 函数。

这个装饰器的作用是确保同一时刻只有一个任务能够获取特定 key 对应的锁,以防止并发执行时出现数据竞争或者重复执行的情况。

有了装饰器,使用就简单多了,只需要函数增加装饰器就可以了,像这样

@lock("my_lock")
def job():
    import time
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}: 定时任务执行中...")

最后

还的折腾啊,学到的东西要运用到项目中,即使是自己的demo项目,这样你会发现问题,然后解决问题,这样知识积累更快,印象更深。