likes
comments
collection
share

10分钟掌握 FastAPI 与 SQLAlchemy + alembic 一套数据库操作SQLAlchemy SQLA

作者站长头像
站长
· 阅读数 51

10分钟掌握 FastAPI 与 SQLAlchemy + alembic 一套数据库操作SQLAlchemy SQLA

SQLAlchemy

SQLAlchemy 是一个强大的 Python 数据操作工具库,它提供了一整套的企业级持久化模式,用于高效的与各种关系型数据库交互。本文试图较为全面的了解 SQLAlchemy,官方文档在内容排版,和内容编排,似乎并不是很友好,或者说它离做项目还是比较远,本文更多的是倾向于 FastAPI 中使用 SQLAlchemy,同时也会简单聊聊迁移工具 alembic。

资源

官方也给了一些使用案例,在 sqlalchemy 的 example 中,需要的小伙伴可以自行获取。

核心概念

与实际开发体验直接相关的 SQLAlchemy 关键字包含以下,它们非常实用:

  • 引擎:Engine
  • 链接:Connection
  • 表:Table
  • 表列:Column
  • 表集合:MetaData
  • 结构:Schema
  • 事务:Transaction

当然你对其全面感兴趣可以去看看官网和源码熟悉。

定义引擎和连接地址

SQLAlchemy 支持不同的数据库,可以使用不同的数据引擎。

from sqlalchemy import create_engine

DATABASE_URL = os.getenv("DATABASE_URL") # 从环境变量中读取
engine = create_engine(DATABASE_URL)

SQLAlchemy 支持两种模式,一种是 SQL 操作,另外一种是 ORM 模式。当然这里指介绍 ORM 模式。

ORM 相关

使用 ORM 操作数据,首先需要一个 Session 对象(也可以认为是数据库),通过 Session 对象操作数据库:

Session

用于管理数据库操作的工作单元,负责查询、添加、修改、删除对象。

from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Declarative Base

基类,用于定义 ORM 模型,通常通过 declarative_base() 创建。

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

Model (模型)/Mapped Class

用户定义的类,继承自 Declarative Base,用于映射数据库表。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

# 创建 Declarative Base
Base = declarative_base()

# 定义映射类 User
class User(Base):
    __tablename__ = 'users'  # 映射到 'users' 表

    id = Column(Integer, primary_key=True)  # 对应 'users' 表的 'id' 列
    name = Column(String(50), nullable=False)  # 对应 'users' 表的 'name' 列
    email = Column(String(120), unique=True)  # 对应 'users' 表的 'email' 列

    # 关系映射
    posts = relationship('Post', back_populates='author')  # 一对多关系

# 定义映射类 Post
class Post(Base):
    __tablename__ = 'posts'  # 映射到 'posts' 表

    id = Column(Integer, primary_key=True)  # 对应 'posts' 表的 'id' 列
    title = Column(String(100), nullable=False)  # 对应 'posts' 表的 'title' 列
    content = Column(String(200))  # 对应 'posts' 表的 'content' 列
    user_id = Column(Integer, ForeignKey('users.id'))  # 外键,指向 'users' 表的 'id' 列

    # 关系映射
    author = relationship('User', back_populates='posts')  # 多对一关系

其他

  • Relationship: 定义模型之间的关系,如一对多、多对多等。
    • Backref: 在一个模型类中定义后,自动在关联的模型类中创建反向关系。
    • Back_populates: 在两个模型类中双向定义,手动设置反向关系。
    • Lazy: 指定关系的加载策略(如惰性加载、立即加载等)。
  • Query: 生成查询语句的接口,提供多种方法来筛选、排序、联接查询结果。
  • Association Table: 中间表,用于处理多对多关系。
  • Association Object: 用于在多对多关系中存储额外的数据。

表列 Column 和Schema (模式)

SQLAlchemy 使用 Class 定义一个表模型,它必须继承 Base 基础类,使用 Column 定义个列,以下是 Column 可能的属性,对应数据库的各种约束:

  • Type: 定义列的数据类型,比如 Integer, String, Boolean, DateTime 等。
  • Primary Key: 主键,用于唯一标识表中的每一行。
  • Foreign Key: 外键,用于定义表与表之间的关联关系。
  • Unique Constraint: 唯一约束,确保表中的某一列或几列的数据是唯一的。
  • Index: 索引,加速查询操作。
  • Check Constraint: 检查约束,用于对列的数据进行条件限制。
  • nullable: 指定该列是否可以为空。默认情况下列是可为空的。
  • default: 指定该列的默认值,如果插入数据时没有指定该列的值,则使用该默认值。
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)  # 主键
    username = Column(String(50), nullable=False, unique=True)  # 唯一约束
    email = Column(String(120), nullable=False, unique=True)  # 唯一约束
    age = Column(Integer, CheckConstraint('age >= 18'), nullable=False)  # 检查约束,确保年龄 >= 18

    # 关系映射
    posts = relationship('Post', back_populates='author')

    # 创建索引
    __table_args__ = (
        Index('idx_username', 'username'),  # 为 username 创建索引
    )

