likes
comments
collection
share

FastApi(自用脚手架)+Snowy搭建后台管理系统(3)脚手架--插件基类及示例说明

作者站长头像
站长
· 阅读数 8

前言

首先我们知道我们FastAPI提供的扩展机制其实非常好,本质上我们的所有相关定义的插件都可以只需要依赖于一个FastApi实例化对象即可完成相关插件的分离。我们只要有了FastApi实例化对象的引用后,就可以在对应的插件中调用其自身相关的一些方法来完成我们插件的初始化。

第三方插件pluginbase 抽象定义

正如上面所说,所有的插件基本我们只需要一个FastApi实例化对象即可完成相关插件的初始化。起抽象基类代码如下所示:

# MODULES APP SETUP -------------------------------------------------------------
import abc

import fastapi
import pydantic
from fastapi import FastAPI
import typing as t


class PluginException(Exception):
    """查询异常的定义"""


class IBasePlugin(metaclass=abc.ABCMeta):
    """插件基础定义"""
    # 插件名称
    name: str
    describe: str  # 描述
    # 依赖FastAPI主应用对象
    app: t.Optional[FastAPI] = None

    def __init__(self, app: FastAPI = None, name=None, settings=None, **options):
        """创建应用的插件"""
        if getattr(self, "name", None) is None:
            # raise TypeError("Plugin.name is required")
            raise PluginException('Plugin.name is required')
        # 安装插件
        if app is not None:
            self.name = name or self.name
            self.app = app
            self.settings = settings
            self.setup(app, name, settings, **options)
        else:
            pass

    def __repr__(self) -> str:
        """输出插件信息"""
        return f"<FastAPI.Exts.Plugin: {self.name}>"

    @property
    def installed(self):
        """检测插件是否已安装"""
        return bool(self.app)

    @abc.abstractmethod
    def setup(self, app: FastAPI, name: str = None, settings=None, **options):
        """插件初始化"""
        # 插件对象保存
        # self.app.state.plugins={}
        # self.app.state.plugins[self.name] = self

如上代码所示:

  • 首先我们定义一个IBasePlugin,它自身是一个抽象类
  • 在初始化的时候,我们需要传入对应 app: FastAPI对象(理论上我们应该是必传,但是部分插件可能也未必需要,所以这里设置了为None)
  • name 参数是对应插件参数命名
  • settings 参数这是改插件需要相关对于配置参数信息
  • setup 方法则是完成具体插件初始化工程具体入口

1. swaggerui可视化文档插件

如脚手架中对应插件图示:

FastApi(自用脚手架)+Snowy搭建后台管理系统(3)脚手架--插件基类及示例说明

在图示中,我们可以看到对应static对应着文档中我们依赖一些静态的资源文件。我们为了解决网络异常或者在无网络情况也也可以查看我们的可视化文档的时候,则需要使用到此类的插件来实现。 相关的资源文件,我们可以在有网络情况进行对应的资源下载即可。接下来我们主要看我们的插件的实现,如下代码所示:

#!/usr/bin/evn python
# coding=utf-8

"""
Author = zyx
@Create_Time: 2022/5/29 14:09
@version: v1.0.0
@Contact: 308711822@qq.com
@File: __init__.py.py
@文件功能描述:------
"""

from fastapi import FastAPI
from fastapi.openapi.docs import (get_redoc_html, get_swagger_ui_html, get_swagger_ui_oauth2_redirect_html, )
from fastapi.staticfiles import StaticFiles
from fastapi import Request

from afastcore.plugins.pluginbase import IBasePlugin as BasePlugin


