likes
comments
collection
share

FastApi(自用脚手架)+Snowy搭建后台管理系统(5)脚手架--SqlalchemyPlugin

作者站长头像
站长
· 阅读数 16

插件定义流程

鉴于自己语言组织能力是在欠佳,所以接下来相关的分享,个人还是觉得要放慢一点。

在前面,已经了解相关插件封装和一些插件使用机制。接下来主要是讲解关于Sqlalchemy的插件使用。首先我们在使用之前,需要了解关于我们使用Sqlalchemy的一些基础流程。

1.创建一个引擎对象

首先是创建一个引擎对象,如下代码示例:

self.async_engine = create_async_engine(url=self.settings.ASYNC_SQLALCHEMY_DATABASE_URI,
                                        echo=self.settings.SQLALCHEMY_DATABASE_ECHO,
                                        pool_recycle=self.settings.SQLALCHEMY_POOL_RECYCLE,
                                        max_overflow=self.settings.SQLALCHEMY_MAX_OVERFLOW,
                                        pool_pre_ping=self.settings.SQLALCHEMY_POOL_PRE_PING,
                                        pool_size=self.settings.SQLALCHEMY_POOL_SIZE,
                                        )

其中关于create_async_engine中涉及到参数主要有:

# 1. **engine_str**:一个SQLAlchemy引擎字符串,用于构建数据库引擎。
# 2. **echo**:如果为True,则将SQLAlchemy的日志输出到标准输出流。
# 3. **pool_recycle**:用于控制连接池的回收间隔,以秒为单位。
# 4. **pool_size**:连接池的最大大小。
# 5. **max_overflow**:允许在连接池被耗尽时,池中可以创建的最大连接数。
# 6. **pool_timeout**:获取连接时的超时时间,以秒为单位。
# 7. **executor**:一个可选的执行器,用于运行异步操作,如果不指定,则使用默认的执行器。
# 8. **connect_args**:可选的连接参数,用于传递给引擎和连接池的connect()方法。

2.通过引擎对象创建会话工厂对象

当我们对数据进行操作的时候,都依赖于一个会话对象,所以需要通过会话工厂返回一个可用会话对象,如下代码示例所示:

self.async_session_maker = sessionmaker(bind=self.async_engine,
                                        class_=AsyncSession,
                                        expire_on_commit=False)

其中关于Sessionmaker参数如下:

# Sessionmaker参数如下:
#
# 1. bind:绑定一个engine或者connection,用来支持多个数据库的操作。
#
# 2. class_:自定义Session类。
#
# 3. autoflush:设置flush的时机,默认为True。
#
# 4. autocommit:设置commit的时机,默认为False。
#
# 5. expire_on_commit:设置在commit之后是否需要expire,默认为True。
#
# 6. query_cls:设置Query的class,默认为Query。
#
# 7. expire_on_commit:设置在commit之后是否需要expire,默认为True。
#
# 8. info:设置session的额外信息,默认为None。
#
# 9. extension:设置Session扩展,默认为None。
#
# 10. id:设置Session的id,默认为None。

3.定义依赖注入项

在一个插件中我们通过把上面创建好的async_session_maker赋值到一个fastapi的app对象上,这样我们就可以通过在FastApi中的请求上下文中获取到当前request,进而在获取到对应的app对象即可获取到我们的async_session_maker对象,如下所示代码所示:

async def get_async_session_depends(request: Request) -> AsyncSession:
    """
    使用方法方式定义依赖项
    :param request: current request.
    :return: pyee_events producer from the core_app.
    """
    async with request.app.async_session_maker() as session:
        # await async_session.commit()
        try:
            yield session
            # 使用自动提交在某些情况下 会突发的asyncio.exceptions.CancelledError异常
            # await async_session.commit()
        except SQLAlchemyError as ex:
            print(ex)
            await session.rollback()

        finally:
            # 1.4.0版本的话 需要收的执行关闭,才可以 不然会一直超时
            # 1.4.19的话 这个地方可以起到关闭的作用!
            await session.close()

4.开始通过依赖项完成数据操作

定义好对应的依赖注入项后,我们可以直接的通过它获取到对应的AsyncSession对象,基于它可以进而进行相关数据库的操作,如下代码示例所示:

1:注册插件
SqlalchemyPluginClient(app=app, settings=SqlalchemyPluginClient.SqlalchemySettings(

    ))


