Fastapi系列-request:Request全局变量代理实践
前言
关于全局变量代理,最明显则是我们的flask框架中的request的变量,本身它是一个线程安全的,所以我们在使用的时候,可以随意在某个模块里导入进行使用。如果对于FastApi你也有类似此类的需求的话,其实我们的可以实现类似的request的代理,这样我们在处理某些业务逻辑的时候就不需要显示的进行声明,如下代码所示:
@app.post("/get/access_token")
def access_token(request:Request,name=Body(...)):
# print(reques.body())
from asgiref.sync import async_to_sync
body = async_to_sync(reques.body)()
print(body)
return PlainTextResponse(body.decode(encoding='utf-8'))
在上述的代码中,我们如果需要使用request:Request则需要在路径函数进行声明,这种方式在某些场景下存在一定局限。所以我们的可以使用全局代理的方式来实现。
但是需要说明的一点是,即使实现全局代理,我们在request:Request调用携程函数,如读取body的内容,如果不是在路由函数内部调用的话,都会引发阻塞问题,因为非在路由函数内部调用request:Request的body的话,都会存在二次消费的问题。
实践系列
我们下面通过两种方式来实践具体的全局代理的案例。
1. flask模式
在开始之前我们先讲可能应用到的全局上下文的全局变量的场景。
1.1 链路追踪的traceid
通常我们在应用内做日志记录的时候,有可能会涉及到很多服务或第三方接口请求日志记录,此时我们需要通过一个ID来标识这一个请求是日志属于哪一次请求发起的。所以我们需要做请求日志ID额的生成标记。
通过中间件的方式我们可以很简单的做到这个traceid的生成,如下示例:
class TracdIDMiddleware(BaseHTTPMiddleware):
# dispatch 必须实现
async def dispatch(self, request:Request, call_next):
request.state.traceid = uuid.uuid4()
responser = await call_next(request)
# 返回接口响应时间
return responser
但是抛出一个问题来:通过上面我们可知道traceid依赖于 request:Request,此时如果想要得到traceid,那就需要传入request:Request,这样就需要显示的定义request:Request,无法让traceid脱离于request:Request的传参才能获取到?
1.2 上下文request:Request
如果你学过Flask应该知道它可以让我们的一个request全局导入的方式来使用,并且它是线程安全,也就是说吗,每次导入request全它可以清晰的知道这个request是属于哪一个请求的?
然而在Fastapi中我们的request:Request其实是动态创建的,每一次的中间件所产生的request:Request都是不同的对象, 但是我们可以通过上下文的方式来让他具有唯一性。
于是乎我们的可以借助:contextvars来存贮这一个TracdIDMiddleware所产生的request:Request,这样它就可以进行上下文的传递。
PS:contextvars 3.7+后才通用的支持多线程和协程,以前的版本需要导入不同的方式来处理!
所以通过上面的思路我们的改造我们代码为:
所以通过上面的思路我们的改造我们代码为:
import contextvars
request_context = contextvars.ContextVar('request_context')
class TracdIDMiddleware(BaseHTTPMiddleware):
# dispatch 必须实现
async def dispatch(self, request:Request, call_next):
request_context.set(request)
request.state.traceid = uuid.uuid4()
responser = await call_next(request)
# 返回接口响应时间
return responser
在上面中我们的引入了request_context这个上下文变量管理,通过 request_context.set(request)来设置它的值,那么后续我们的就可以直接通过request_context来获取这一次请求中的request所包含的值。如下:
def log_info(mage=None):
request: Request =request_context.get()
print('index-requet',request.state.traceid)
然后还是有一个小问题,每次获取 request: Reques我都需要通过=request_context.get()的显示的方式起来调用,那能不能直接的使用类似Flask那种方式就可以了呢?
1.3 全局代理request:Request
如果看过之前的flask的源码的话,其实里面就是通过代理的方式来实现的直接导入就要是使用的方法,所以我们同理,可以借鉴这种思路,然后对=request_context.get()这种方式进行加工处理一下。下面的主题主要是:基于代理模式下使用更加简洁的上下文模块contextvars来实现全局代理request: Request =request_context.get()
步骤1:定义LocalProxy,LocalProxy 作为 Local 的代理,转发所有的操作到Local中的实际数据
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : locsasl
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/12/13
-------------------------------------------------
修改描述-2021/12/13:
-------------------------------------------------
"""
import copy
from threading import get_ident
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
def release_local(local):
'''
清空相关的字典内的数据
传入的是一个对象,对象里面调用的是它的一个叫做__release_local__()的方法
:param local:
:return:
'''
local.__release_local__()
class LocalStack:
"""
存储本地参数的堆栈:
LocalStack与Local对象类似,区别在于其数据结构是栈的形式,而Local是字典的形式
Local相当于dict,LocalStack相当于栈
"""
def __init__(self):
self._local = Local()
def __release_local__(self):
self._local.__release_local__()
@property
def __ident_func(self): # pylint: disable=unused-private-member
return self._local.__ident_func__
@__ident_func.setter
def __ident_func__(self, value):
object.__setattr__(self._local, "__ident_func__", value)
def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError("Object unbond")
return rv
return LocalProxy(_lookup)
def push(self, obj):
""" 将一个数据入栈
"""
# 判断字典线程字典里面是否存在stack的属性
rv = getattr(self._local, "stack", None)
if rv is None:
self._local.stack = rv = [] # pylint: disable=assigning-non-slot
rv.append(obj)
return rv
def pop(self):
# 判断字典线程字典里面是否存在stack的属性
stack = getattr(self._local, "stack", None)
if stack is None:
return None
# 栈顶
if len(stack) == 1:
self._local.__release_local__()
return stack[-1]
return stack.pop()
@property
def top(self):
""" 返回栈顶元素
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
class Local:
""" Local 相当于一个字典,key为当前线程ID,
对Local中的数据进行操作,相当于对线程内的数据进行操作,实现了线程之间数据隔离,
对key操作其实就是对:当前线程ID内获取到对应的dict存储空间
# Python内置的字典本质是一个哈希表,它是一种用空间换时间的数据结构。为了解决冲突的问题,
#当字典使用量超过2/3时,Python会根据情况进行2-4倍的扩容。
#由此可预见,取消__dict__的使用可以大幅减少实例的空间消耗
# ===================
:问题点:
--1:普通类身上时,使用__slots__后会丧失动态添加属性和弱引用的功能
--2:当一个类需要创建大量实例时,可以使用__slots__来减少内存消耗
--3:slots的特性来限制实例的属性
"""
# 这个字典仅仅只能有这两个的属性存在
__slots__ = ("__storage__", "__ident_func__")
def __init__(self):
object.__setattr__(self, "__storage__", {})
object.__setattr__(self, "__ident_func__", get_ident)
def __iter__(self):
return iter(self.__storage__.items())
# 当调用【Local对象】时,返回对应的LocalProxy
def __call__(self, proxy):
""" 创建某一个name的Proxy,
返回一个代理的对象
"""
print(f"返回一个代理的对象,创建某一个{proxy}的Proxy")
return LocalProxy(self, proxy)
def __release_local__(self):
'''
# Local类中特有的method,用于清空greenlet id或线程id对应的dict数据
:return:
'''
self.__storage__.pop(self.__ident_func__(), None)
def __getattr__(self, name):
'''
获取某线程内的某参数的执行信息
:param name:
:return:
'''
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) # pylint: disable=raise-missing-from
def __setattr__(self, name, value):
# 执行了__setattr__,创建了{ident:{stack:any}}
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value}
def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) # pylint: disable=raise-missing-from
class LocalProxy:
""" LocalProxy 作为 Local 的代理,转发所有的操作到Local中的实际数据
未封装前操作数据需要用 `local.xxx` 的语法进行操作,封装时候直接操作数据就行
目的:
使用代理而不是显式的对象的主要目的在于这四个对象使用太过频繁,贯穿整个请求周期,显式传递很容易造成循环导入的问题,需要一个第三方的对象来进行解耦
"""
__slots__ = ("__local", "__dict__", "__name__", "__wrapped__") # pylint: disable=class-variable-slots-conflict
def __init__(self, local, name=None):
# 是这代理对象代理的local对象
object.__setattr__(self, "_LocalProxy__local", local)
# 设置代码对象的名
object.__setattr__(self, "__name__", name)
# 1:被代理的对象必须是一个callable【callable() 函数用于检查一个对象是否是可调用的】
# 2:被代理的对象必须有__release_local__的属性,释放内部的字典数据
if callable(local) and not hasattr(local, "__release_local__"):
object.__setattr__(self, "__wrapped__", local)
def _get_current_object(self):
"""
'''返回当前对象。如果出于性能原因您一次希望将真实对象放在代理后面,
或者因为要将对象传递到不同的上下文,这将很有用。
'''
'''
1.由于所有Local或LocalStack对象都有__release_local__ method, \
所以如果没有该属性就表明self.__local为callable对象。
2.当初始化参数为callable对象时,则直接调用以返回Local或LocalStack对象
'''
"""
# 如果被代码的对象没有__release_local__ 那么就的返回__local()实例化的对象
if not hasattr(self.__local, "__release_local__"):
return self.__local()
try:
# 此处self.__local为Local或LocalStack对象
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError(f"no object bond to {self.__name__}") # pylint: disable=(raise-missing-from
@property
def __dict__(self):
'''
_get_current_object当前对象的字典序列化返回
:return:
'''
try:
return self._get_current_object().__dict__
except RuntimeError:
raise AttributeError("__dict__") # pylint: disable=(raise-missing-from
def __repr__(self):
'''
_get_current_object__repr__()方法:显示属性
:return:
'''
try:
obj = self._get_current_object()
except RuntimeError:
return "<%s unbond>" % self.__class__.__name__
return repr(obj)
def __bool__(self):
try:
return bool(self._get_current_object())
except RuntimeError:
return False
def __dir__(self):
'''
返回当前对象的自省性格的属性信息
:return:
'''
try:
return dir(self._get_current_object())
except RuntimeError:
return []
def __getattr__(self, name):
if name == "__members__":
return dir(self._get_current_object())
return getattr(self._get_current_object(), name)
def __setitem__(self, key, value):
self._get_current_object()[key] = value
def __delitem__(self, key):
del self._get_current_object()[key]
# 重载了绝大多数操作符,以便在调用LocalProxy的相应操作时,
# 通过_get_current_object method来获取真正代理的对象,然后再进行相应操作
__setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
__delattr__ = lambda x, n: delattr(x._get_current_object(), n)
__str__ = lambda x: str(x._get_current_object())
__lt__ = lambda x, o: x._get_current_object() < o
__le__ = lambda x, o: x._get_current_object() <= o
__eq__ = lambda x, o: x._get_current_object() == o
__ne__ = lambda x, o: x._get_current_object() != o
__gt__ = lambda x, o: x._get_current_object() > o
__ge__ = lambda x, o: x._get_current_object() >= o
__hash__ = lambda x: hash(x._get_current_object())
__call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
__len__ = lambda x: len(x._get_current_object())
__getitem__ = lambda x, i: x._get_current_object()[i]
__iter__ = lambda x: iter(x._get_current_object())
__contains__ = lambda x, i: i in x._get_current_object()
__add__ = lambda x, o: x._get_current_object() + o
__sub__ = lambda x, o: x._get_current_object() - o
__mul__ = lambda x, o: x._get_current_object() * o
__floordiv__ = lambda x, o: x._get_current_object() // o
__mod__ = lambda x, o: x._get_current_object() % o
__divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
__pow__ = lambda x, o: x._get_current_object() ** o
__lshift__ = lambda x, o: x._get_current_object() << o
__rshift__ = lambda x, o: x._get_current_object() >> o
__and__ = lambda x, o: x._get_current_object() & o
__xor__ = lambda x, o: x._get_current_object() ^ o
__or__ = lambda x, o: x._get_current_object() | o
__div__ = lambda x, o: x._get_current_object().__div__(o)
__truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
__neg__ = lambda x: -(x._get_current_object())
__pos__ = lambda x: +(x._get_current_object())
__abs__ = lambda x: abs(x._get_current_object())
__invert__ = lambda x: ~(x._get_current_object())
__complex__ = lambda x: complex(x._get_current_object())
__int__ = lambda x: int(x._get_current_object())
__float__ = lambda x: float(x._get_current_object())
__oct__ = lambda x: oct(x._get_current_object())
__hex__ = lambda x: hex(x._get_current_object())
__index__ = lambda x: x._get_current_object().__index__()
__coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o)
__enter__ = lambda x: x._get_current_object().__enter__()
__exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
__radd__ = lambda x, o: o + x._get_current_object()
__rsub__ = lambda x, o: o - x._get_current_object()
__rmul__ = lambda x, o: o * x._get_current_object()
__rdiv__ = lambda x, o: o / x._get_current_object()
__rtruediv__ = __rdiv__
__rfloordiv__ = lambda x, o: o // x._get_current_object()
__rmod__ = lambda x, o: o % x._get_current_object()
__rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o)
__copy__ = lambda x: copy.copy(x._get_current_object())
__deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo)
步骤2:创建上下文管理对象
import contextvars
request_context = contextvars.ContextVar('request_context')
步骤3:定义代理操作对象tracdidrequest
from fastapi import Request
def get_current_request():
# 代理里面的get方法直接的返回,这样就不需要显示的调用get()
return request_context.get()
#定义 一个全局代理g里面的current_app_request 的对象,这样也可以实现类似flask上线文的效果
tracdidrequest:Request=LocalProxy(get_current_request)
步骤4:在中间件中件实例化上下文的值
class TracdIDMiddleware(BaseHTTPMiddleware):
# dispatch 必须实现
async def dispatch(self, request:Request, call_next):
request_context.set(request)
request.state.traceid = uuid.uuid4()
responser = await call_next(request)
# 返回接口响应时间
return responser
app.add_middleware(TracdIDMiddleware)
步骤5:然后呢 就可以任意地方导入对象tracdidrequest来获取当前请求request:Request了
@uuser_group_router.get("/login")
def user_login():
print(tracdidrequest.state.traceid)
return {
'code': 'login_ok'
}
然后我们就不需要显示传入request:Request, 甚至不需要显示的request: Request =request_context.get()的方式来获取了!!!
2. 简化模式
其实上面的flask实现代码过程似乎看起来非常复杂,而且其实有写内部细节有点乱。下面我来一个看得懂的更见的实现。
代码来自于index.py框架的实现,如果你想自己实现一个框架的话,可以参考了解哦。
https://github.com/index-py/index.py
步骤1:定义一个绑定代理方法:
def bind_contextvar(contextvar):
class ContextVarBind:
__slots__ = ()
def __getattr__(self, name):
return getattr(contextvar.get(), name)
def __setattr__(self, name, value):
setattr(contextvar.get(), name, value)
def __delattr__(self, name):
delattr(contextvar.get(), name)
def __getitem__(self, index):
return contextvar.get()[index]
def __setitem__(self, index, value):
contextvar.get()[index] = value
def __delitem__(self, index):
del contextvar.get()[index]
return ContextVarBind()
步骤2:基于contextvars方式实现上下文
contextvars的是线程安全和协程安全的,使用这个最好的方式,我们可以把下面的代码定义在某个模块下。
from contextvars import ContextVar
from fastapi import Request
def bind_contextvar(contextvar):
class ContextVarBind:
__slots__ = ()
def __getattr__(self, name):
return getattr(contextvar.get(), name)
def __setattr__(self, name, value):
setattr(contextvar.get(), name, value)
def __delattr__(self, name):
delattr(contextvar.get(), name)
def __getitem__(self, index):
return contextvar.get()[index]
def __setitem__(self, index, value):
contextvar.get()[index] = value
def __delitem__(self, index):
del contextvar.get()[index]
return ContextVarBind()
request_var: ContextVar[Request] = ContextVar("request")
request:Request = bind_contextvar(request_var)
步骤3:自定义一个中间件用于初始化request_var
中间件具体代码如下:
from utils.request import request_var
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
token = request_var.set(request)
try:
response = await call_next(request)
return response
finally:
request_var.reset(token)
步骤4.开始进行各种导入测试验证
注意需要的是导入的是时候导入bind_contextvar(request_var)下的代理对象。
request:Request = bind_contextvar(request_var)
步骤5 实际应用
from utils.request import request
@router_org.post('/add/org', summary="机构添加")
async def callfun(*, org: Org, db_session: AsyncSession = Depends(get_db_session)):
# 这里应该使用事务处理
print(request.headers)
return JSONResponse({
"code": 200,
"msg": "创建机构成功",
"data": {"org_name": org.org_name}
})
以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!
结尾
\
转载自:https://juejin.cn/post/7091845249156268068