class SwaggeruiPluginClient(BasePlugin):
    name = '本地的Swaggerui文档,避免在线访问'
    describe = '该插件启用后,会自动过滤之前路由设置,只保留下来/docs的路由访问'

    def __init__(self, app: FastAPI = None, name=None, proxy=False, **options):
        super().__init__(app, name, **options)
        self.proxy = proxy

    def setup(self, app: FastAPI, name: str = None, *args, **kwargs):
        """插件初始化"""
        # 启用插件后自动关闭之前的配置在线文档

        # 路由: Route(path='/openapi.json', name='openapi', methods=['GET', 'HEAD'])
        # 路由: Route(path='/docs', name='swagger_ui_html', methods=['GET', 'HEAD'])
        # 路由: Route(path='/docs/oauth2-redirect', name='swagger_ui_redirect', methods=['GET', 'HEAD'])
        # 路由: Route(path='/redoc', name='redoc_html', methods=['GET', 'HEAD'])

        # assert app.redoc_url is None and app.docs_url is None, '本地的Swaggerui文档插件,请先关闭APP原始的开关设置为None'
        # 过滤路由的方式,不用手动的关闭app.redoc_url is None and app.docs_url is None
        app.router.routes = [route for route in app.routes if route.path != '/docs' and route.path != '/redoc']
        # 需要先设置了关闭才可以自定义
        import pathlib
        cur_file_path = str(pathlib.Path(__file__).absolute()).replace("__init__.py", "")
        # cur_file_path='/data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py/static'
        # 原始当前路径: /data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py
        # 当前路径: /data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py
        # 处理配置经过NGINX代理后的路径有所变化的问题
        if '/__init__.py/static' in cur_file_path:
            cur_file_path = cur_file_path.replace('/__init__.py/static', '')
        if '/__init__.py' in cur_file_path:
            cur_file_path = cur_file_path.replace('/__init__.py', '')

        # 挂载静态目录
        app.mount("/static", StaticFiles(directory=f"{cur_file_path}/static"), name="static")

        # 自定义需要在关闭的情况下才可以
        @app.get('/', include_in_schema=False)
        @app.get('/docs', include_in_schema=False)
        @app.get('/swagger/docs', include_in_schema=False)
        async def custom_swagger_ui_html(req: Request):
            root_path = req.scope.get("root_path", "").rstrip("/")
            openapi_url = root_path + req.app.openapi_url
            # INFO  2022-10-01 19:01:43.980 | azyxfastcore.plugins.swaggerui:custom_swagger_ui_html:77 |  - root_path:
            # INFO  2022-10-01 19:01:43.980 | azyxfastcore.plugins.swaggerui:custom_swagger_ui_html:78 |  - openapi_url:('/openapi.json',)
            # 如果是反向代理之后的话,就不需要设置--- swagger_js_url="/static/swagger-ui-bundle.js",并且返回代理那需要配置为:
            #  location /openapi.json {
            #     proxy_pass http://woreid_right_pay_online_api_up/openapi.json;
            #  }
            #
            #  location /swagger-ui-bundle.js {
            #     proxy_pass http://woreid_right_pay_online_api_up/static/swagger-ui-bundle.js;
            #  }
            #
            #  location /swagger-ui.css {
            #     proxy_pass http://woreid_right_pay_online_api_up/static/swagger-ui.css;
            #  }
            if self.proxy:
                swagger_js_url = "/swagger-ui-bundle.js"
                swagger_css_url = "/swagger-ui.css"
            else:
                pass
                swagger_js_url = "/static/swagger-ui-bundle.js"
                swagger_css_url = "/static/swagger-ui.css"

            return get_swagger_ui_html(
                openapi_url=openapi_url,
                title=app.title + " - Swagger UI",
                oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
                swagger_js_url=swagger_js_url,
                init_oauth=app.swagger_ui_init_oauth,
                swagger_ui_parameters=app.swagger_ui_parameters,
                swagger_css_url=swagger_css_url,
                # swagger_js_url: str = "https://cdn.jsdelivr.net/npm/swagger-ui-dist@3/swagger-ui-bundle.js",
                # swagger_css_url: str = "https://cdn.jsdelivr.net/npm/swagger-ui-dist@3/swagger-ui.css",
                # swagger_favicon_url: str = "https://fastapi.tiangolo.com/img/favicon.png",
            )

        @app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
        async def swagger_ui_redirect():
            return get_swagger_ui_oauth2_redirect_html()

        @app.get("/redoc", include_in_schema=False)
        async def redoc_html():
            return get_redoc_html(
                openapi_url=app.openapi_url,
                title=app.title + " - ReDoc",
                redoc_js_url="/static/redoc.standalone.js",
            )

如上代码所示,最核心的关键在于我们的setup的方法的实现,它内部通过传入的app对象的引用调用相关装饰器完成一些路由接口的注册。

在实现上述的插件过程,我们需要关注一个问题点有几个,首先,是自动关闭原有的API文档的地址,并使用新的自定义路由进行取代,细节代码如下所示:

# 过滤路由的方式,不用手动的关闭app.redoc_url is None and app.docs_url is None
app.router.routes = [route for route in app.routes if route.path != '/docs' and route.path != '/redoc']

另外一个问题点则是,当我们的服务经过了Nginx进行负载后,我们需要做相关一些处理,如下代码细节所示:

import pathlib
cur_file_path = str(pathlib.Path(__file__).absolute()).replace("__init__.py", "")
# cur_file_path='/data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py/static'
# 原始当前路径: /data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py
# 当前路径: /data/www/woreid_right_pay/azyxfastcore/plugins/swaggerui/__init__.py
# 处理配置经过NGINX代理后的路径有所变化的问题
if '/__init__.py/static' in cur_file_path:
    cur_file_path = cur_file_path.replace('/__init__.py/static', '')
if '/__init__.py' in cur_file_path:
    cur_file_path = cur_file_path.replace('/__init__.py', '')