2:使用依赖注入方式
@app.get("/async_session_depends")
async def sync_route(async_session: AsyncSession = Depends(get_async_session_depends)):
    async with get_async_session_context() as async_session:
        pass
        logger.info(await SysCategoryService.get_sys_category_items_all(db_session=async_session))
    print("ssssssssssssssss", await SysCategoryService.get_sys_category_items_all(db_session=async_session))
    # eventsaaa.async_emit_evnet("app_mention", "我是发送的数据信息")
    return 'ok'

5.完整的插件类的定义

在上面中我们已经完成相关SqlalchemyPlugin插件使用流程说明,下面我们来看一下关于这个插件的定义,如下完整的代码示例所示:

# 中间方式的实现
class SqlalchemyPluginClient(BasePlugin):
    # 设置插件默认的参数信息
    name = 'SqlalchemyPluginClient'

    # 内部配置项定义
    class SqlalchemySettings(Settings):
        # 数据库连接信息
        MYSQL_SERVER_HOST: str = "47.99.189.42:31166"
        MYSQL_USER_NAME: str = "root"
        MYSQL_PASSWORD: str = "xiaozhong123456"
        MYSQL_DB_NAME: str = "fastapi-jeecg-boot-admin"
        # 数据库连接池
        SQLALCHEMY_DATABASE_ECHO: bool = False
        SQLALCHEMY_POOL_RECYCLE: int = 7200
        SQLALCHEMY_POOL_PRE_PING: bool = True
        SQLALCHEMY_POOL_SIZE: int = 20
        SQLALCHEMY_MAX_OVERFLOW: int = 64

        @property
        def ASYNC_SQLALCHEMY_DATABASE_URI(self) -> Optional[str]:
            # 不要使用utf8
            return 'mysql+aiomysql://%s:%s@%s/%s?charset=UTF8MB4' % (
                self.MYSQL_USER_NAME,
                self.MYSQL_PASSWORD,
                self.MYSQL_SERVER_HOST,
                self.MYSQL_DB_NAME,
            )

    def setup(self, app: FastAPI, name: str = None, *args, **kwargs):
        """插件初始化"""
        self.app = app
        self._async_session: Union[AsyncSession, async_scoped_session, None] = None
        self.async_session_maker = None

        app.add_middleware(AsyncSessionLoadMiddleware)

        @app.on_event("startup")
        async def startup_event():
            try:
                # 创建异步会话对象
                global global_async_engine
                global global_async_session
                  # create_async_engine参数说明
                           # - database_url:用于创建引擎的数据库URL
                           # - pool_size:连接池大小
                           # - pool_recycle:重用连接池中的连接的时间(秒)
                           # - max_overflow:连接池中最多可以存放多少个连接
                           # - echo:是否记录日志
                           # create_async_engine() 方法用于创建一个异步数据引擎,参数如下:
                           #
                           # 1. **engine_str**:一个SQLAlchemy引擎字符串,用于构建数据库引擎。
                           # 2. **echo**:如果为True,则将SQLAlchemy的日志输出到标准输出流。
                           # 3. **pool_recycle**:用于控制连接池的回收间隔,以秒为单位。
                           # 4. **pool_size**:连接池的最大大小。
                           # 5. **max_overflow**:允许在连接池被耗尽时,池中可以创建的最大连接数。
                           # 6. **pool_timeout**:获取连接时的超时时间,以秒为单位。
                           # 7. **executor**:一个可选的执行器,用于运行异步操作,如果不指定,则使用默认的执行器。
                           # 8. **connect_args**:可选的连接参数,用于传递给引擎和连接池的connect()方法。
                self.async_engine = create_async_engine(url=self.settings.ASYNC_SQLALCHEMY_DATABASE_URI,
                                                        echo=self.settings.SQLALCHEMY_DATABASE_ECHO,
                                                        pool_recycle=self.settings.SQLALCHEMY_POOL_RECYCLE,
                                                        max_overflow=self.settings.SQLALCHEMY_MAX_OVERFLOW,
                                                        pool_pre_ping=self.settings.SQLALCHEMY_POOL_PRE_PING,
                                                        pool_size=self.settings.SQLALCHEMY_POOL_SIZE,
                                                        )
                # Sessionmaker参数如下:
                #
                # 1. bind:绑定一个engine或者connection,用来支持多个数据库的操作。
                #
                # 2. class_:自定义Session类。
                #
                # 3. autoflush:设置flush的时机,默认为True。
                #
                # 4. autocommit:设置commit的时机,默认为False。
                #
                # 5. expire_on_commit:设置在commit之后是否需要expire,默认为True。
                #
                # 6. query_cls:设置Query的class,默认为Query。
                #
                # 7. expire_on_commit:设置在commit之后是否需要expire,默认为True。
                #
                # 8. info:设置session的额外信息,默认为None。
                #
                # 9. extension:设置Session扩展,默认为None。
                #
                # 10. id:设置Session的id,默认为None。
                self.async_session_maker = sessionmaker(bind=self.async_engine,
                                                        class_=AsyncSession,
                                                        expire_on_commit=False)
                app.async_session = self
                app.async_session_maker = self.async_session_maker


            except Exception as e:
                raise e

        @app.on_event("shutdown")
        async def shutdown_event():
            pass

