likes
comments
collection
share

Flask之SQLAlchemy的基本使用

作者站长头像
站长
· 阅读数 12

SQLAlchemy

SQLAlchemy是一种在Python中使用的流行的SQL工具包,提供了ORM(对象关系映射),ORM可以将Python类和SQL表进行映射,它允许开发人员通过面向对象的方式使用关系数据库。

SQLAlchemy支持多种主流数据库,如MySQL、PostgreSQL、SQLite等,并且提供了非常丰富的功能,可以轻松地处理复杂的数据库操作。

Flask-SQLAlchemy是Flask 框架的一个扩展,其对SQLAlchemy进行了封装,借助Flask-SQLAlchemy,可以使用Python类定义数据库模型,然后使用这些类来创建、读取、更新和删除相应的数据库表中的记录。此外,Flask-SQLAlchemy还提供了对数据库迁移的支持,使得随着应用程序的不断演进,修改数据库模式变得更加容易。

SQLAlchemy的基本使用

安装

安装Flask-SQLAlchemy

pip install flask-sqlalchemy

使用MySQL数据库,还需安装MySQL的Python客户端库

pip install mysqlclient

数据库连接配置

在Flask中使用Flask-SQLAlchemy需要进行配置,主要配置有三项:

1.SQLALCHEMY_DATABASE_URI :数据库的连接信息

Postgres: postgresql://user:password@localhost/mydatabase

MySQL: mysql://user:password@localhost/mydatabase

Oracle: oracle://user:password@127.0.0.1:1521/sidname

SQLite: sqlite:////absolute/path/to/foo.db

2.SQLALCHEMY_TRACK_MODIFICATIONS : 动态追踪修改,可以设置为True或False,⼀般情况下设置False

3.SQLALCHEMY_ECHO :显示生成的SQL语句,可用于调试

在Flask应用中引入相关模块和配置数据库连接

配置参数放在Flask的应用配置app.config中,有2种方式:

第一种

from flask import Flask
app = Flask(__name__)

# 定义配置对象
class Config(object):
    SQLALCHEMY_DATABASE_URI = 'mysql://root:mysql@127.0.0.1:3306/db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)

第二种

from flask import Flask
app = Flask(__name__)

# 配置数据库的连接信息
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/db'
# 关闭动态追踪修改的警告信息
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 展示sql语句
app.config['SQLALCHEMY_ECHO'] = True

配置参数说明

名字备注
SQLALCHEMY_DATABASE_URI连接数据库URI
SQLALCHEMY_BINDS一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库
SQLALCHEMY_ECHO如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,对调试有用
SQLALCHEMY_RECORD_QUERIES可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()
SQLALCHEMY_NATIVE_UNICODE可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )
SQLALCHEMY_POOL_SIZE数据库连接池的大小。默认是引擎默认值(通常 是 5 )
SQLALCHEMY_POOL_TIMEOUT设定连接池的连接超时时间。默认是 10
SQLALCHEMY_POOL_RECYCLE多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时

创建SQLAlchemy对象

方式一:

# 导入扩展包flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy

# 直接实例化sqlalchemy对象,传⼊app
db = SQLAlchemy(app)

方式二:

# 导入扩展包flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy

# 通过⼿动调⽤初始话app的函数
db = SQLAlchemy()
db.init_app(app)

注意:在单独运行调试时,对数据库操作需要在Flask的应用上下文中进行

  with app.app_context():
      User.query.all()

ORM模型类参数说明

字段类型

类型名python中类型说明
Integerint普通整数,一般是32位
SmallIntegerint取值范围小的整数,一般是16位
BigIntegerint或long不限制精度的整数
Floatfloat浮点数
Numericdecimal.Decimal普通整数,一般是32位
Stringstr变长字符串
Textstr变长字符串,对较长或不限长度的字符串做了优化
Unicodeunicode变长Unicode字符串
UnicodeTextunicode变长Unicode字符串,对较长或不限长度的字符串做了优化
Booleanbool布尔值
Datedatetime.date时间
Timedatetime.datetime日期和时间
LargeBinarystr二进制文件

列选项

选项名说明
primary_key如果为True,代表表的主键
unique如果为True,代表这列不允许出现重复的值
index如果为True,为这列创建索引,提高查询效率
nullable如果为True,允许有空值,如果为False,不允许有空值
default为这列定义默认值

