likes
comments
collection
share

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

作者站长头像
站长
· 阅读数 13

从前面的几个梳理看,其实大多数的是围绕我们的框架的一些基础来讲,大部分的都是依据官网的来讲解。不过多数的情况我自己没有按官网的文档的目录来。

这一篇呐,着重主要讲如何的对接的我们的数据库,一个API没数据库肯定的对接肯定是没灵魂的!而又了数据库的对接也需要接入的是支持的异步的数据库!不然异步的框架它就不‘香’了。

但是在python中,我们的数据库的对于的支持的异步的库还算是可以的,算起来也是非常的丰富滴!对于的异步的ORM库也算是有好些个。

当然原生的SQL和ORM的性能的对比上由来已久,我这里也不说太多的所谓的性能对比的比较,还是哪句老话,一切以自己的服务的要求的为导向的基础上,哪个顺手用哪个!实在是不可以用ORM的那直接的原生SQL也是没问题滴!

按我的自己的使用来说,如果可以你可以简单自己通过元类的方式封装合适自己简单的的ORM,当然你也可以直接利用现成的第三方库。

接下来我的这里主要是使用几个来简单的展开:

  • 支持的ORM-SQLAlchemy的使用(异步假象)
  • 官网推荐的async-sql-databases
  • 支持异步的ORM-peewee-async的使用(使用databases异步excute(查询)的方式)
  • 支持异步的ORM-TortoiseORM
  • 支持异步的ORM-GINO(国产品牌)

PS:GINO 递归定义为 GINO Is Not ORM,是一个基于 asyncio 和 SQLAlchemy core 的轻量级异步 Python ORM 框架,目前(2020 年初)仅支持 asyncpg 一种引擎。

因为个人的原因,常用的数据库主要是PostgreSQL,所以以下的示例,我自己都会以这个数据库类型展开。

1 官网示例:基于SQLAlchemy上封装的异步操作

前置条件需要安装:

pip install psycopg2
pip install SQLAlchemy 

PS:SQLAlchemy这个最好是要安装到最新的版本哟!不然可能没有declarative_base这个对象

这个是官网的常规的一种封装示例,首先是先看它的项目的结构:

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

接下来对项目做一个简单的介绍:

  • main 是我们的服务启动的入口
  • crud 主要是定义了对我们的数据库表中的ORM模型的简单的操作(新增,查询,删除等操作)
  • database 主要我们连接数据库的一些配置信息,和我们的连接数据库的引擎对象的初始化
  • models 主要是我们的ORM对于的数据库的表的模型
  • schema 主要是和ORM模型对应的Pydantic模型的基础数据模型,主要是用于返回API结果的返回模型的绑定。

1.1 定义数据库配置和连接

database.py:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base


pg_username = 'postgres'
pg_password = '密码@密码@密码';
pg_host = '127.0.0.1'
pg_port = '5432'
pg_database = 'ces'

# 转码特殊的密码
from urllib import parse
# 如果密码中有@等特殊字符时需要把特殊的密码转码
pg_password = parse.quote_plus(pg_password)

#数据库连接的引擎对象
engine = create_engine('postgresql+psycopg2://' + pg_username + ':' + pg_password + '@' + pg_host + ':' + str(pg_port) + '/' + pg_database)

# 数据库的会滴对象
SessionLocal = sessionmaker(bind=engine)

Base = declarative_base()

1.2 定义我们的数据库表模型

需要说明的一点是:我们的数据库表模型是继承于我们的上面database的定义的Base对象。

模型这块我的为了简单演示,没完全用官网的,官网还设计的到外键什么的,后续扩展的时候在有时间不上吧!

models.py

from .database import Base
from sqlalchemy import Column, Integer, String


class User(Base):
    # 对应的数据库的表的名称
    __tablename__ = 'User'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)
    password = Column(String)


1.3 先把我们的表给创建出来

在我们的models.py进行表的创建

from sqlalchemy_sql_app.database import Base, engine
from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = 'User'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)
    password = Column(String)


if __name__ == '__main__':
    # 根据定义模型- 创建我们的表,不需要手动的创建
    Base.metadata.create_all(bind=engine)

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

1.4 定义Pydantic模型schema和我们的ORM进行对应

  • SQLAlchemy模型里面用 "="来定义属性的

  • Pydantic模型里面用":"来声明类型

schema.py

from pydantic import BaseModel


class User(BaseModel):
    name: str
    password: str

    class Config:
        orm_mode = True

PS 上面的orm_mode = True 其实是起到一个关联绑定的作用。

意思就是Pydantic数据模型可以兼容SQLAlchemy数据模型,这样后续我们的封装操作我们的User(BaseModel)的时候,就可以直接的进行对应我们的ORM的模型的,这样操作起来就方便了!