首先是具体资源路径位置,有所变化,我自己实践过程中存在对应的路径在linux系统上部署的时候会有所变化。 另外则是:

.....省略部分代码
    if self.proxy:
        swagger_js_url = "/swagger-ui-bundle.js"
        swagger_css_url = "/swagger-ui.css"
    else:
        pass
        swagger_js_url = "/static/swagger-ui-bundle.js"
        swagger_css_url = "/static/swagger-ui.css"

当我们的部署到linux服务器上,需要在线查看文档的时候,这需要进行是否self.proxy的设置。

插件使用示例:

# 离线本地文档浏览
SwaggeruiPluginClient(app=app, proxy=self.settings.swaggerui_proxy)

2. globalrequest全局请求插件

全局request请求插件,它主要实现类似Flask框架中,不需要显示声明,直接导入即可使用当前request。首先该插件依赖于contextvars的实现,它的实现原理其实也非常简单,主要是基于contextvars线程安全和协程安全基础之下,结合对应的中间件,在中间件中获取到当前请求的request,并设置到contextvars即可。具体实现代码细节如下:

首先contextvars中的request请求对象声明以及相关中间件实现,代码如下:

首先是关于request: Request对的绑定代理示例,是通过bind_.py的来实现的,具体代码内容如下:

def bind_contextvar(contextvar):
    class ContextVarBind:
        __slots__ = ()

        def __getattr__(self, name):
            return getattr(contextvar.get(), name)

        def __setattr__(self, name, value):
            setattr(contextvar.get(), name, value)

        def __delattr__(self, name):
            delattr(contextvar.get(), name)

        def __getitem__(self, index):
            return contextvar.get()[index]

        def __setitem__(self, index, value):
            contextvar.get()[index] = value

        def __delitem__(self, index):
            del contextvar.get()[index]

    return ContextVarBind()

相关中间件实现,如下代码所示:

import contextlib
from contextvars import ContextVar
from typing import Iterator

from .bind_ import bind_contextvar
from fastapi import FastAPI
from ..pluginbase import IBasePlugin as BasePlugin

from starlette.requests import Request
from starlette.types import ASGIApp, Scope, Receive, Send, Message
from contextlib import asynccontextmanager

request_var: ContextVar[Request] = ContextVar("request")
request: Request = bind_contextvar(request_var)


class GlobalRequestLoadMiddleware:
    '''
    此类的中间件无法读取响应报文的内容
    '''
    pass

    def __init__(self, app: ASGIApp, is_proxy=True) -> None:
        self.app = app
        self.is_proxy = is_proxy

    def bind_to_request_state(self, request: Request, **kwargs):
        """
        Takes in a set of kwargs and binds them to gziprequest state
        """
        for key, value in kwargs.items():
            setattr(request.state, key, value)

    @asynccontextmanager
    async def _set_middleware_request_token(self, request: Request) -> Iterator[None]:
        # token_middleware_id: Token = middleware_identifier.set(middleware_id)
        # 设置全局
        token = request_var.set(request)
        try:
            yield
        finally:
            request_var.reset(token)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        # 解析当前的请求体
        request = Request(scope, receive=receive)

        # 异步的方式调用
        async with self._set_middleware_request_token(request):
            await self.app(scope, receive, send)


class GlobalRequestPluginClient(BasePlugin):
    name = '全局reRequest'

    def setup(self, app: FastAPI, name: str = None, *args, **kwargs):
        """插件初始化"""
        app.add_middleware(GlobalRequestLoadMiddleware, is_proxy=False)

插件使用示例:

gr = GlobalRequestPluginClient(app=app)
@app.get("/stream")
def stream():
    # 直接带入globalrequest.request下的request对象即可
    from afastcore.plugins.globalrequest.request import request
    print('request',request.headers)
    # request.session["session_code"] = '123456'
    # 读取(另一个接口读取)
    # cyrewct_code = request.session["session_code"]
    return 'ok'

3. cache_house装饰器缓存插件

首先cache_house缓存插件依赖于第三方库cache_house库,在框架中,为了修改部分代码的方便,我把它归入到libs包下,如下图所示:

FastApi(自用脚手架)+Snowy搭建后台管理系统(3)脚手架--插件基类及示例说明

插件和对应libs是对应起来的,如下图所示: FastApi(自用脚手架)+Snowy搭建后台管理系统(3)脚手架--插件基类及示例说明

cache_house的是要可以参考官网。下面我们介绍一下如何进行简单的封装,如下代码所示:

from typing import Tuple, Union, List, Optional, Callable, Any
from fastapi import FastAPI
from afastcore.libs.cache_house.backends import RedisFactory
from afastcore.libs.cache_house.helpers import pickle_encoder, pickle_decoder, DEFAULT_NAMESPACE, DEFAULT_PREFIX, \
    key_builder