关系选项

选项名说明
backref在关系的另一模型中添加反向引用
primaryjoin明确指定两个模型之间使用的联结条件
uselist如果为False,不使用列表,而使用标量值
order_by指定关系中记录的排序方式
secondary指定多对多关系中关系表的名字
secondary join在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

定义ORM模型类

创建User类继承自db.Model类,同时定义id、name、mobile、gender、....等属性,对应数据库中表user的列。

class User(db.Model):
    __tablename__ = 'user'

    class GENDER:
        MALE = 0
        FEMALE = 1

    id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
    mobile = db.Column(db.String, doc='手机号')
    password = db.Column(db.String, doc='密码')
    name = db.Column('user_name', db.String, doc='昵称')
    gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
    birthday = db.Column(db.Date, doc='生日')
    is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
    # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
    time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
    update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')

    # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
    follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')

创建Car类继承自db.Model

class Car(db.Model):
    __tablename__ = 'car'

    class TYPE:
        SUV = 0
        SEDAN = 1
        PICKUP = 2

    id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')
    type = db.Column(db.Integer, doc='类型')
    name = db.Column(db.String, doc='名称')
    price = db.Column(db.Numeric, default=0.00, doc='价格')

常用参数说明:

db.Model:所有模型类都应该继承自 db.Model。

__tablename__:指定模型类对应的数据库表名。如果不指定,则默认为类名的小写形式。

db.Column:用来定义模型类中的各个字段,需要指定字段类型。

primary_key=True:用来指定主键字段。

default:用来指定字段的默认值。

unique=True:用来指定字段的唯一性约束。

index=True:用来指定字段是否需要创建索引。

db.ForeignKey():用来定义外键关系。需要传入对应的表格的主键作为参数

db.relationship():用来定义模型之间的关系。第一个参数需要传入要关联的模型类名,第二个参数可以通过 backref 来指定反向引用

lazy:用来指定关系的加载方式,有两种常见的方式:
	 lazy=True:表示使用惰性加载,即在首次访问相关属性时才会加载数据。
     lazy=False:表示立即加载,即在查询时同时加载相关数据。

创建数据库表

可以手动创建数据库表,也可以通过迁移的方式,创建数据库表。

