FastAPI + SQLAlchemy 异步编程异步 因为官方上和一些开源项目大多以同步的方式展现。异步在 FastA
异步
因为官方上和一些开源项目大多以同步的方式展现。异步在 FastAPI 提高并发量非常重要。这里分享一些关于 FastAPI 异步使用的经验。
如果你是第一个前端,对 Promise + async/await 和生成器比较了解。那么 Python 异步也很好理解。都是基于事件循环。
sqlalchemy 安装
这里以 pg 为例安装
pip install sqlalchemy[asyncio] asyncpg
这里需要安装 sqlalchemy 支持异步特性的。同样需要支持异步的 asyncpg pg 数据库接口库。
或者在 pyproject.toml
中添加
[tool.poetry.dependencies]
sqlalchemy = {extras = ["asyncio"], version = "^2.0.35"}
asyncpg = "^0.29.0"
创建 sqlachemly 异步
这里我们直接给出代码:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from typing import AsyncGenerator
from app.config.config import get_settings
settings = get_settings()
engine = create_async_engine(settings.DATABASE_URL, echo=True)
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False,
class_=AsyncSession,
)
Base = declarative_base()
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session:
yield session
async def init_db_async():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("Tables created successfully")
核心点就是在:AsyncSession 上,我们需要创建一个异步的 Session。下面分析一个步骤:
- 从环境变量中加载数据库地址
- 使用
create_async_engine
创建异步引擎(这里输出了调试信息)。 - 然后使用
sessionmaker
创建爱你一个一步的 Session,这个 Session 就是后续基础。 - Base 是基类,所有的表都是通过它创建。
在 FastAPI 中使用方便我们还需要做一些辅助的事情。
初始化异步数据库
# main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db_async()
yield
app = FastAPI(
title="FastAPI Antd Admin FastAPI",
summary="FastAPI Antd admin",
description="一个管理系统 With FastAPI",
debug=config.DEBUG,
lifespan=lifespan
)
我们使用功能 lifespan 生命周期中初始化异步数据库。并在 FastAPI 实例化 app 的时候使用。后面我们就通过路由注入的方式进行 db 了。
FastAPI 异步路由
异步路由非常简单,直接在 router 装饰器装饰的函数上,添加 async 关键字即可。在需要等待的位置添加 await。
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.response import ResponseModel, ResponseSuccessModel
from app.db.client import get_db
import app.services.news.news as nns
router = APIRouter(tags=["Client News"])
@router.get("/{id}")
async def get_news_by_id(id: int, db: AsyncSession =Depends(get_db)):
data = await nns.get_news_by_id_service(db, id)
return ResponseSuccessModel(data=data)
之前路由中抽象中过 get_db
函数,我们在 FastAPI 的路由中使用 Depends 注入 get_db 依赖异步使用 db。如果你的分层比较多使用 db 比较深,对类型的可能也是需求的,AsyncSession 是来自于 sqlalchemy.ext.asyncio
,这一点需要注意。
通用抽象
在数据库中进行抽象,如果你的数据库都是一些 crud 操作。我们不妨把这些内容进行抽象,我们分析一下:
- count
- list/all
- create
- update
- delete/delete many
需要注意的在做操作的时候,与同步还是有区别的。最大的区别就是 db.query
不能直接用了。下面一个简单的抽象:
from sqlalchemy import select, func, delete, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import Select
from typing import List, Type, Optional, Any, Dict
from sqlalchemy.orm import DeclarativeMeta
async def get_count(
db: AsyncSession,
model: Type[DeclarativeMeta],
) -> int:
"""
获取指定模型的数据库存储的数量
Args:
db (AsyncSession): 数据库会话
model (DeclarativeMeta): 数据库模型
Returns:
int: 模型数量
"""
# 获取模型的主键列,默认为第一个主键
primary_key_column = model.__mapper__.primary_key[0]
# 查询模型记录的数量
query = select(func.count(primary_key_column))
result = await db.execute(query)
# 获取计数结果
count = result.scalar()
return count
async def get_all(
db: AsyncSession,
model: Type[DeclarativeMeta],
order_by: Optional[Any] = None,
) -> List[Any]:
"""
获取指定模型的所有数据
Args:
db (AsyncSession): 数据库会话
model (DeclarativeMeta): 数据库模型
order_by (Optional[Any]): 排序字段,SQLAlchemy 排序表达式
Returns:
List[Any]: 模型数据列表
"""
# 初始化查询
query = select(model)
# 应用排序
if order_by:
query = query.order_by(order_by)
# 执行查询
result = await db.execute(query)
data = result.scalars().all()
return data
async def get_list(
db: AsyncSession,
model: Type[DeclarativeMeta],
order_by: Optional[Any] = None,
filter: Optional[Any] = None,
page: int = 1,
pageSize: int = 10
) -> List[Any]:
"""
获取指定模型的数据列表
Args:
db (AsyncSession): 数据库会话
model (Base): 数据库模型
order_by (str): 排序字段
filter (str): 过滤条件
page (int, optional): 页码. Defaults to 1.
pageSize (int, optional): 每页数量. Defaults to 10.
Returns:
list: 模型数据列表
"""
offset = (page - 1) * pageSize # Calculate the offset
query: Select = select(model)
if order_by:
query = query.order_by(order_by)
if filter:
query = query.where(filter)
query = select(model).order_by().offset(offset).limit(pageSize) # Apply limit and offset
result = await db.execute(query)
data = result.scalars().all()
return data
async def get_by_id(
db: AsyncSession,
model: Type[DeclarativeMeta],
id: int
) -> Optional[Any]:
"""
根据指定模型的 id 获取数据
Args:
db (AsyncSession): 数据库会话
model (Type[DeclarativeMeta]): 数据库模型
id (int): 模型 id
Returns:
Optional[Any]: 模型数据,如果未找到则返回 None
"""
# 创建查询
query = select(model).where(model.id == id)
# 执行查询
result = await db.execute(query)
data = result.scalar() # 获取单个结果
return data # 如果未找到,data 将为 None
async def get_by_name(
db: AsyncSession,
model: Type[DeclarativeMeta],
name: str
) -> Optional[Any]:
"""
根据指定模型的 name 获取数据
Args:
db (AsyncSession): 数据库会话
model (Type[DeclarativeMeta]): 数据库模型
name (str): 模型 name
Returns:
Optional[Any]: 模型数据,如果未找到则返回 None
"""
# 创建查询
query = select(model).where(model.name == name)
# 执行查询
result = await db.execute(query)
data = result.scalar() # 获取单个结果
return data # 如果未找到,data 将为 None
async def create(
db: AsyncSession,
model: Type[DeclarativeMeta],
obj_in: Dict[str, Any]
) -> DeclarativeMeta:
"""
创建指定模型的数据
Args:
db (AsyncSession): 数据库会话
model (DeclarativeMeta): 数据库模型
obj_in (Dict[str, Any]): 要插入的模型数据,作为字典传入
Returns:
DeclarativeMeta: 创建的模型实例
"""
# 创建数据库对象
db_obj = model(**obj_in)
db.add(db_obj)
# 提交事务
await db.commit()
# 刷新对象以获取数据库生成的值(如自增ID)
await db.refresh(db_obj)
return db_obj
async def update_by_id(
db: AsyncSession,
model: Type[DeclarativeMeta],
id: int,
new_data: Dict[str, Any]
) -> Optional[Any]:
"""
更新指定模型的数据
Args:
db (AsyncSession): 数据库会话
model (Type[DeclarativeMeta]): 数据库模型
id (int): 模型 id
new_data (Dict[str, Any]): 新的数据,键为字段名,值为更新内容
Returns:
Optional[Any]: 更新后的模型数据,如果未找到则返回 None
"""
# 执行更新操作
query = update(model).where(model.id == id).values(**new_data)
result = await db.execute(query)
if result.rowcount == 0:
# 如果没有更新任何行,返回 None
return None
await db.commit()
# 直接返回更新后的记录
updated_record = await db.execute(select(model).where(model.id == id))
return updated_record.scalar_one_or_none()
async def delete_by_id(
db: AsyncSession,
model: Type[DeclarativeMeta],
id: int
) -> int:
"""
单个删除指定模型的数据
Args:
db (AsyncSession): 数据库会话
model (Type[DeclarativeMeta]): 数据库模型
id (int): 模型 id
Returns:
int: 删除的行数,如果未找到记录,则返回 0
"""
# 创建删除语句
query = delete(model).where(model.id == id)
# 执行删除操作
result = await db.execute(query)
# 提交事务
await db.commit()
# 返回删除的行数
return result.rowcount
async def delete_by_ids(
db: AsyncSession,
model: Type[DeclarativeMeta],
ids: List[int]
) -> int:
"""
批量删除指定模型的数据
Args:
db (AsyncSession): 数据库会话
model (Type[DeclarativeMeta]): 数据库模型
ids (List[int]): 模型id列表
Returns:
int: 删除的行数,如果没有删除任何记录则返回 0
"""
if not ids:
return 0 # 如果 ids 列表为空,直接返回 0
# 创建删除语句
stmt = delete(model).where(model.id.in_(ids))
# 执行删除操作
result = await db.execute(stmt)
# 提交事务
await db.commit()
# 返回删除的行数
return result.rowcount
小结
本文主要分享 FastAPI + SQLAlchemy 异步的一些小经验。异步使用很简单,但是需要理解背后的原理。同时如果你已经熟悉了同步,异步在具体的操作上也不一样的。希望这些经验能够帮助到读者。
转载自:https://juejin.cn/post/7422848805048041472