# 定义 Post 模型
class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)  # 主键
    title = Column(String(100), nullable=False)  # 标题
    content = Column(String(200))  # 内容
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)  # 外键,指向 users 表的 id 列

    # 关系映射
    author = relationship('User', back_populates='posts')

关系

定义好模型和列之后,数据常用的就是字段、表之间的关系处理

One-to-Many (一对多)

定义父对象和多个子对象之间的关系。

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String)

    posts = relationship('Post', back_populates='author')

class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(String)
    user_id = Column(Integer, ForeignKey('users.id'))

    author = relationship('User', back_populates='posts')

Many-to-One (多对一)

定义多个对象共享一个父对象的关系。

class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(String)
    user_id = Column(Integer, ForeignKey('users.id'))

    author = relationship('User', back_populates='posts')

Many-to-Many (多对多)

通过中间表定义多个对象之间的关系。

association_table = Table('association', Base.metadata,
    Column('student_id', Integer, ForeignKey('students.id')),
    Column('course_id', Integer, ForeignKey('courses.id'))
)

class Student(Base):
    __tablename__ = 'students'
    id = Column(Integer, primary_key=True)
    name = Column(String)

    courses = relationship('Course', secondary=association_table, back_populates='students')

class Course(Base):
    __tablename__ = 'courses'
    id = Column(Integer, primary_key=True)
    title = Column(String)

    students = relationship('Student', secondary=association_table, back_populates='courses')

One-to-One (一对一)

定义两个对象之间一对一的关系。

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String)

    profile = relationship('Profile', uselist=False, back_populates='user')

class Profile(Base):
    __tablename__ = 'profiles'
    id = Column(Integer, primary_key=True)
    bio = Column(String)
    user_id = Column(Integer, ForeignKey('users.id'), unique=True)

    user = relationship('User', back_populates='profile')

Slef-to-Slef (一对一)

定义自己引用关系

class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    manager_id = Column(Integer, ForeignKey('employees.id'))

    subordinates = relationship('Employee', backref='manager', remote_side=[id])

CRUD

使用 SQLAlchemy 快速进行 CRUD。

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# 创建 Declarative Base
Base = declarative_base()

# 定义 User 模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    email = Column(String(120), nullable=False, unique=True)
    age = Column(Integer, nullable=False)

# 创建数据库引擎和会话
engine = create_engine(<your_database_url>)

Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# 1. 创建新用户
new_user = User(username='alice', email='alice@example.com', age=30)
session.add(new_user)
session.commit()

# 2. 读取用户数据
users = session.query(User).all()
for user in users:
    print(f'User: {user.username}, Email: {user.email}, Age: {user.age}')

# 3. 更新用户年龄
user = session.query(User).filter_by(username='alice').first()
if user:
    user.age = 31
    session.commit()

# 4. 删除用户
user = session.query(User).filter_by(username='alice').first()
if user:
    session.delete(user)
    session.commit()

Querying (查询)

  • Filter: 用于过滤查询结果。
  • Order By: 用于对查询结果进行排序。
  • Join: 用于联接多个表的数据。
  • Group By: 用于将查询结果分组。
  • Subquery: 用于创建子查询,将子查询的结果用于其他查询。
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine, func
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# 创建 Declarative Base
Base = declarative_base()

# 定义 User 模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    age = Column(Integer, nullable=False)