from afastcore.plugins.pluginbase import IBasePlugin as BasePlugin
from pydantic import BaseSettings as Settings


class CacheHousePluginClient(BasePlugin):
    # 设置插件默认的参数信息
    name = 'CacheHouse'

    class CacheSettings(Settings):
        password: str = None
        db: int = 0
        host: str = "localhost"
        port: int = 6379
        encoder: Callable[..., Any] = pickle_encoder
        decoder: Callable[..., Any] = pickle_decoder
        namespace: str = DEFAULT_NAMESPACE
        key_prefix: str = DEFAULT_PREFIX
        key_builder: Callable[..., Any] = key_builder
        cluster_mode: bool = False

    def setup(self, app: FastAPI, name: str = None, *args, **redis_kwargs):
        """插件初始化"""
        self.app = app

        @app.on_event("startup")
        async def startup():
            self._cache = RedisFactory.init(host=self.settings.host,
                                            port=self.settings.port,
                                            password=self.settings.password,
                                            db=self.settings.db,
                                            cluster_mode=self.settings.cluster_mode,
                                            encoder=self.settings.encoder,
                                            decoder=self.settings.decoder,
                                            namespace=self.settings.namespace,
                                            key_prefix=self.settings.key_prefix,
                                            key_builder=self.settings.key_builder,
                                            **redis_kwargs
                                            )
            # 返回的是:RedisCache里面对象实例
            app.cache_house = self._cache

        @app.on_event("shutdown")
        async def shutdown():
            print("SHUTDOWN")
            RedisFactory.close_connections()

在上面的代码中我们主要是在@app.on_event("startup")完成了我们相关插件的初始化工作,并在对应@app.on_event("shutdown")完成相关关闭清理工作。其中关于插件相关配置项比较关键是在插件内部自定义的内部类对象:

    class CacheSettings(Settings):
        password: str = None
        db: int = 0
        host: str = "localhost"
        port: int = 6379
        encoder: Callable[..., Any] = pickle_encoder
        decoder: Callable[..., Any] = pickle_decoder
        namespace: str = DEFAULT_NAMESPACE
        key_prefix: str = DEFAULT_PREFIX
        key_builder: Callable[..., Any] = key_builder
        cluster_mode: bool = False

插件使用示例:

初始化:

CacheHousePluginClient(app=app, settings=CacheHousePluginClient.CacheSettings(
    host='127.0.0.1', port=6379
))

缓存装饰器使用:

from zyxfastcore.plugins.cache_house import cache_house
@app.get("/stream13")
@cache_house()
def test_cache_1(a: int, b: int):
    print("cached")
    return [a, b]

缓存装饰器自定义key的使用示例:

@app.get("/stream14")
@cache_house(expire=30, namespace="app", key_prefix="test")
def test_cache_1(a: int, b: int):
    print("cached")
    return [a, b]
上述的方式中,最终的key是:"test:app:e539522d454296a519ea15d36e855b6e"

缓存装饰器异步使用示例:

@app.get("/stream15")
@cache_house(expire=30, namespace="app", key_prefix="test")
async def test_cache_1(a: int, b: int):
    print("cached")
    return [a, b]

4. 基于aiojobs后台任务插件

首先需要安装aiojobs:

pip install aiojobs

然后封装对应的插件实例,如下代码所示:

import aiojobs
from fastapi import FastAPI
from afastcore.plugins.pluginbase import IBasePlugin as BasePlugin

class AiojobsPluginClient(BasePlugin):
    # 设置插件默认的参数信息
    name = 'Aiojobs插件'

    def setup(self, app: FastAPI, name: str = None, *args, **kwargs):
        """插件初始化"""
        # 当期的插件实例对象赋值
        self.app = app
        self.app.aiojobs = None
        self._aiojobs = None


        # 添加在应用程序启动之前运行的函数
        @app.on_event("startup")
        async def startup_event():
            self._aiojobs = aiojobs.Scheduler()
            app.aiojobs = self._aiojobs

        # 添加在应用程序关闭时运行的函数
        @app.on_event("shutdown")
        async def shutdown_event():
            await self._aiojobs.close()

    @property
    def spawn(self):
        if  self.app.aiojobs is None:
            raise Exception('需要先初始化插件!')
        return self._aiojobs.spawn

插件使用示例:

1)初始化:

app = FastAPI()
aiojob_client= AiojobsPluginClient(app=app)

2)定义后台任务:

async def coro(timeout):
    print("后台任务!!!!!!!!!!!timeout!",timeout)
    await asyncio.sleep(timeout)
    print("后台任务!!!!!!!!!!!!")

3)定义后台任务:

@app.get("/aaaa")
async def sync_route():
    #第一种
    await aiojob_client.spawn(coro(15))
    #第二种
    await app.aiojobs.spawn(coro(15))
    return "ok"

以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