likes
comments
collection
share

(十四)FastAPI的实际项目案例分析

作者站长头像
站长
· 阅读数 48

项目概述

TaskMaster 是一个基于 FastAPI 构建的任务管理系统,具备用户认证、任务管理、异步处理、缓存和分布式任务队列等功能。该项目旨在提供一个全面的示例,展示如何使用 FastAPI 和现代 Python 库来构建高效、可扩展的 Web 应用程序。通过 TaskMaster,你将学习如何使用 FastAPI 构建从前端接口到后端处理的完整应用,理解异步编程的优势以及如何利用缓存和分布式任务队列提升应用性能。

项目结构

项目的基本结构如下:

taskmaster/

├── app/
│   ├── __init__.py
│   ├── auth.py
│   ├── config.py
|   ├── database.py
|   ├── main.py
│   ├── models.py
│   ├── schemas.py
│   ├── tasks.py
│   └── worker.py

├── tests/
│   ├── __init__.py
│   └── test_main.py

├── alembic/
│   ├── env.py
│   └── versions/

├── alembic.ini
├── Dockerfile
├── log_config.yaml
├── logging.ini
├── requirements.txt
└── README.md

项目依赖包

requirements.txt

fastapi~=0.111.0
uvicorn
sqlalchemy~=2.0.31
databases
aiomysql
alembic
pytest
httpx
python-multipart
pydantic~=2.7.4
passlib
pyjwt
celery~=5.4.0
redis
python-jose==3.3.0
aioredis~=2.0.1
fastapi-cache2

1. FastAPI 项目配置

1.1 config.py

配置文件包含数据库和 Celery 的配置。

class Settings:
    db_user: str = "root"
    db_password: str = "123456"
    db_host: str = "localhost"
    db_name: str = "test_db"


settings = Settings()

2. 数据库模型

2.1 models.py

定义数据库模型。

from sqlalchemy import Column, Integer, String
from .database import Base


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, index=True)
    email = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(100))

2.2 schemas.py

定义数据验证模型。

from pydantic import BaseModel


class UserSchema(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        from_attributes = True

3. 数据库操作

3.1 database.py

设置数据库连接。

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from .config import settings

DATABASE_URL = f"mysql+aiomysql://{settings.db_user}:{settings.db_password}@{settings.db_host}/{settings.db_name}"

engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)

Base = declarative_base()


async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def get_db():
    async with SessionLocal() as session:
        async with session.begin():
            yield session

4. 路由

app/main.py 中配置路由:

import logging.config
import yaml
from jose import jwt
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from . import models, schemas, database, auth
from .database import get_db
from .tasks import add

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

SECRET_KEY = "secret_key"
ALGORITHM = "HS256"

app = FastAPI()

with open("log_config.yaml", "r") as file:
    config = yaml.safe_load(file.read())
    logging.config.dictConfig(config)

logger = logging.getLogger("myapp")


@app.on_event("startup")
async def startup_event():
    logger.info("Application startup")


@app.on_event("shutdown")
async def shutdown_event():
    logger.debug("Application shutdown")


def create_access_token(data: dict):
    to_encode = data.copy()
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