1.5 定义我们的crud对我们的表进行相关的操作

crud.py

关于异步的处理前置说明

1:async def 用来声明协程

2:await 用来将协程加入一个事件循环(即上文提到的将控制权交给事件循环中)。

这里我尝试的不安官网的写 也写成定义为一个协程的对象,但是官网说了!!其实我们的

由于SQLAlchemy不具备使用await直接,就像这样:

user = await db.query(User).first()

.相反,我们使用的是:

所以它的示例里面都是使用同步的函数的: 所以严格上不是一种异步的示例!官网的同步示例我就不搬了!哈哈用过的都知道!

不过我这里试一试看看也改成一个协程的去调用看看是否OK?

from sqlalchemy.orm import Session
from sqlalchemy_sql_app import schema, models


async def save_user(db: Session, user_info: schema.User):
    user = models.User(**user_info.dict())
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


async def get_users(db: Session):
    return db.query(models.User).all()


async def delete_user(db: Session, user_id: int):
    db.query(models.User).filter(models.User.id == user_id).delete();
    db.commit()
    return "删除用户成功!"


1.6 启动入口main

main.py

from fastapi import FastAPI, Depends
from sqlalchemy_sql_app import   crud,models
from sqlalchemy_sql_app.database import SessionLocal, engine
from sqlalchemy_sql_app.schema import User

app = FastAPI(title="测试把CRUD改造成协程")

#定义数据库的会话
def db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()

@app.get('/')
async def root():
    return {"Message": "Welcome"}

@app.get('/users')
async def getUsers(db=Depends(db)):
    return await crud.get_users(db)

@app.post('/user')
async def CreateUser(user:User,db=Depends(db)):
    return await crud.save_user(db,user)


@app.delete("/user")
async  def DeleteUser(userId:int,db=Depends(db)):
    return await crud.delete_user(db,userId)


import uvicorn

if __name__ == '__main__':
    # 等于通过 uvicorn 命令行 uvicorn 脚本名:app对象 启动服务:uvicorn xxx:app --reload
    uvicorn.run('main:app', host="127.0.0.1", port=8000, debug=True, reload=True)


1.7 启动文档操作看看

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

添加一条记录:

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

查看结果:

Fastapi框架-冷饭再炒-基础知识补充篇(4)- 异步数据库篇

好在这样的写也没啥毛病,但是吧!感觉还是单线程的同步!!!异步只是一个假象!!!! 个人吧 建议!不这么完!还是使用专业的异步的去处理!

如官网推荐的使用async-sql-databases! 晚点讲一下这个!

2 使用中间件的替代DB会话

关于中间件的作用我的就多说了,如果你用过GO语言的GIN的话就知道中间件多好玩,如果也看过我的Flask中讲的关于中间件的,估计也知道它的作用。

这里简单的说就是在请求之前和请求处理完成后可以拦截做点什么事情,这个就是中间件的大概的意义吧!

2.1 定义中间件,并把db注入到request.state

@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response

2.2 定义一个新的依赖注入对象,从 request.state返回DB

把原来的:
def db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()
修改为:
# Dependency
def db(request: Request):
    return request.state.db

2.3 main完整改造后为中间件后的示例

from fastapi import FastAPI, Depends, Request, Response
from sqlalchemy_sql_app import crud, models
from sqlalchemy_sql_app.database import SessionLocal, engine
from sqlalchemy_sql_app.schema import User

app = FastAPI(title="测试把CRUD改造成协程")


def db(request: Request):
    return request.state.db


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


@app.get('/')
async def root():
    return {"Message": "Welcome"}


@app.get('/users')
async def getUsers(db=Depends(db)):
    return await crud.get_users(db)


@app.post('/user')
async def CreateUser(user: User, db=Depends(db)):
    return await crud.save_user(db, user)


@app.delete("/user")
async def DeleteUser(userId: int, db=Depends(db)):
    return await crud.delete_user(db, userId)


import uvicorn

if __name__ == '__main__':
    # 等于通过 uvicorn 命令行 uvicorn 脚本名:app对象 启动服务:uvicorn xxx:app --reload
    uvicorn.run('main:app', host="127.0.0.1", port=8000, debug=True, reload=True)

2.4 扩展关于request.state

request.state是每个Request对象。它可以存储附加到请求本身的任意对象,比如本例中的数据库会话。类似我们的之前的flask中的G或者我们的可以自己添加request.xxx到当前的请求的上下文对象一样。

这里使用中间件的方式他可以帮助我们确保通过所有请求使用单个数据库会话,并且然后(在中间件中)关闭。

3 官网推荐的异步sync-sql-databases