从上面的代码中我们注意到我们的完成相关数据引擎对象和会话对象的初始化是在@app.on_event("startup")启动事件函数中完成的。

6. 全局单例变量async_session

从上面代码中,我们注意到里面有使用到了一个全局变量的定义,如下代码所示:

# 创建异步会话对象
global global_async_engine
global global_async_session

而通过全局单例变量其实也可以得到相关sync_session对象的获取,如上代码中,我们不仅仅可以赋值给了

app.async_session = self
app.async_session_maker = self.async_session_maker

还可以赋值给当前全局单例变量:

# 全局变量的定义处理
global_async_engine = self.async_engine
global_async_session = self.async_session_maker

当完成赋值后,则可以通过导入上述两个定义的global_async_engine和global_async_session来创建对应的会话,所以我们也可以通过它来创建对应的依赖项,如下代码所示:

@asynccontextmanager
async def get_async_session_context_globa() -> AsyncGenerator:
    '''
    下面的上下文的方式是基于全局变量的方式处理
    :return:
    '''
    async_session: AsyncSession = global_async_session()
    try:
        yield async_session
        # Exception during reset or similar
        # Traceback (most recent call last):
        # 使用自动提交在某些情况下 会突发的asyncio.exceptions.CancelledError异常
        # await async_session.commit()
    except OperationalError as ex:
        pass
        raise ex
    except SQLAlchemyError as ex:
        await async_session.rollback()
        raise ex
    finally:
        await async_session.close()

或者:

async def get_async_session_depends_globa() -> AsyncSession:
    """
    使用方法方式定义依赖项
    :param request: current request.
    :return: pyee_events producer from the core_app.
    """
    async with global_async_session() as session:
        # await async_session.commit()
        try:
            yield session
            # 使用自动提交在某些情况下 会突发的asyncio.exceptions.CancelledError异常
            # await async_session.commit()
        except SQLAlchemyError as ex:
            print(ex)
            await session.rollback()

        finally:
            # 1.4.0版本的话 需要收的执行关闭,才可以 不然会一直超时
            # 1.4.19的话 这个地方可以起到关闭的作用!
            await session.close()

基于上下文对象方式的,使用全局变量方式如下:

@app.get("/async_session_with")
async def sync_route():
    async with get_async_session_context_globa() as async_session:
        pass
        logger.info(await SysCategoryService.get_sys_category_items_all(db_session=async_session))
        print("ssssssssssssssss",await SysCategoryService.get_sys_category_items_all(db_session=async_session))
    # eventsaaa.async_emit_evnet("app_mention", "我是发送的数据信息")
    return 'ok'

基于类方式依赖注入项

从前面我们已经了解到,要对数据库进行操作需要获取到一个异步会话对象即可。前面我们使用async_session是使用函数方式定义的,如果换成类的话也可以,如下代码示例所示:

class AsyncSessionDependency:
    """
    使用类形式定义依赖项
    """

    async def __call__(self, request: Request) -> None:
        """
        """
        async with request.app.async_session_maker() as session:
            # await async_session.commit()
            try:
                yield session
                # 使用自动提交在某些情况下 会突发的asyncio.exceptions.CancelledError异常
                # await async_session.commit()
            except SQLAlchemyError as ex:
                await session.rollback()
            finally:
                # 1.4.0版本的话 需要收的执行关闭,才可以 不然会一直超时
                # 1.4.19的话 这个地方可以起到关闭的作用!
                await session.close()