@app.post("/users/")
def create_user(name: str, email: str, password: str, db: Session = Depends(get_db)):
    # 创建用户
    hashed_password = pwd_context.hash(password)
    db_user = models.User(name=name, email=email, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(database.get_db)):
    user = await db.execute(select(models.User).where(models.User.email == form_data.username))
    user = user.scalars().first()
    if not user or not pwd_context.verify(form_data.password, user.hashed_password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    token = create_access_token(data={"sub": user.email})
    return {"access_token": token, "token_type": "bearer"}


@app.get("/users/me", response_model=schemas.UserSchema)
async def read_users_me(current_user: models.User = Depends(auth.get_current_user)):
    return current_user


@app.post("/tasks/add")
async def create_task(x: int, y: int):
    task = add.delay(x, y)
    return {"task_id": task.id}


@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = add.AsyncResult(task_id)
    if task.state == "PENDING":
        response = {"state": task.state, "status": "Pending..."}
    elif task.state != "FAILURE":
        response = {"state": task.state, "result": task.result}
    else:
        response = {"state": task.state, "status": str(task.info)}
    return response

5. 数据库迁移

5.1 配置Alembic

alembic.ini 文件中配置数据库连接:

[alembic]

script_location = alembic
sqlalchemy.url = mysql+aiomysql://root:123456@localhost/test_db


[loggers]
keys = root, sqlalchemy, alembic

[handlers]
keys = console, file

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console

[logger_sqlalchemy]
level = WARN
handlers = console
qualname = sqlalchemy.engine
propagate = 0

[logger_alembic]
level = INFO
handlers = console
qualname = alembic
propagate = 0

[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic

[handler_file]
class = FileHandler
args = ('alembic.log', 'a')
level = NOTSET
formatter = generic

[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s

5.2 创建迁移脚本

初始化Alembic:

alembic init alembic

(十四)FastAPI的实际项目案例分析

创建初始迁移脚本:

alembic revision --autogenerate -m "Initial migration"
alembic upgrade head

(十四)FastAPI的实际项目案例分析

(十四)FastAPI的实际项目案例分析

6. 异步任务

6.1 配置Celery

app/worker.py 中配置Celery:

from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
    include=["app.tasks"]
)

celery_app.conf.update(
    result_expires=3600,
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)

if __name__ == "__main__":
    celery_app.start()

启动 Celery worker:

celery -A app.worker.celery_app worker --pool=solo --loglevel=info

(十四)FastAPI的实际项目案例分析

7. 使用缓存

在实际项目中,使用缓存可以显著提高性能,特别是对于那些频繁访问但不经常变化的数据。缓存可以减少数据库查询的次数,从而减轻数据库的负担。

7.1 安装依赖

我们使用 fastapi-cache 库来实现缓存功能。首先,安装依赖:

pip install fastapi-cache2 redis

7.2 配置缓存

接下来,在项目中配置缓存。在本例中,我们使用 Redis 做缓存来演示。 在 main.py 文件中添加以前代码:

# main.py
...
from fastapi_cache.decorator import cache
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from redis.asyncio import Redis

@app.on_event("startup")
async def startup_event():
    redis = Redis(host='localhost', port=6379, decode_responses=True)
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
    logger.info("Application startup")
    
...

@app.get("/users/{user_id}", response_model=schemas.UserSchema)
@cache(expire=60)  # 缓存有效期为 60 秒
async def read_item(user_id: int, db: AsyncSession = Depends(database.get_db)):
    user = await db.execute(select(models.User).where(models.User.id == user_id))
    user = user.scalars().first()
    return user

(十四)FastAPI的实际项目案例分析

(十四)FastAPI的实际项目案例分析

8. 测试

8.1 编写测试用例

tests/test_main.py 中编写测试用例:

from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app, get_db
from app.database import Base

SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@localhost/test_db"

engine = create_engine(SQLALCHEMY_DATABASE_URL)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base.metadata.create_all(bind=engine)


def override_get_db():
    try:
        db = TestingSessionLocal()
        yield db
    finally:
        db.close()


app.dependency_overrides[get_db] = override_get_db

client = TestClient(app)


def test_create_user():
    response = client.post("/users/?name=测试用户2&email=test2@example.com&password=123456")
    assert response.status_code == 200
    data = response.json()
    assert data["name"] == "测试用户2"
    assert data["email"] == "test2@example.com"

8.2 执行测试用例

pytest

(十四)FastAPI的实际项目案例分析

9. 部署

9.1 Docker部署

创建 Dockerfile 文件:

FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8

WORKDIR /app

COPY . /app

RUN pip install --no-cache-dir -r requirements.txt

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]

9.2 使用GitHub Actions进行CI/CD

创建 .github/workflows/main.yml 文件:

name: CI/CD

on:
  push:
    branches:
      - main

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.8
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run tests
        run: pytest
      - name: Build Docker image
        run: docker build -t myapp .
      - name: Push to Docker Hub
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker tag myapp:latest myapp:latest
          docker push myapp:latest

9.3 日志配置

创建 log_config.yaml 文件:

version: 1
formatters:
  default:
    format: "%(levelname)s - %(asctime)s - %(name)s - %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    formatter: default
loggers:
  myapp:
    level: DEBUG
    handlers: [ console ]
    propagate: yes
root:
  level: DEBUG
  handlers: [ console ]

总结

  • 在视图函数中使用 celery 执行异步任务

(十四)FastAPI的实际项目案例分析

  • 查看 celery 异步任务执行的结果

(十四)FastAPI的实际项目案例分析

  • 用户查看自己的信息

(十四)FastAPI的实际项目案例分析

以上就是完整的FastAPI项目案例分析,包含了项目配置、数据库集成、认证与授权、路由配置、数据库迁移、异步任务、测试、部署与日志配置。每个部分都详细介绍了相关的代码和配置,确保项目能够顺利运行。

转载自:https://juejin.cn/post/7382891667672252466
评论
请登录