对于databases,官网说拷贝的它的示例运行就好了!刚好的它的示例也我的使用的Postgresql。 这个的库的开源地址是:github.com/encode/data…

按官方的文档的说法是说:

  • 这个数据库为一系列数据库提供了简单的异步支持。
  • 它允还许您使用强大的SQLAlchemy核心表达式语言,并提供对PostgreSQL、MySQL和SQLite的支持。 首先

3.1 安装databases和相关的依赖


$ pip install databases

您可以通过以下方式安装所需的数据库驱动程序:

$ pip install databases[postgresql]
$ pip install databases[mysql]
$ pip install databases[sqlite]

3.2 databases+SQLAlchemy

直接分析官网异步的支持的部分。其他按时不做详细的深入的分析。

3.2.1 创建database 的连接对象

关键的代码:

import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel

DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)

3.2.2 创建sqlalchemy的引擎对象



engine = sqlalchemy.create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)

元数据信息
metadata = sqlalchemy.MetaData()

# 创建具体的表
notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)


#创建上面的的所有的模型的表
metadata.create_all(engine)

3.2.3 用database对象进行异步数据操作

@app.on_event("startup")
async def startup():
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


@app.get("/notes/", response_model=List[Note])
async def read_notes():
    query = notes.select()
    return await database.fetch_all(query)


@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
    query = notes.insert().values(text=note.text, completed=note.completed)
    last_record_id = await database.execute(query)
    return {**note.dict(), "id": last_record_id}

3.2.4 完整的运行的示例

from typing import List

import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel

# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"

database = databases.Database(DATABASE_URL)

metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)

engine = sqlalchemy.create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)


class NoteIn(BaseModel):
    text: str
    completed: bool


class Note(BaseModel):
    id: int
    text: str
    completed: bool


app = FastAPI()


@app.on_event("startup")
async def startup():
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


@app.get("/notes/", response_model=List[Note])
async def read_notes():
    query = notes.select()
    return await database.fetch_all(query)


@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
    query = notes.insert().values(text=note.text, completed=note.completed)
    last_record_id = await database.execute(query)
    return {**note.dict(), "id": last_record_id}

import uvicorn

if __name__ == '__main__':
    # 等于通过 uvicorn 命令行 uvicorn 脚本名:app对象 启动服务:uvicorn xxx:app --reload
    uvicorn.run('main:app', host="127.0.0.1", port=8000, debug=True, reload=True)

总结上面的示例看到::

  • database的链接的打开是服务启动的时候才打开
  • database的关闭是服务关闭的时候才去关闭
  • database的异步的模式其实也是await database.execute(query)的模式

4 peewee-async异步

因为之前的一直都是使用peewee做的项目,感觉peewee使用起来也还行,主要是我自己用得顺手吧!虽然中间使用的过程也是会遇到很多可能无法了解到的问题。甚至有些是一下子无法解决,就直接的使用原生的SQL进行查询,本身它也支持原生的SQL的查询。

但是异步上的使用,之前主要是基于Flask的多,还没来得及去体验新开这个异步的炉灶。

这里需要说明的地方是,peewee-async在模型的设计上还是使用peewee的模型,要他真正发挥异步的地方是通过这只一个标记来进行的。

如果后续的希望只有异步调用并将同步视为不需要的或错误:需要设置allow_sync = False

objects.database.allow_sync = False # this will raise AssertionError on ANY sync call

还有其他的选项可以设置:

objects.database.allow_sync = logging.ERROR

下面的尝试一步一步的引入到我们的Fastapi里面。

4.1 安装peewee-async的依赖

pip install peewee-async aiopg

这里的还是使用的PG的数据库,这个peewee-async使用的是aiopg而不是asyncpg

通常我们的对于原生的支持的SQL基于PostgreSQL的话异步库,主要有他们两个库:

使用的在协程的上使用的简单示例(来自官网的示例):


import asyncio
import peewee
import logging
from peewee_async import Manager, PostgresqlDatabase

loop = asyncio.new_event_loop() # Note: custom loop!
database = PostgresqlDatabase('test')

# 丢到循环里面
objects = Manager(database, loop=loop)

async def my_async_func():
    # Add new page block
    await objects.create_or_get(
        PageBlock, key='title',
        text="Peewee is AWESOME with async!")

    # Get one by key
    title = await objects.get(PageBlock, key='title')
    print("Was:", title.text)

    # Save with new text
    title.text = "Peewee is SUPER awesome with async!"
    await objects.update(title)
    print("New:", title.text)

loop.run_until_complete(my_async_func())
loop.close()

4.2 定义数据库配置和连接

database.py:

