likes
comments
collection
share

(九)FastAPI的数据库集成与操作

作者站长头像
站长
· 阅读数 35

FastAPI 不仅支持高性能的异步编程,还能够与多种数据库无缝集成。本文将详细介绍如何在 FastAPI 中进行数据库集成与操作,涵盖数据库的选择、连接、操作以及最佳实践。

一、选择数据库与数据库驱动

在使用 FastAPI 开发应用时,选择合适的数据库和数据库驱动是关键。本文将以 MySQL 为例,演示如何进行数据库集成与操作。

二、配置数据库连接

使用 databases 库进行 MySQL 数据库连接配置。

2.1 安装依赖

首先,安装所需的库:

pip install databases[mysql] sqlalchemy aiomysql

2.2 配置数据库连接

先创建数据库test_db

CREATE DATABASE test_db CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

再创建一个 main.py 文件,并配置数据库连接:

from fastapi import FastAPI
from databases import Database

DATABASE_URL = "mysql+aiomysql://user:password@localhost/test_db"
database = Database(DATABASE_URL)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

请确保将 userpassword 替换为你的 MySQL 用户名和密码,localhost 替换为 MySQL 服务器地址,test_db 替换为你的数据库名称。

三、创建数据库表

使用 SQLAlchemy 定义数据模型和表结构。

3.1 定义数据模型并创建数据表

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import MetaData, Table, Column, Integer, String

# 配置数据库连接 URL,使用 aiomysql 驱动程序
DATABASE_URL = "mysql+aiomysql://root:123456@localhost/test_db"

# 创建异步数据库引擎
engine = create_async_engine(DATABASE_URL, echo=True)

# 定义元数据
metadata = MetaData()

# 定义 users 表
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50)),
    Column("email", String(50)),
)


async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)
    print("Table created successfully.")


# 运行异步函数
asyncio.run(create_tables())

上述代码定义了一个数据模型 users并创建了数据表,包含 idnameemail 三个字段。

(九)FastAPI的数据库集成与操作

四、数据库操作

4.1 插入数据

使用 FastAPI 路由插入数据。

from pydantic import BaseModel

class UserIn(BaseModel):
    name: str
    email: str

@app.post("/users/")
async def create_user(user: UserIn):
    query = users.insert().values(name=user.name, email=user.email)
    last_record_id = await database.execute(query)
    return {**user.dict(), "id": last_record_id}

(九)FastAPI的数据库集成与操作

4.2 查询数据

定义查询所有用户和查询单个用户的路由。

@app.get("/users/")
async def read_users():
    query = users.select()
    return await database.fetch_all(query)

@app.get("/users/{user_id}")
async def read_user(user_id: int):
    query = users.select().where(users.c.id == user_id)
    return await database.fetch_one(query)

4.3 更新数据

定义更新用户信息的路由。

class UserUpdate(BaseModel):
    name: str
    email: str

@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate):
    query = users.update().where(users.c.id == user_id).values(name=user.name, email=user.email)
    await database.execute(query)
    return {**user.dict(), "id": user_id}

4.4 删除数据

定义删除用户的路由。

@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    query = users.delete().where(users.c.id == user_id)
    await database.execute(query)
    return {"message": "User deleted"}

五、完整代码示例

将上述代码整合到一个完整的 FastAPI 应用中:

from fastapi import FastAPI
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String
from pydantic import BaseModel

DATABASE_URL = "mysql+aiomysql://root:123456@localhost/test_db"
database = Database(DATABASE_URL)
metadata = MetaData()

users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(50)),
    Column("email", String(50)),
)

app = FastAPI()


@app.on_event("startup")
async def startup():
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


class UserIn(BaseModel):
    name: str
    email: str


@app.post("/users/")
async def create_user(user: UserIn):
    query = users.insert().values(name=user.name, email=user.email)
    last_record_id = await database.execute(query)
    return {**user.dict(), "id": last_record_id}


@app.get("/users/")
async def read_users():
    query = users.select()
    return await database.fetch_all(query)


@app.get("/users/{user_id}")
async def read_user(user_id: int):
    query = users.select().where(users.c.id == user_id)
    return await database.fetch_one(query)


class UserUpdate(BaseModel):
    name: str
    email: str


@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate):
    query = users.update().where(users.c.id == user_id).values(name=user.name, email=user.email)
    await database.execute(query)
    return {**user.dict(), "id": user_id}


@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    query = users.delete().where(users.c.id == user_id)
    await database.execute(query)
    return {"message": "User deleted"}

(九)FastAPI的数据库集成与操作

六、总结

本文详细介绍了如何在 FastAPI 中进行 MySQL 数据库集成与操作。我们选择了 MySQL 作为示例数据库,使用 databases 库进行异步数据库操作,并通过 SQLAlchemy 定义了数据模型和表结构。我们还展示了如何在 FastAPI 中实现基本的增、删、改、查操作。

通过掌握这些基本操作,你可以轻松将 FastAPI 应用与 MySQL 数据库集成,并处理复杂的数据操作。希望本文对你有所帮助,祝你在 FastAPI 的开发之旅中一切顺利!

转载自:https://juejin.cn/post/7380163683010510911
评论
请登录