# 定义 Post 模型
class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(String(100), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    author = relationship('User')

# 创建数据库引擎和会话
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# 添加一些示例数据
user1 = User(username='alice', age=30)
user2 = User(username='bob', age=22)
user3 = User(username='carol', age=17)

post1 = Post(title='Alice\'s First Post', author=user1)
post2 = Post(title='Bob\'s First Post', author=user2)
post3 = Post(title='Alice\'s Second Post', author=user1)

session.add_all([user1, user2, user3, post1, post2, post3])
session.commit()

# Filter 示例
adult_users = session.query(User).filter(User.age >= 18).all()

# Order By 示例
users_by_age = session.query(User).order_by(User.age).all()

# Join 示例
posts_with_authors = session.query(Post, User).join(User, Post.user_id == User.id).all()

# Group By 示例
age_groups = session.query(User.age, func.count(User.id)).group_by(User.age).all()

# Subquery 示例
subquery = session.query(User.id).filter(User.age > 20).subquery()
posts_of_adults = session.query(Post).filter(Post.user_id.in_(subquery)).all()

关联查询

  • 基础的 join 操作
  • outerjoin 操作
  • 多表连接
  • subqueryexists
  • aliased 和复杂查询

下面是一个示例:

# 定义模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    posts = relationship('Post', back_populates='author')

class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(String(100), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    author = relationship('User', back_populates='posts')
    comments = relationship('Comment', back_populates='post')

class Comment(Base):
    __tablename__ = 'comments'
    id = Column(Integer, primary_key=True)
    content = Column(String(200))
    post_id = Column(Integer, ForeignKey('posts.id'))
    post = relationship('Post', back_populates='comments')

# 添加一些示例数据
user1 = User(username='alice')
user2 = User(username='bob')
post1 = Post(title='Alice\'s Post', author=user1)
post2 = Post(title='Bob\'s Post', author=user2)
comment1 = Comment(content='Great post!', post=post1)
comment2 = Comment(content='Thanks for sharing!', post=post2)
session.add_all([user1, user2, post1, post2, comment1, comment2])
session.commit()

# 1. 基础的 join 操作
basic_join_query = session.query(Post).join(User).filter(User.username == 'alice').all()
for post in basic_join_query:
    print(f'Post Title: {post.title}, Author: {post.author.username}')

# 2. outerjoin 操作
outerjoin_query = session.query(Post).outerjoin(User).all()
for post in outerjoin_query:
    print(f'Post Title: {post.title}, Author: {post.author.username if post.author else "No Author"}')

# 3. 多表连接
multiple_join_query = session.query(Post).join(Comment).filter(Comment.content == 'Great post!').all()
for post in multiple_join_query:
    print(f'Post Title: {post.title}, Comments: {[c.content for c in post.comments]}')

# 4. subquery 和 exists
subquery = session.query(Post.id).filter(Post.title.like('Alice%')).subquery()
exists_query = session.query(Comment).filter(Comment.post_id.in_(subquery)).all()
for comment in exists_query:
    print(f'Comment Content: {comment.content}, Post Title: {comment.post.title}')

# 5. aliased 和复杂查询
user_alias = aliased(User)
complex_query = session.query(Post, user_alias).join(user_alias, Post.user_id == user_alias.id).filter(user_alias.username == 'alice').all()
for post, author in complex_query:
    print(f'Post Title: {post.title}, Author: {author.username}')

事务

在 SQLAlchemy 中,事务由 Session 对象管理。Session 自动处理事务的开始、提交和回滚。下面是一些常见的事务操作示例:

创建和使用事务

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)

try:
    # 开始事务
    user1 = User(username='alice')
    user2 = User(username='bob')
    
    # 添加用户到会话
    session.add(user1)
    session.add(user2)
    
    # 提交事务
    session.commit()
    print("Users have been added successfully.")
except Exception as e:
    # 回滚事务
    session.rollback()
    print(f"An error occurred: {e}")
finally:
    # 关闭会话
    session.close()

事务嵌套

try:
    with session.begin():  # 开始一个事务
        user1 = User(username='charlie')
        session.add(user1)
        # 嵌套事务
        with session.begin_nested():
            user2 = User(username='david')
            session.add(user2)
        # 提交外部事务
    print("Users have been added successfully.")
except Exception as e:
    # 回滚事务
    session.rollback()
    print(f"An error occurred: {e}")
finally:
    session.close()

异常处理和回滚

try:
    # 进行一些数据库操作
    user = User(username='eve')
    session.add(user)
    # 故意引发异常
    raise Exception("Simulated error")
    session.commit()  # 提交事务
except Exception as e:
    session.rollback()  # 回滚事务
    print(f"An error occurred: {e}")
finally:
    session.close()

上下文管理器(Context Manager)

from sqlalchemy.orm import scoped_session, sessionmaker

# 创建作用域会话
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

with Session() as session:
    try:
        user = User(username='frank')
        session.add(user)
        session.commit()  # 提交事务
    except Exception as e:
        session.rollback()  # 回滚事务
        print(f"An error occurred: {e}")

Advanced Features (高级功能)

  • Hybrid Properties: 允许在 ORM 对象中定义方法,像属性一样使用。
  • Events: 用于在特定操作前或后触发事件处理函数。
  • Custom Types: 自定义列的数据类型。
  • Mixins: 用于在多个模型中共享代码的类。

当我们配置好了数据想改你的内容,当我们需要修改我们的模型或者其他的配置,数据就需要迁移工具了 SQLAlchemy 和 FastAPI 常用的迁移工具是 alembic。

alembic 数据迁移工具

Alembic 是 SQLAlchemy 的数据库迁移工具,用于跟踪数据库架构的变化。

pip install alembic # 安装
cd <root_dir>
alembic init alembic # 初始化
  • 确保在 alembic/env.py 中配置了 target_metadata
from myapp.models import Base
target_metadata = Base.metadata
[alembic]
# Path to migration scripts
script_location = alembic

[database]
# SQLAlchemy Database URL
sqlalchemy.url = postgresql://user:password@localhost/dbname
  • 迁移
alembic revision --autogenerate -m "create user and post tables" #生成迁移文件
alembic downgrade -1 # 回退迁移
alembic upgrade head # 将迁移脚本应用到数据库中
alembic current # 显示当前数据库应用的版本。
alembic history # 显示迁移历史及其状态。
alembic revision -m "描述信息"

小结

本文试图一文了解 SQLAlchemy 作为 ORM 使用的技巧和迁移工具 alembic 使用与总结。较为前面的总结了相关内容,读者可以在较短的时间内掌握 SQLAlchemy 相关内容,如果熟悉其他语言的数据工具,学习上手就更加容易高效了。

转载自:https://juejin.cn/post/7403163989632581668
评论
请登录