# 实例化依赖类
async_session_dependency = AsyncSessionDependency()

基于中间件方式实例化全局async_session

在前面我们已经把的async_session赋值给我们app对象,我们可以通过实现类似的全局的request对象的方式来定义一个全局的当前上下文的async_session对象,如下代码示例所示:


import contextlib
from contextvars import ContextVar
from typing import Iterator

from sqlalchemy.ext.asyncio import AsyncSession

from .bind_ import bind_contextvar

from starlette.requests import Request
from starlette.types import ASGIApp, Scope, Receive, Send, Message

AsyncSession_var: ContextVar[AsyncSession] = ContextVar("AsyncSession")
async_session: AsyncSession = bind_contextvar(AsyncSession_var)


class AsyncSessionLoadMiddleware:
    '''
    此类的中间件无法读取响应报文的内容
    '''
    pass

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    def bind_to_request_state(self, request: Request, **kwargs):
        """
        Takes in a set of kwargs and binds them to gziprequest state
        """
        for key, value in kwargs.items():
            setattr(request.state, key, value)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        # 解析当前的请求体
        request = Request(scope, receive=receive)
        # 开启中间件的时候创建对应的会话对象
        # 这里对request.core_app.async_session_maker()非常关键,避免出现回收连接告警提示
        token = AsyncSession_var.set(request.app.async_session_maker())
        try:
            await self.app(scope, receive, send)
        finally:
            # 释放
            AsyncSession_var.reset(token)

定义好中间件后,我们就完成全局上下文的async_session对象,然后我们可以通过一个装饰器之类的来使用这个对象,如下装饰器代码所示:

class Transactional:

    def __call__(self, func):
        '''
        通过装饰器方式来处理异步的任务
        :param func:
        :return:
        '''
        @wraps(func)
        async def _transactional(*args, **kwargs):
            try:
                result = await func(*args, **kwargs)
                #
                await async_session.commit()
            except Exception as e:
                await async_session.rollback()
                raise e
            finally:
                # The garbage collector is trying to clean up connection <AdaptedConnection <aiomysql.connection.Connection object at 0x000001BB37060DC0>>. This feature is unsupported on unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling ``close()`` or using a context manager to manage their lifetime.
                # sys:1: SAWarning: The garbage collector is trying to clean up connection <AdaptedConnection <aiomysql.connection.Connection object at 0x000001BB37060DC0>>. This feature is unsupported on unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling ``close()`` or using a context manager to manage their lifetime.
                # 这个错误提示的意思是:垃圾收集器正试图清理连接<AdaptedConnection <aiomysql.connection.Connection object at 0x000001BB37060DC0>>,
                # 但是由于asyncio dbapis缺少“terminate”功能,因此在此阶段无法执行IO以重置连接,因此不支持此功能。请在不再使用连接时关闭所有连接,调用“close()”或使用上下文管理器来管理它们的生命周期。
                # print("结束!!!",id(async_session))
                await async_session.close()
            return result

        return _transactional

使用方式如下:

from afast_core.databases.sqlalchemy.transactional import Transactional ,async_scoped_session
class UserService:
    def __init__(self):
        ...
    @Transactional()
    async def create_user( self, email: str, password1: str, password2: str, nickname: str) -> None:
       pass
       # query = select(User).where(or_(User.email == email, User.nickname == nickname))
       # result = await async_scoped_session.execute(query)
       # is_exist = result.scalars().first()
       

class SysCategoryService:
    @staticmethod
    @Transactional()
    async def get_sys_category_items_all22222():
        print("查询async_scoped_session装饰器!!!????",id(async_scoped_session))
        query = select(SysCategory)
        data = (await async_scoped_session.execute(query))
        return [it.name for it in data.scalars().all()]

至此,关于数据库插件相关分享已完成!以上内容分享纯属个人经验,仅供参考!文笔有限,如有笔误或错误!欢迎批评指正!感谢各位大佬!有什么问题也可以随时交流!

结尾

END

简书:www.jianshu.com/u/d6960089b…

掘金:juejin.cn/user/296393…

公众号:微信搜【程序员小钟同学】

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822