10分钟掌握 FastAPI 与 SQLAlchemy + alembic 一套数据库操作SQLAlchemy SQLA
SQLAlchemy
SQLAlchemy 是一个强大的 Python 数据操作工具库,它提供了一整套的企业级持久化模式,用于高效的与各种关系型数据库交互。本文试图较为全面的了解 SQLAlchemy,官方文档在内容排版,和内容编排,似乎并不是很友好,或者说它离做项目还是比较远,本文更多的是倾向于 FastAPI 中使用 SQLAlchemy,同时也会简单聊聊迁移工具 alembic。
资源
- sqlalchemy 官方网站
- sqlalchemy Github 托管地址
- fastapi sql-databases 中使用了 sqlalchemy ORM。
- alembic 官方文档
官方也给了一些使用案例,在 sqlalchemy 的 example 中,需要的小伙伴可以自行获取。
核心概念
与实际开发体验直接相关的 SQLAlchemy 关键字包含以下,它们非常实用:
- 引擎:Engine
- 链接:Connection
- 表:Table
- 表列:Column
- 表集合:MetaData
- 结构:Schema
- 事务:Transaction
当然你对其全面感兴趣可以去看看官网和源码熟悉。
定义引擎和连接地址
SQLAlchemy 支持不同的数据库,可以使用不同的数据引擎。
from sqlalchemy import create_engine
DATABASE_URL = os.getenv("DATABASE_URL") # 从环境变量中读取
engine = create_engine(DATABASE_URL)
SQLAlchemy 支持两种模式,一种是 SQL 操作,另外一种是 ORM 模式。当然这里指介绍 ORM 模式。
ORM 相关
使用 ORM 操作数据,首先需要一个 Session 对象(也可以认为是数据库),通过 Session 对象操作数据库:
Session
用于管理数据库操作的工作单元,负责查询、添加、修改、删除对象。
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Declarative Base
基类,用于定义 ORM 模型,通常通过 declarative_base()
创建。
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
Model (模型)/Mapped Class
用户定义的类,继承自 Declarative Base,用于映射数据库表。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
# 创建 Declarative Base
Base = declarative_base()
# 定义映射类 User
class User(Base):
__tablename__ = 'users' # 映射到 'users' 表
id = Column(Integer, primary_key=True) # 对应 'users' 表的 'id' 列
name = Column(String(50), nullable=False) # 对应 'users' 表的 'name' 列
email = Column(String(120), unique=True) # 对应 'users' 表的 'email' 列
# 关系映射
posts = relationship('Post', back_populates='author') # 一对多关系
# 定义映射类 Post
class Post(Base):
__tablename__ = 'posts' # 映射到 'posts' 表
id = Column(Integer, primary_key=True) # 对应 'posts' 表的 'id' 列
title = Column(String(100), nullable=False) # 对应 'posts' 表的 'title' 列
content = Column(String(200)) # 对应 'posts' 表的 'content' 列
user_id = Column(Integer, ForeignKey('users.id')) # 外键,指向 'users' 表的 'id' 列
# 关系映射
author = relationship('User', back_populates='posts') # 多对一关系
其他
- Relationship: 定义模型之间的关系,如一对多、多对多等。
- Backref: 在一个模型类中定义后,自动在关联的模型类中创建反向关系。
- Back_populates: 在两个模型类中双向定义,手动设置反向关系。
- Lazy: 指定关系的加载策略(如惰性加载、立即加载等)。
- Query: 生成查询语句的接口,提供多种方法来筛选、排序、联接查询结果。
- Association Table: 中间表,用于处理多对多关系。
- Association Object: 用于在多对多关系中存储额外的数据。
表列 Column 和Schema (模式)
SQLAlchemy 使用 Class 定义一个表模型,它必须继承 Base 基础类,使用 Column 定义个列,以下是 Column 可能的属性,对应数据库的各种约束:
- Type: 定义列的数据类型,比如
Integer
,String
,Boolean
,DateTime
等。 - Primary Key: 主键,用于唯一标识表中的每一行。
- Foreign Key: 外键,用于定义表与表之间的关联关系。
- Unique Constraint: 唯一约束,确保表中的某一列或几列的数据是唯一的。
- Index: 索引,加速查询操作。
- Check Constraint: 检查约束,用于对列的数据进行条件限制。
- nullable: 指定该列是否可以为空。默认情况下列是可为空的。
- default: 指定该列的默认值,如果插入数据时没有指定该列的值,则使用该默认值。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True) # 主键
username = Column(String(50), nullable=False, unique=True) # 唯一约束
email = Column(String(120), nullable=False, unique=True) # 唯一约束
age = Column(Integer, CheckConstraint('age >= 18'), nullable=False) # 检查约束,确保年龄 >= 18
# 关系映射
posts = relationship('Post', back_populates='author')
# 创建索引
__table_args__ = (
Index('idx_username', 'username'), # 为 username 创建索引
)
# 定义 Post 模型
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True) # 主键
title = Column(String(100), nullable=False) # 标题
content = Column(String(200)) # 内容
user_id = Column(Integer, ForeignKey('users.id'), nullable=False) # 外键,指向 users 表的 id 列
# 关系映射
author = relationship('User', back_populates='posts')
关系
定义好模型和列之后,数据常用的就是字段、表之间的关系处理
One-to-Many (一对多)
定义父对象和多个子对象之间的关系。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)
posts = relationship('Post', back_populates='author')
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String)
user_id = Column(Integer, ForeignKey('users.id'))
author = relationship('User', back_populates='posts')
Many-to-One (多对一)
定义多个对象共享一个父对象的关系。
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String)
user_id = Column(Integer, ForeignKey('users.id'))
author = relationship('User', back_populates='posts')
Many-to-Many (多对多)
通过中间表定义多个对象之间的关系。
association_table = Table('association', Base.metadata,
Column('student_id', Integer, ForeignKey('students.id')),
Column('course_id', Integer, ForeignKey('courses.id'))
)
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String)
courses = relationship('Course', secondary=association_table, back_populates='students')
class Course(Base):
__tablename__ = 'courses'
id = Column(Integer, primary_key=True)
title = Column(String)
students = relationship('Student', secondary=association_table, back_populates='courses')
One-to-One (一对一)
定义两个对象之间一对一的关系。
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)
profile = relationship('Profile', uselist=False, back_populates='user')
class Profile(Base):
__tablename__ = 'profiles'
id = Column(Integer, primary_key=True)
bio = Column(String)
user_id = Column(Integer, ForeignKey('users.id'), unique=True)
user = relationship('User', back_populates='profile')
Slef-to-Slef (一对一)
定义自己引用关系
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String)
manager_id = Column(Integer, ForeignKey('employees.id'))
subordinates = relationship('Employee', backref='manager', remote_side=[id])
CRUD
使用 SQLAlchemy 快速进行 CRUD。
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
# 创建 Declarative Base
Base = declarative_base()
# 定义 User 模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), nullable=False, unique=True)
email = Column(String(120), nullable=False, unique=True)
age = Column(Integer, nullable=False)
# 创建数据库引擎和会话
engine = create_engine(<your_database_url>)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 1. 创建新用户
new_user = User(username='alice', email='alice@example.com', age=30)
session.add(new_user)
session.commit()
# 2. 读取用户数据
users = session.query(User).all()
for user in users:
print(f'User: {user.username}, Email: {user.email}, Age: {user.age}')
# 3. 更新用户年龄
user = session.query(User).filter_by(username='alice').first()
if user:
user.age = 31
session.commit()
# 4. 删除用户
user = session.query(User).filter_by(username='alice').first()
if user:
session.delete(user)
session.commit()
Querying (查询)
- Filter: 用于过滤查询结果。
- Order By: 用于对查询结果进行排序。
- Join: 用于联接多个表的数据。
- Group By: 用于将查询结果分组。
- Subquery: 用于创建子查询,将子查询的结果用于其他查询。
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine, func
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# 创建 Declarative Base
Base = declarative_base()
# 定义 User 模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), nullable=False, unique=True)
age = Column(Integer, nullable=False)
# 定义 Post 模型
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
author = relationship('User')
# 创建数据库引擎和会话
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 添加一些示例数据
user1 = User(username='alice', age=30)
user2 = User(username='bob', age=22)
user3 = User(username='carol', age=17)
post1 = Post(title='Alice\'s First Post', author=user1)
post2 = Post(title='Bob\'s First Post', author=user2)
post3 = Post(title='Alice\'s Second Post', author=user1)
session.add_all([user1, user2, user3, post1, post2, post3])
session.commit()
# Filter 示例
adult_users = session.query(User).filter(User.age >= 18).all()
# Order By 示例
users_by_age = session.query(User).order_by(User.age).all()
# Join 示例
posts_with_authors = session.query(Post, User).join(User, Post.user_id == User.id).all()
# Group By 示例
age_groups = session.query(User.age, func.count(User.id)).group_by(User.age).all()
# Subquery 示例
subquery = session.query(User.id).filter(User.age > 20).subquery()
posts_of_adults = session.query(Post).filter(Post.user_id.in_(subquery)).all()
关联查询
- 基础的
join
操作 outerjoin
操作- 多表连接
subquery
和exists
aliased
和复杂查询
下面是一个示例:
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), nullable=False, unique=True)
posts = relationship('Post', back_populates='author')
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
author = relationship('User', back_populates='posts')
comments = relationship('Comment', back_populates='post')
class Comment(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(String(200))
post_id = Column(Integer, ForeignKey('posts.id'))
post = relationship('Post', back_populates='comments')
# 添加一些示例数据
user1 = User(username='alice')
user2 = User(username='bob')
post1 = Post(title='Alice\'s Post', author=user1)
post2 = Post(title='Bob\'s Post', author=user2)
comment1 = Comment(content='Great post!', post=post1)
comment2 = Comment(content='Thanks for sharing!', post=post2)
session.add_all([user1, user2, post1, post2, comment1, comment2])
session.commit()
# 1. 基础的 join 操作
basic_join_query = session.query(Post).join(User).filter(User.username == 'alice').all()
for post in basic_join_query:
print(f'Post Title: {post.title}, Author: {post.author.username}')
# 2. outerjoin 操作
outerjoin_query = session.query(Post).outerjoin(User).all()
for post in outerjoin_query:
print(f'Post Title: {post.title}, Author: {post.author.username if post.author else "No Author"}')
# 3. 多表连接
multiple_join_query = session.query(Post).join(Comment).filter(Comment.content == 'Great post!').all()
for post in multiple_join_query:
print(f'Post Title: {post.title}, Comments: {[c.content for c in post.comments]}')
# 4. subquery 和 exists
subquery = session.query(Post.id).filter(Post.title.like('Alice%')).subquery()
exists_query = session.query(Comment).filter(Comment.post_id.in_(subquery)).all()
for comment in exists_query:
print(f'Comment Content: {comment.content}, Post Title: {comment.post.title}')
# 5. aliased 和复杂查询
user_alias = aliased(User)
complex_query = session.query(Post, user_alias).join(user_alias, Post.user_id == user_alias.id).filter(user_alias.username == 'alice').all()
for post, author in complex_query:
print(f'Post Title: {post.title}, Author: {author.username}')
事务
在 SQLAlchemy 中,事务由 Session
对象管理。Session
自动处理事务的开始、提交和回滚。下面是一些常见的事务操作示例:
创建和使用事务
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, nullable=False)
try:
# 开始事务
user1 = User(username='alice')
user2 = User(username='bob')
# 添加用户到会话
session.add(user1)
session.add(user2)
# 提交事务
session.commit()
print("Users have been added successfully.")
except Exception as e:
# 回滚事务
session.rollback()
print(f"An error occurred: {e}")
finally:
# 关闭会话
session.close()
事务嵌套
try:
with session.begin(): # 开始一个事务
user1 = User(username='charlie')
session.add(user1)
# 嵌套事务
with session.begin_nested():
user2 = User(username='david')
session.add(user2)
# 提交外部事务
print("Users have been added successfully.")
except Exception as e:
# 回滚事务
session.rollback()
print(f"An error occurred: {e}")
finally:
session.close()
异常处理和回滚
try:
# 进行一些数据库操作
user = User(username='eve')
session.add(user)
# 故意引发异常
raise Exception("Simulated error")
session.commit() # 提交事务
except Exception as e:
session.rollback() # 回滚事务
print(f"An error occurred: {e}")
finally:
session.close()
上下文管理器(Context Manager)
from sqlalchemy.orm import scoped_session, sessionmaker
# 创建作用域会话
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
with Session() as session:
try:
user = User(username='frank')
session.add(user)
session.commit() # 提交事务
except Exception as e:
session.rollback() # 回滚事务
print(f"An error occurred: {e}")
Advanced Features (高级功能)
- Hybrid Properties: 允许在 ORM 对象中定义方法,像属性一样使用。
- Events: 用于在特定操作前或后触发事件处理函数。
- Custom Types: 自定义列的数据类型。
- Mixins: 用于在多个模型中共享代码的类。
当我们配置好了数据想改你的内容,当我们需要修改我们的模型或者其他的配置,数据就需要迁移工具了 SQLAlchemy 和 FastAPI 常用的迁移工具是 alembic。
alembic 数据迁移工具
Alembic 是 SQLAlchemy 的数据库迁移工具,用于跟踪数据库架构的变化。
pip install alembic # 安装
cd <root_dir>
alembic init alembic # 初始化
- 确保在
alembic/env.py
中配置了target_metadata
。
from myapp.models import Base
target_metadata = Base.metadata
[alembic]
# Path to migration scripts
script_location = alembic
[database]
# SQLAlchemy Database URL
sqlalchemy.url = postgresql://user:password@localhost/dbname
- 迁移
alembic revision --autogenerate -m "create user and post tables" #生成迁移文件
alembic downgrade -1 # 回退迁移
alembic upgrade head # 将迁移脚本应用到数据库中
alembic current # 显示当前数据库应用的版本。
alembic history # 显示迁移历史及其状态。
alembic revision -m "描述信息"
小结
本文试图一文了解 SQLAlchemy 作为 ORM 使用的技巧和迁移工具 alembic 使用与总结。较为前面的总结了相关内容,读者可以在较短的时间内掌握 SQLAlchemy 相关内容,如果熟悉其他语言的数据工具,学习上手就更加容易高效了。
转载自:https://juejin.cn/post/7403163989632581668