import asyncio
import peewee
import peewee_async
from app.config import *
# Nothing special, just define model and database:


DB_NAME='xxxxxx'
DB_USER='xxxxxx'
DB_HOST='xxxxxx'
DB_PORT='xxxxxx'
DB_PASS='xxxxxx'

database = peewee_async.PostgresqlDatabase(
    database=DB_NAME,
    user=DB_USER,
    host=DB_HOST,
    port=DB_PORT,
    password=DB_PASS
)

objects = peewee_async.Manager(database=database)

database.set_allow_sync(False) # 是否设置为同步,False异步

上面的创建还可以使用的连接池的方式:

例子:
导入的是:peewee_asyncext.PooledPostgresqlExtDatabase下的包

database = PooledPostgresqlExtDatabase('test', register_hstore=False,
                                       max_connections=20)

4.2 定义数据表的模型ORM

models.py

from peewee import *
from playhouse.postgres_ext import BinaryJSONField, ArrayField
from datetime import datetime, datetime

from .database import database
# 创建模型的基类
class BaseModel(Model):

    id = UUIDField(primary_key=True, default=uuid.uuid4)
    created_at = DateTimeField(null=False, default=datetime.now(pytz.timezone('Africa/Nairobi')))
    updated_at = DateTimeField(null=False, default=datetime.now(pytz.timezone('Africa/Nairobi')))

    class Meta:
        database = database



# 用户信息
class UserObj(BaseModel):
    name = CharField(max_length=100,verbose_name="用户姓名",index=True)
    password =  CharField(max_length=100,verbose_name="用户密码")
    phone =  CharField(max_length=111,verbose_name="联系电话")
    class Meta:
        table_name = "User"


4.3 根据模型执行创建表示例:

第一种:

with objects.allow_sync():
    UserObj.create_table(True)

第二种:

database.create_tables([UserObj,其他模型 ])

4.4 然后在我们的API接口调用异步

main.py

from typing import List

import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()




@app.get("/userinfoslist/")
async def read_notes():
    query = notes.select()
    userinfos = await objects.execute(UserObj.select())
    for user in userinfos:
        print(user.name)
 
 @app.get("/adduser/")
 async def my_async_func():
    # 随意的数据的测试
    obj1 = await objects.get(UserObj, id=1)
    obj2 = await objects.get(UserObj, MyModel.id==1)
    obj3 = await objects.get(UserObj.select().where(UserObj.id==1))
    
    user0 = await objects.create(UserObj, username='test')
    user1 = await objects.get(UserObj, id=user0.id)
    user2 = await objects.get(UserObj, username='test')


import uvicorn

if __name__ == '__main__':
    # 等于通过 uvicorn 命令行 uvicorn 脚本名:app对象 启动服务:uvicorn xxx:app --reload
    uvicorn.run('main:app', host="127.0.0.1", port=8000, debug=True, reload=True)

使用上peewee和peewee_async没多少的差异,只是可能查询上有许些不一样,当然深入的去使用和项目中使用的,后续我的尽量的用这个做一次,比较实践出真知!

5 TortoiseORM 和 GINO的使用(不详解了)

感觉这两个库的对于的官网的文档介绍的意见是最清楚的,而且我们的GINO还附上的详尽的一个案例和模板。感觉可以开箱即用了!! 所以我这里也不打就算搬砖了!

至于TortoiseORM后续实战项目里,有可能我会去使用它。主要是搭建属于我们的自己的后台。

TortoiseORM的使用其实和peewee_async流程上是没多大的差异滴!无非也就是定义连接对象,创建模型,根据模型创建表,然后开始封装对表的操作。

下面的是使用官方TortoiseORM的示例(我是搬运过来的,为了完整点!别打我!)

5.1 定义模型如下:

from tortoise.models import Model
from tortoise import fields

class Tournament(Model):
    id = fields.IntField(pk=True)
    name = fields.TextField()
 

5.2 初始化模型和数据库,如下所示:

from tortoise import Tortoise, run_async

async def init():
    # Here we create a SQLite DB using file "db.sqlite3"
    #  also specify the app name of "models"
    #  which contain models from "app.models"
    await Tortoise.init(
        db_url='sqlite://db.sqlite3',
        modules={'models': ['app.models']}
    )
    # Generate the schema
    await Tortoise.generate_schemas()

# run_async is a helper function to run simple async Tortoise scripts.
run_async(init())

5.3 然后像这样使用它:

# Create instance by save
tournament = Tournament(name='New Tournament')
await tournament.save()

# Or by .create()
await Tournament.create(name='Another Tournament')

# Now search for a record
tour = await Tournament.filter(name__contains='Another').first()
print(tour.name)

结尾

码字太累了!!!歇一阵先!