(九)FastAPI的数据库集成与操作
FastAPI 不仅支持高性能的异步编程,还能够与多种数据库无缝集成。本文将详细介绍如何在 FastAPI 中进行数据库集成与操作,涵盖数据库的选择、连接、操作以及最佳实践。
一、选择数据库与数据库驱动
在使用 FastAPI 开发应用时,选择合适的数据库和数据库驱动是关键。本文将以 MySQL 为例,演示如何进行数据库集成与操作。
二、配置数据库连接
使用 databases
库进行 MySQL 数据库连接配置。
2.1 安装依赖
首先,安装所需的库:
pip install databases[mysql] sqlalchemy aiomysql
2.2 配置数据库连接
先创建数据库test_db
:
CREATE DATABASE test_db CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
再创建一个 main.py
文件,并配置数据库连接:
from fastapi import FastAPI
from databases import Database
DATABASE_URL = "mysql+aiomysql://user:password@localhost/test_db"
database = Database(DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
请确保将 user
和 password
替换为你的 MySQL 用户名和密码,localhost
替换为 MySQL 服务器地址,test_db
替换为你的数据库名称。
三、创建数据库表
使用 SQLAlchemy 定义数据模型和表结构。
3.1 定义数据模型并创建数据表
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import MetaData, Table, Column, Integer, String
# 配置数据库连接 URL,使用 aiomysql 驱动程序
DATABASE_URL = "mysql+aiomysql://root:123456@localhost/test_db"
# 创建异步数据库引擎
engine = create_async_engine(DATABASE_URL, echo=True)
# 定义元数据
metadata = MetaData()
# 定义 users 表
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String(50)),
Column("email", String(50)),
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(metadata.create_all)
print("Table created successfully.")
# 运行异步函数
asyncio.run(create_tables())
上述代码定义了一个数据模型 users
并创建了数据表,包含 id
、name
和 email
三个字段。
四、数据库操作
4.1 插入数据
使用 FastAPI 路由插入数据。
from pydantic import BaseModel
class UserIn(BaseModel):
name: str
email: str
@app.post("/users/")
async def create_user(user: UserIn):
query = users.insert().values(name=user.name, email=user.email)
last_record_id = await database.execute(query)
return {**user.dict(), "id": last_record_id}
4.2 查询数据
定义查询所有用户和查询单个用户的路由。
@app.get("/users/")
async def read_users():
query = users.select()
return await database.fetch_all(query)
@app.get("/users/{user_id}")
async def read_user(user_id: int):
query = users.select().where(users.c.id == user_id)
return await database.fetch_one(query)
4.3 更新数据
定义更新用户信息的路由。
class UserUpdate(BaseModel):
name: str
email: str
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate):
query = users.update().where(users.c.id == user_id).values(name=user.name, email=user.email)
await database.execute(query)
return {**user.dict(), "id": user_id}
4.4 删除数据
定义删除用户的路由。
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
query = users.delete().where(users.c.id == user_id)
await database.execute(query)
return {"message": "User deleted"}
五、完整代码示例
将上述代码整合到一个完整的 FastAPI 应用中:
from fastapi import FastAPI
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String
from pydantic import BaseModel
DATABASE_URL = "mysql+aiomysql://root:123456@localhost/test_db"
database = Database(DATABASE_URL)
metadata = MetaData()
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String(50)),
Column("email", String(50)),
)
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
class UserIn(BaseModel):
name: str
email: str
@app.post("/users/")
async def create_user(user: UserIn):
query = users.insert().values(name=user.name, email=user.email)
last_record_id = await database.execute(query)
return {**user.dict(), "id": last_record_id}
@app.get("/users/")
async def read_users():
query = users.select()
return await database.fetch_all(query)
@app.get("/users/{user_id}")
async def read_user(user_id: int):
query = users.select().where(users.c.id == user_id)
return await database.fetch_one(query)
class UserUpdate(BaseModel):
name: str
email: str
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate):
query = users.update().where(users.c.id == user_id).values(name=user.name, email=user.email)
await database.execute(query)
return {**user.dict(), "id": user_id}
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
query = users.delete().where(users.c.id == user_id)
await database.execute(query)
return {"message": "User deleted"}
六、总结
本文详细介绍了如何在 FastAPI 中进行 MySQL 数据库集成与操作。我们选择了 MySQL 作为示例数据库,使用 databases
库进行异步数据库操作,并通过 SQLAlchemy 定义了数据模型和表结构。我们还展示了如何在 FastAPI 中实现基本的增、删、改、查操作。
通过掌握这些基本操作,你可以轻松将 FastAPI 应用与 MySQL 数据库集成,并处理复杂的数据操作。希望本文对你有所帮助,祝你在 FastAPI 的开发之旅中一切顺利!
转载自:https://juejin.cn/post/7380163683010510911