CREATE TABLE `user` (
  `user_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `mobile` char(11) NOT NULL COMMENT '手机号',
  `password` varchar(93) NULL COMMENT '密码',
  `user_name` varchar(32) NULL COMMENT '昵称',
  `gender` tinyint(1) NOT NULL DEFAULT '0' COMMENT '性别',
	`birthday` date NULL COMMENT '生日',    
	`is_delete` tinyint(1) NOT NULL DEFAULT '1' COMMENT '是否删除',
	`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                                
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `mobile` (`mobile`),
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';



CREATE TABLE `car` (
  `car_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户ID',
  `type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
	`name` varchar(20) NOT NULL COMMENT '名称',
	`price` decimal(10,2)  DEFAULT '0.00' COMMENT '价格',
  PRIMARY KEY (`car_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='车辆表';

完整使用示例

from datetime import datetime

from flask import Flask
# 导入扩展包flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# 配置数据库的连接信息
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost/demo'
# 关闭动态追踪修改的警告信息
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 展示sql语句
app.config['SQLALCHEMY_ECHO'] = True

# 实例化sqlalchemy对象,并且和程序实例关联
db = SQLAlchemy(app)


class User(db.Model):
    __tablename__ = 'user'

    class GENDER:
        MALE = 0
        FEMALE = 1

    id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
    mobile = db.Column(db.String, doc='手机号')
    password = db.Column(db.String, doc='密码')
    name = db.Column('user_name', db.String, doc='昵称')
    gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
    birthday = db.Column(db.Date, doc='生日')
    is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
    # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
    time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
    update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')

    # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
    follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')


class Car(db.Model):
    __tablename__ = 'car'

    class TYPE:
        SUV = 0
        SEDAN = 1
        PICKUP = 2

    id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')
    type = db.Column(db.Integer, doc='类型')
    name = db.Column(db.String, doc='名称')
    price = db.Column(db.Numeric, default=0.00, doc='价格')


if __name__ == '__main__':
    app.run()

简单的CRUD操作

# 插入一条记录
user = User(name='张三', mobile='12345678910')
db.session.add(user)
db.session.commit()

# 查询记录
users = User.query.all()
print(users)

# 更新记录
user = User.query.filter_by(name='张三').first()
user.mobile  = '12345678910'
db.session.commit()

# 删除记录
user = User.query.filter_by(name='张三').first()
db.session.delete(user)
db.session.commit()

数据库迁移

首先在MySQL中创建数据库,接着定义模型类,通过迁移的方式,创建数据库表。

实现数据库迁移,需要用到扩展包:

flask-script:提供程序运行、迁移的脚本命令

flask-migrate:提供数据库迁移的功能

创建启动文件manage.py实现数据库迁移

app = Flask(__name__)
 
# 从flask_script中导入脚本管理器
from flask_script import Manager
# 从flask_migrate导入迁移工具、迁移命令
from flask_migrate import Migrate, MigrateCommand

# 实例化脚本管理器对象
manager = Manager(app)

# 创建SQLAlchemy对象
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

# 让迁移工具和程序实例app、sqlalchemy实例关联
Migrate(app, db)

# 添加迁移命令
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()

在终端中通过命令执行迁移

初始化迁移仓库

python manage.py db init

生成迁移脚本文件

python manage.py db migrate 

python manage.py db migrate -m init_tables

执行迁移脚本文件

python manage.py db upgrade

SQLAlchemy的CRUD操作

通过模型类的数据库会话对象db.session进行ORM类的CRUD操作,其封装了对数据库的基本操作,如:提交数据、回滚、添加、删除等

增加

@app.route('/add')
def add():
    # 添加数据
    user = User(mobile='12345678910', name='flask')
    # 把创建的模型类对象添加给数据库会话对象
    db.session.add(user)
    # 提交数据到数据库中
    db.session.commit()
    # 添加多个数据
    user1 = User(mobile='12345678911', name='flask1')
    user2 = User(mobile='12345678912', name='flask2')
    db.session.add_all([user1, user2])
    # 提交数据到数据库中
    db.session.commit()
    return 'add ok'


if __name__ == '__main__':
    app.run()

修改

更新和删除都必须要commit提交数据

user = User.query.get(1)
user.name = 'flask'
db.session.add(user) 
db.session.commit()
 User.query.filter_by(id=1).update({'name':'flask'})
  db.session.commit()

删除

更新和删除都必须要commit提交数据

 user = User.query.order_by(User.id.desc()).first()
  db.session.delete(user)
  db.session.commit()
  User.query.filter(User.mobile='12345678910').delete()
  db.session.commit()

查询

基本查询

all():查询所有,返回列表

select * from 表名;
 
User.query.all()

first():查询第一个,返回对象

select * from 表名 limit 1;

User.query.first()

get():根据主键ID获取对象,若主键不存在返回None

select * from 表名 where id=1;

User.query.get(1)

另一种查询方式

db.session.query(User).all()

db.session.query(User).first()

db.session.query(User).get(1)

过滤查询

filter_by:进行过虑,查询条件可以为空,默认查询所有,参数为模型类的字段名,只能使⽤赋值运算符,必须使⽤查询执⾏器;

select mobile from 表名 where mobile='12345678910'

User.query.filter_by(mobile='12345678910').all()

User.query.filter_by(mobile='12345678910').first()
# 查询条件是and关系
User.query.filter_by(mobile='12345678910', id=1).first()  

filter:进行过虑,查询条件可以为空,默认查询所有,参数为模型类名加上字段名,可以使用丰富的运算符,保修使用查询执⾏器;

User.query.filter(User.mobile=='12345678910').first()

运算符

逻辑运算符:与或⾮都需要导⼊使⽤,多条件默认是and关系,非就是不等于

⽐较运算符:>、<、>=、<=、!=、==

逻辑或

from sqlalchemy import or_

User.query.filter(or_(User.mobile=='12345678910', User.name.endswith('sk'))).all()

逻辑与

from sqlalchemy import and_

User.query.filter(and_(User.name != '12345678910', User.mobile.startswith('sk'))).all()

逻辑非

from sqlalchemy import not_

User.query.filter(not_(User.mobile == '12345678910')).all()

偏移与限制

offset:偏移,表示起始位置

User.query.offset(2).all()

limit:限制返回条⽬数

User.query.limit(2).all()

排序

order_by:asc表示升序,desc表示降序

# 正序
User.query.order_by(User.id).all()  

# 倒序
User.query.order_by(User.id.desc()).all() 

复合查询

多条件复合查询:手机号以123开始,按用户id倒序排序,起始位置2开始,返回3条符合的数据

User.query.filter(User.name.startswith('123')).order_by(User.id.desc()).offset(2).limit(3).all()
query = User.query.filter(User.name.startswith('123'))
query = query.order_by(User.id.desc())
query = query.offset(2).limit(3)
ret = query.all()

优化查询

ORM默认是全表扫描,使⽤load_only函数可以指定字段

# 查询所有字段
user = User.query.filter_by(id=1).first()  


# 查询指定字段
from sqlalchemy.orm import load_only
User.query.options(load_only(User.name, User.mobile)).filter_by(id=1).first() 

聚合查询

查询所有用户的拥有的SUV类型的车辆数

from sqlalchemy import func

db.session.query(Car.user_id, func.count(Car.name)).filter(Car.relation == Car.TYPE.SUV).group_by(Car.user_id).all()

关联查询

  1. 使用ForeignKey
# 一方
class User(db.Model):
    # relationship:指定关联对象Car,表示一个用户可以拥有多辆车
    cars = db.relationship('Car')

# 多方
class Car(db.Model):
	# ForeignKey: 指定car属于那个用户
    user_id = db.Column(db.Integer, db.ForeignKey('user.user_id'), doc='用户ID')
    
    # 在flask-sqlalchemy中返回模型类对象的数据
    def __repr__(self):
        car = {
            'car_id': self.id,
            'name': self.name,
            'type': self.type,
            'price': self.price,
        }
        return str(car)
@app.route('/test')
def test():
	# select *  from user where user_id=1
    user = User.query.get(1)
    print(user)
    # select * from car where user_id=1
    print(user.cars)
    for car in user.cars:
        print(car.name)
    return 'ok'

uselist:返回数据是否已列表形式返回。Talse:user.cars得到的是一个对象,否则是一个InstrumentedList类型,需要遍历

class User(db.Model):
    cars = db.relationship('Car', uselist=False)
@app.route('/test')
def test():
    user = User.query.get(1)
    print(user)
    print(user.cars)
    print(user.cars.name)
    return 'ok'

backref: 反向引用,在查询时使用反向引用来获取关联对象的属性值

class User(db.Model):
 cars = db.relationship('Car', uselist=False, backref='myuser')
@app.route('/test')
def test():
    car = Car.query.get(1)
    print(car.myuser)
    return 'ok'
  1. 使用primaryjoin
# 一方
class User(db.Model):
    cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')

# 多方
class Car(db.Model):
	id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')
    
@app.route('/test')
def test():
    user = User.query.get(1)
    print(user)
    print(user.cars)
    for car in user.cars:
        print(car.name)
    return 'ok'
  1. 指定字段关联查询
class User(db.Model):
	cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')
from sqlalchemy.orm import contains_eager,load_only
 
@app.route('/test')
def test():
    # 使用了 join() 和 contains_eager() 方法来实现关联查询

    # User.query.join(User.cars):在查询 User 表时,关联查询 Cars 表
    # options:为查询添加选项
    # load_only:指定只加载部分字段,以提高查询效率
    # contains_eager:加载 User 表与 Cars 表的关联数据。Cars是User模型中定义的cars属性,它是一个 relationship 属性,表示一个用户可以拥有多个车辆
    # oad_only(Car.name):指定只加载 Cars 表中的 user_id 字段,而不加载其他字段
    # filter:对查询结果进行过滤
    # all:执行查询,并返回查询结果
    # sql:  SELECT car.car_id AS car_car_id, car.name AS car_name, user.user_id AS user_user_id, user.user_name AS user_user_name FROM user INNER JOIN car ON user.user_id = car.user_id WHERE user.user_name = %s
    all = User.query.join(User.cars).options(load_only(User.name),contains_eager(User.cars).load_only(Car.name)).filter(User.name == 'flask').all()
    print(all)
    for item in all:
        print(item)
    return 'ok'

事务

flask-sqlalchemy中⾃带事务⽀持,会默认开启事务

可以⼿动触发回滚:db.session.rollback()
@app.route('/test')
def test():
    try:
        User.query.filter_by(id=1).update({'name': 'rollback'})
        1/0
        db.session.commit()
    except:
        db.session.rollback()
    return 'ok'