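(14) FastAPI Real-World Project Case Study
Project Overview
TaskMaster is a task management system built with FastAPI. It provides user authentication, task management, asynchronous processing, caching, and a distributed task queue. The project is meant as a comprehensive example of how to build an efficient, scalable web application with FastAPI and modern Python libraries. Working through TaskMaster, you will learn how to build a complete application with FastAPI, from the API endpoints down to the backend processing, see where asynchronous programming helps, and use caching and a distributed task queue to improve performance.
Project Structure
The basic project structure is as follows: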
taskmaster/
│
├── app/
│ ├── __init__.py
│ ├── auth.py
│ ├── config.py
│ ├── database.py
│ ├── main.py
│ ├── models.py
│ ├── schemas.py
│ ├── tasks.py
│ └── worker.py
│
├── tests/
│ ├── __init__.py
│ └── test_main.py
│
├── alembic/
│ ├── env.py
│ └── versions/
│
├── alembic.ini
├── Dockerfile
├── log_config.yaml
├── logging.ini
├── requirements.txt
└── README.md
Project Dependencies
requirements.txt
fastapi~=0.111.0
uvicorn
sqlalchemy~=2.0.31
databases
aiomysql
alembic
pytest
httpx
python-multipart
pydantic~=2.7.4
passlib[bcrypt]
pyjwt
celery~=5.4.0
redis
python-jose==3.3.0
aioredis~=2.0.1
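fastapi-cache2
# imported as "yaml" in app/main.py to read log_config.yaml
pyyaml
# synchronous MySQL driver for mysql+pymysql:// URLs
pymysql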
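1. FastAPI Project Configuration
1.1 config.py
The configuration file holds the database connection settings. (The Celery broker and backend URLs are set directly in app/worker.py.)
class Settings:
    # Defaults for local development only; do not commit real credentials.
    db_user: str = "root"
    db_password: str = "123456"
    db_host: str = "localhost"
    db_name: str = "test_db"


settings = Settings()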
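In a deployed environment these values usually come from the environment rather than from source code. The following is a minimal sketch, not part of the original project, of one way to do that with only the standard library. The TASKMASTER_ variable prefix and the load_settings helper are hypothetical names chosen for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    db_user: str = "root"
    db_password: str = "123456"
    db_host: str = "localhost"
    db_name: str = "test_db"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings, letting TASKMASTER_* variables override the defaults."""
    defaults = Settings()
    return Settings(
        db_user=env.get("TASKMASTER_DB_USER", defaults.db_user),
        db_password=env.get("TASKMASTER_DB_PASSWORD", defaults.db_password),
        db_host=env.get("TASKMASTER_DB_HOST", defaults.db_host),
        db_name=env.get("TASKMASTER_DB_NAME", defaults.db_name),
    )


settings = load_settings()
```

Passing the mapping as a parameter keeps the helper easy to test: a plain dict can stand in for os.environ.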
2. Database Models
2.1 models.py
Define the database model.
from sqlalchemy import Column, Integer, String

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, index=True)
    email = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(100))
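2.2 schemas.py
Define the data validation model.
from pydantic import BaseModel, ConfigDict


class UserSchema(BaseModel):
    # from_attributes lets Pydantic read fields from ORM objects such as models.User.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    email: str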
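To make the role of from_attributes concrete, here is a small sketch of how a schema like UserSchema turns an attribute-bearing object into a response payload. The schema is repeated so the example is self-contained, and a SimpleNamespace stands in for a models.User row (an assumption made only so the snippet runs without a database).

```python
from types import SimpleNamespace

from pydantic import BaseModel, ConfigDict


class UserSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    email: str


# Stand-in for a models.User row; a real ORM instance is read the same way.
row = SimpleNamespace(
    id=1,
    name="alice",
    email="alice@example.com",
    hashed_password="not-a-real-hash",
)

# Only the fields declared on the schema are copied over.
user = UserSchema.model_validate(row)
print(user.model_dump())
```

Because hashed_password is not declared on the schema, it does not appear in the output, which is why using the schema as a response_model keeps the hash out of API responses.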
3. Database Operations
3.1 database.py
Set up the database connection.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from .config import settings

DATABASE_URL = (
    f"mysql+aiomysql://{settings.db_user}:{settings.db_password}"
    f"@{settings.db_host}/{settings.db_name}"
)

# echo=True logs every SQL statement, which is useful while developing.
engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
Base = declarative_base()


async def init_db():
    # Create every table registered on Base.metadata.
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def get_db():
    # One session per request. session.begin() commits when the block exits
    # normally and rolls back if an exception is raised.
    async with SessionLocal() as session:
        async with session.begin():
            yield session
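One pitfall with building DATABASE_URL from an f-string is that a password containing characters such as @ or / produces a URL that no longer parses as intended. A minimal sketch of a safer variant, assuming a hypothetical helper named build_database_url, percent-encodes the credentials with the standard library first:

```python
from urllib.parse import quote_plus


def build_database_url(user: str, password: str, host: str, name: str) -> str:
    """Return an aiomysql URL with the user and password percent-encoded."""
    return (
        f"mysql+aiomysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}/{name}"
    )


print(build_database_url("root", "p@ss/word", "localhost", "test_db"))
# prints: mysql+aiomysql://root:p%40ss%2Fword@localhost/test_db
```

With the sample password from config.py ("123456") the result is identical to the original f-string, so the helper only changes behavior when escaping is actually needed.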
4. Routes
Configure the routes in app/main.py:
import logging.config

import yaml
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from . import auth, database, models, schemas
from .database import get_db
from .tasks import add

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Placeholder value; load the real signing key from the environment in production.
SECRET_KEY = "secret_key"
ALGORITHM = "HS256"

app = FastAPI()

# Configure logging from log_config.yaml (resolved relative to the working directory).
with open("log_config.yaml", "r") as file:
    config = yaml.safe_load(file.read())
logging.config.dictConfig(config)
logger = logging.getLogger("myapp")


@app.on_event("startup")
async def startup_event():
    logger.info("Application startup")


@app.on_event("shutdown")
async def shutdown_event():
    logger.debug("Application shutdown")


def create_access_token(data: dict):
    # Sign a copy of the claims as a JWT.
    to_encode = data.copy()
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


@app.post("/users/", response_model=schemas.UserSchema)
async def create_user(name: str, email: str, password: str, db: AsyncSession = Depends(get_db)):
    # Create a user. get_db yields an AsyncSession inside session.begin(),
    # so flush here and let the dependency commit when the request ends.
    hashed_password = pwd_context.hash(password)
    db_user = models.User(name=name, email=email, hashed_password=hashed_password)
    db.add(db_user)
    await db.flush()
    await db.refresh(db_user)
    return db_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(database.get_db)):
    # The OAuth2 form's "username" field carries the user's email address.
    result = await db.execute(select(models.User).where(models.User.email == form_data.username))
    user = result.scalars().first()
    if not user or not pwd_context.verify(form_data.password, user.hashed_password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    token = create_access_token(data={"sub": user.email})
    return {"access_token": token, "token_type": "bearer"}


@app.get("/users/me", response_model=schemas.UserSchema)
async def read_users_me(current_user: models.User = Depends(auth.get_current_user)):
    return current_user


@app.post("/tasks/add")
async def create_task(x: int, y: int):
    # Queue the Celery task and return its id so the client can poll for the result.
    task = add.delay(x, y)
    return {"task_id": task.id}


@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = add.AsyncResult(task_id)
    if task.state == "PENDING":
        # Not started yet (Celery also reports unknown ids as PENDING).
        response = {"state": task.state, "status": "Pending..."}
    elif task.state != "FAILURE":
        response = {"state": task.state, "result": task.result}
    else:
        # On failure, task.info holds the raised exception.
        response = {"state": task.state, "status": str(task.info)}
    return response
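5. Database Migrations
5.1 Configure Alembic
Configure the database connection in alembic.ini. The env.py generated by a plain alembic init runs migrations with a synchronous engine, so the URL below uses the synchronous pymysql driver rather than aiomysql:
[alembic]
script_location = alembic
sqlalchemy.url = mysql+pymysql://root:123456@localhost/test_db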
[loggers]
keys = root, sqlalchemy, alembic
[handlers]
keys = console, file
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers = console
qualname = sqlalchemy.engine
propagate = 0
[logger_alembic]
level = INFO
handlers = console
qualname = alembic
propagate = 0
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[handler_file]
class = FileHandler
args = ('alembic.log', 'a')
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
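5.2 Create the Migration Script
Initialize Alembic:
alembic init alembic
For --autogenerate to detect the models, set target_metadata = Base.metadata in alembic/env.py, importing Base from app.database and making sure app.models is imported so the tables are registered. Then create the initial migration script and apply it:
alembic revision --autogenerate -m "Initial migration"
alembic upgrade head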
6. Asynchronous Tasks
6.1 Configure Celery
Configure Celery in app/worker.py:
from celery import Celery

# Redis serves as both the message broker and the result backend.
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
    include=["app.tasks"],
)

celery_app.conf.update(
    result_expires=3600,  # keep task results for one hour
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
)

if __name__ == "__main__":
    celery_app.start()
Start the Celery worker:
celery -A app.worker.celery_app worker --pool=solo --loglevel=info
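7. Using a Cache
In a real project, caching can significantly improve performance, especially for data that is read often but changes rarely. A cache cuts the number of database queries and so reduces the load on the database.
7.1 Install Dependencies
We use the fastapi-cache2 library (imported as fastapi_cache) for caching. First, install the dependencies:
pip install fastapi-cache2 redis
7.2 Configure the Cache
Next, configure caching in the project. This example uses Redis as the cache backend.
Add the following code to main.py:
# main.py
...
from fastapi_cache.decorator import cache
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from redis.asyncio import Redis


# Extends the earlier startup_event so the cache is initialized at startup.
@app.on_event("startup")
async def startup_event():
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
    logger.info("Application startup")
...


@app.get("/users/{user_id}", response_model=schemas.UserSchema)
@cache(expire=60)  # cached entries expire after 60 seconds
async def read_item(user_id: int, db: AsyncSession = Depends(database.get_db)):
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    user = result.scalars().first()
    if user is None:
        # Without this check a missing user would fail response validation.
        raise HTTPException(status_code=404, detail="User not found")
    return user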
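To illustrate what an expiring cache buys you, the sketch below implements a tiny in-memory TTL cache for async functions using only the standard library. It is a stand-in written for this explanation, not how fastapi-cache2 is implemented; ttl_cache, read_user, and db_calls are hypothetical names, and the "database" is a counter.

```python
import asyncio
import time
from functools import wraps


def ttl_cache(expire: float):
    """Cache an async function's results per argument tuple for `expire` seconds."""
    def decorator(func):
        store = {}  # args -> (expiry_time, value)

        @wraps(func)
        async def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh: skip the underlying call
            value = await func(*args)
            store[args] = (now + expire, value)
            return value

        return wrapper
    return decorator


db_calls = 0


@ttl_cache(expire=60)
async def read_user(user_id: int) -> dict:
    """Pretend database lookup that counts how often it actually runs."""
    global db_calls
    db_calls += 1
    await asyncio.sleep(0)
    return {"id": user_id, "name": f"user{user_id}"}


async def main() -> None:
    await read_user(1)
    await read_user(1)  # served from the cache
    await read_user(2)  # different key, so the lookup runs again
    print(db_calls)  # prints 2


asyncio.run(main())
```

Three requests result in two lookups because the cache key is the user id. The same idea applies to the Redis-backed cache: the key should depend only on what identifies the data, and the expiry bounds how stale a response can be.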
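8. Testing
8.1 Write the Test Cases
Write the test cases in tests/test_main.py:
import asyncio

from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from app.database import Base, get_db
from app.main import app

SQLALCHEMY_DATABASE_URL = "mysql+aiomysql://root:123456@localhost/test_db"

# The endpoints expect an AsyncSession, so the override uses an async engine too.
# NullPool avoids reusing a connection across the event loops TestClient creates.
engine = create_async_engine(SQLALCHEMY_DATABASE_URL, poolclass=NullPool)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)


async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


asyncio.run(create_tables())


async def override_get_db():
    # Mirrors app.database.get_db, but bound to the test engine.
    async with TestingSessionLocal() as session:
        async with session.begin():
            yield session


app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)


def test_create_user():
    response = client.post(
        "/users/",
        params={"name": "test_user2", "email": "test2@example.com", "password": "123456"},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["name"] == "test_user2"
    assert data["email"] == "test2@example.com"
8.2 Run the Test Cases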
pytest
9. Deployment
9.1 Docker Deployment
Create the Dockerfile:
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
9.2 CI/CD with GitHub Actions
Create the .github/workflows/main.yml file:
name: CI/CD
on:
  push:
    branches:
      - main
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.8"
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run tests
        run: pytest
      - name: Build Docker image
        run: docker build -t myapp .
      - name: Push to Docker Hub
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker tag myapp:latest "${{ secrets.DOCKER_USERNAME }}/myapp:latest"
          docker push "${{ secrets.DOCKER_USERNAME }}/myapp:latest"
9.3 Logging Configuration
Create the log_config.yaml file:
version: 1
formatters:
  default:
    format: "%(levelname)s - %(asctime)s - %(name)s - %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    formatter: default
loggers:
  myapp:
    level: DEBUG
    handlers: [ console ]
    # root also writes to console, so propagating would print each record twice
    propagate: no
root:
  level: DEBUG
  handlers: [ console ]
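Summary
- Running an asynchronous task with Celery from a view function
- Checking the result of a Celery asynchronous task
- A user viewing their own information
That completes the FastAPI project case study, which covered project configuration, database integration, authentication and authorization, route configuration, database migrations, asynchronous tasks, testing, deployment, and logging configuration. Each part walks through the relevant code and configuration so that the project can run smoothly.
Reposted from: https://juejin.cn/post/7382891667672252466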