fastapi微服务系列-之封装GRPC实现类似fastapi一个框架(2)-定义一个全局request代理
前言
如果你用过flask的话,肯定对立面那种导入一个模块,就可以到处调用的,而且还是线程安全的隔离的。其实细究内部源码,其实也无法是一个字典+代理模式设置的方式来实现的。
在上一篇中,有对我们的GRPC做了简单的封装,但是没有引入这种模式,而且我再使用contextvars来做上下文的管理的是,也没有实现类似的导入一个模块就导出可以调用的方式,意思就是我需要显示的调用:
g.current_app_request.get()
这种方法才可以获取当前的请求的
request
那能不能实现类似的flask那种模式我把
g.current_app_request.get()
封装成一个模块,然后可以直接导入就能实现呐? 这个其实就是代理模式的实现了!
当然:其实我们的框架中每个服务的实现已经自带有了request和context传递
def SayHelloAgain(self, request, context):
至于为啥还需要封装一个类似的全局导入的模块来使用,主要是你自己场景需要吧,可能如果你进行模块分离的时候,或许可以使用到了!这个就看你个人的习惯了!
今天主要主题是:
- 1:实现类似flask那种代理模式,代理我们的grap中的request
- 2:基于代理模式下使用更加简洁的上下文模块contextvars来实现全局代理grap中的request
1:实现类似flaswk中的全局代理模块
1.1 定义local.py
在此模块中,主要需要解决的问题是:
-
定义Local,需要考虑线程安全等问题,其中Local 相当于一个字典,key为当前线程ID、或协程ID
-
对Local中的数据进行操作,相当于对线程内的数据进行操作,实现了线程之间数据隔离
-
为了性能和节省内存消耗__slots__定义限制扩展属性
-
及时的进行释放清理
-
定义LocalStack,LocalStack与Local对象类似,区别在于其数据结构是栈的形式,而Local是字典的形式
-
定义LocalProxy,LocalProxy 作为 Local 的代理,转发所有的操作到Local中的实际数据,
PS:使用代理而不是显式进行的对象调用:主要目的在于如果一个对象在整个上下文请求过程使用太过频繁,贯穿整个请求周期,显式传递很容易造成循环导入的问题,使用需要一个第三方的对象来进行解耦就比较方便。
完整代码:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : locsasl
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/12/13
-------------------------------------------------
修改描述-2021/12/13:
-------------------------------------------------
"""
import copy
from threading import get_ident
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
def release_local(local):
'''
清空相关的字典内的数据
传入的是一个对象,对象里面调用的是它的一个叫做__release_local__()的方法
:param local:
:return:
'''
local.__release_local__()
class LocalStack:
"""
存储本地参数的堆栈:
LocalStack与Local对象类似,区别在于其数据结构是栈的形式,而Local是字典的形式
Local相当于dict,LocalStack相当于栈
"""
def __init__(self):
self._local = Local()
def __release_local__(self):
self._local.__release_local__()
@property
def __ident_func(self): # pylint: disable=unused-private-member
return self._local.__ident_func__
@__ident_func.setter
def __ident_func__(self, value):
object.__setattr__(self._local, "__ident_func__", value)
def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError("Object unbond")
return rv
return LocalProxy(_lookup)
def push(self, obj):
""" 将一个数据入栈
"""
# 判断字典线程字典里面是否存在stack的属性
rv = getattr(self._local, "stack", None)
if rv is None:
self._local.stack = rv = [] # pylint: disable=assigning-non-slot
rv.append(obj)
return rv
def pop(self):
# 判断字典线程字典里面是否存在stack的属性
stack = getattr(self._local, "stack", None)
if stack is None:
return None
# 栈顶
if len(stack) == 1:
self._local.__release_local__()
return stack[-1]
return stack.pop()
@property
def top(self):
""" 返回栈顶元素
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
class Local:
""" Local 相当于一个字典,key为当前线程ID,
对Local中的数据进行操作,相当于对线程内的数据进行操作,实现了线程之间数据隔离,
对key操作其实就是对:当前线程ID内获取到对应的dict存储空间
# Python内置的字典本质是一个哈希表,它是一种用空间换时间的数据结构。为了解决冲突的问题,
#当字典使用量超过2/3时,Python会根据情况进行2-4倍的扩容。
#由此可预见,取消__dict__的使用可以大幅减少实例的空间消耗
# ===================
:问题点:
--1:普通类身上时,使用__slots__后会丧失动态添加属性和弱引用的功能
--2:当一个类需要创建大量实例时,可以使用__slots__来减少内存消耗
--3:slots的特性来限制实例的属性
"""
# 这个字典仅仅只能有这两个的属性存在
__slots__ = ("__storage__", "__ident_func__")
def __init__(self):
object.__setattr__(self, "__storage__", {})
object.__setattr__(self, "__ident_func__", get_ident)
def __iter__(self):
return iter(self.__storage__.items())
# 当调用【Local对象】时,返回对应的LocalProxy
def __call__(self, proxy):
""" 创建某一个name的Proxy,
返回一个代理的对象
"""
print(f"返回一个代理的对象,创建某一个{proxy}的Proxy")
return LocalProxy(self, proxy)
def __release_local__(self):
'''
# Local类中特有的method,用于清空greenlet id或线程id对应的dict数据
:return:
'''
self.__storage__.pop(self.__ident_func__(), None)
def __getattr__(self, name):
'''
获取某线程内的某参数的执行信息
:param name:
:return:
'''
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) # pylint: disable=raise-missing-from
def __setattr__(self, name, value):
# 执行了__setattr__,创建了{ident:{stack:any}}
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value}
def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name) # pylint: disable=raise-missing-from
class LocalProxy:
""" LocalProxy 作为 Local 的代理,转发所有的操作到Local中的实际数据
未封装前操作数据需要用 `local.xxx` 的语法进行操作,封装时候直接操作数据就行
目的:
使用代理而不是显式的对象的主要目的在于这四个对象使用太过频繁,贯穿整个请求周期,显式传递很容易造成循环导入的问题,需要一个第三方的对象来进行解耦
"""
__slots__ = ("__local", "__dict__", "__name__", "__wrapped__") # pylint: disable=class-variable-slots-conflict
def __init__(self, local, name=None):
# 是这代理对象代理的local对象
object.__setattr__(self, "_LocalProxy__local", local)
# 设置代码对象的名
object.__setattr__(self, "__name__", name)
# 1:被代理的对象必须是一个callable【callable() 函数用于检查一个对象是否是可调用的】
# 2:被代理的对象必须有__release_local__的属性,释放内部的字典数据
if callable(local) and not hasattr(local, "__release_local__"):
object.__setattr__(self, "__wrapped__", local)
def _get_current_object(self):
"""
'''返回当前对象。如果出于性能原因您一次希望将真实对象放在代理后面,
或者因为要将对象传递到不同的上下文,这将很有用。
'''
'''
1.由于所有Local或LocalStack对象都有__release_local__ method, \
所以如果没有该属性就表明self.__local为callable对象。
2.当初始化参数为callable对象时,则直接调用以返回Local或LocalStack对象
'''
"""
# 如果被代码的对象没有__release_local__ 那么就的返回__local()实例化的对象
if not hasattr(self.__local, "__release_local__"):
return self.__local()
try:
# 此处self.__local为Local或LocalStack对象
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError(f"no object bond to {self.__name__}") # pylint: disable=(raise-missing-from
@property
def __dict__(self):
'''
_get_current_object当前对象的字典序列化返回
:return:
'''
try:
return self._get_current_object().__dict__
except RuntimeError:
raise AttributeError("__dict__") # pylint: disable=(raise-missing-from
def __repr__(self):
'''
_get_current_object__repr__()方法:显示属性
:return:
'''
try:
obj = self._get_current_object()
except RuntimeError:
return "<%s unbond>" % self.__class__.__name__
return repr(obj)
def __bool__(self):
try:
return bool(self._get_current_object())
except RuntimeError:
return False
def __dir__(self):
'''
返回当前对象的自省性格的属性信息
:return:
'''
try:
return dir(self._get_current_object())
except RuntimeError:
return []
def __getattr__(self, name):
if name == "__members__":
return dir(self._get_current_object())
return getattr(self._get_current_object(), name)
def __setitem__(self, key, value):
self._get_current_object()[key] = value
def __delitem__(self, key):
del self._get_current_object()[key]
# 重载了绝大多数操作符,以便在调用LocalProxy的相应操作时,
# 通过_get_current_object method来获取真正代理的对象,然后再进行相应操作
__setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
__delattr__ = lambda x, n: delattr(x._get_current_object(), n)
__str__ = lambda x: str(x._get_current_object())
__lt__ = lambda x, o: x._get_current_object() < o
__le__ = lambda x, o: x._get_current_object() <= o
__eq__ = lambda x, o: x._get_current_object() == o
__ne__ = lambda x, o: x._get_current_object() != o
__gt__ = lambda x, o: x._get_current_object() > o
__ge__ = lambda x, o: x._get_current_object() >= o
__hash__ = lambda x: hash(x._get_current_object())
__call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
__len__ = lambda x: len(x._get_current_object())
__getitem__ = lambda x, i: x._get_current_object()[i]
__iter__ = lambda x: iter(x._get_current_object())
__contains__ = lambda x, i: i in x._get_current_object()
__add__ = lambda x, o: x._get_current_object() + o
__sub__ = lambda x, o: x._get_current_object() - o
__mul__ = lambda x, o: x._get_current_object() * o
__floordiv__ = lambda x, o: x._get_current_object() // o
__mod__ = lambda x, o: x._get_current_object() % o
__divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
__pow__ = lambda x, o: x._get_current_object() ** o
__lshift__ = lambda x, o: x._get_current_object() << o
__rshift__ = lambda x, o: x._get_current_object() >> o
__and__ = lambda x, o: x._get_current_object() & o
__xor__ = lambda x, o: x._get_current_object() ^ o
__or__ = lambda x, o: x._get_current_object() | o
__div__ = lambda x, o: x._get_current_object().__div__(o)
__truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
__neg__ = lambda x: -(x._get_current_object())
__pos__ = lambda x: +(x._get_current_object())
__abs__ = lambda x: abs(x._get_current_object())
__invert__ = lambda x: ~(x._get_current_object())
__complex__ = lambda x: complex(x._get_current_object())
__int__ = lambda x: int(x._get_current_object())
__float__ = lambda x: float(x._get_current_object())
__oct__ = lambda x: oct(x._get_current_object())
__hex__ = lambda x: hex(x._get_current_object())
__index__ = lambda x: x._get_current_object().__index__()
__coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o)
__enter__ = lambda x: x._get_current_object().__enter__()
__exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
__radd__ = lambda x, o: o + x._get_current_object()
__rsub__ = lambda x, o: o - x._get_current_object()
__rmul__ = lambda x, o: o * x._get_current_object()
__rdiv__ = lambda x, o: o / x._get_current_object()
__rtruediv__ = __rdiv__
__rfloordiv__ = lambda x, o: o // x._get_current_object()
__rmod__ = lambda x, o: o % x._get_current_object()
__rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o)
__copy__ = lambda x: copy.copy(x._get_current_object())
__deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo)
1.2 定义request.py,封装Reques对象
Reques对象里面主要封装我们的grpc中传递的request, context,封装起来主要是对方法进行封装,方便后续的调用,而且后续我们的代理的对象其实也就是这个对象的示例,这样就可以直接的调用里面的方法。
完整代码:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : rc
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/12/13
-------------------------------------------------
修改描述-2021/12/13:
-------------------------------------------------
"""
class Request:
def __init__(self, request, context):
""" 初始化Request实例
:param request: 原始请求
:param context: 请求上下文
"""
self.request = request
self.context = context
def headers(self):
""" 获取请求头
"""
if self.rpc_event is not None:
# 反射查询
metadata = getattr(self.rpc_event, "invocation_metadata")
return dict(metadata)
return None
def method(self):
""" 获取本次请求调用的方法
"""
if self.call_details is not None:
# 反射查询
method = getattr(self.call_details, "method")
return method.decode("utf8") if method else method
return None
def service(self):
""" 获取本次请求的Service
"""
if self.method is not None:
return self.method.split("/")[-2]
return None
def rpc_event(self):
""" 返回RPC事件
"""
if self.context is not None:
# 反射查询
return getattr(self.context, "_rpc_event")
return None
def call_details(self):
""" 返回调用详情
"""
if self.rpc_event is not None:
# 反射查询
return getattr(self.rpc_event, "call_details")
return None
1.3 定义ctx.py,封装RequestContext对象
对于我们的Reques对象进行压榨和入栈的管理。
代码:
from grpcframe.grpclocal.request import Request
from grpcframe.grpclocal.g import _request_ctx_stack
import typing as t
class RequestContext:
'''
创建一个请求处理对象
'''
def __init__(self, params=None, context=None) -> None:
""" 创建请求上下文
"""
# 创建GRPC中的请求提信息
self.request = Request(params, context)
# 是否是保存
self.preserved = False
self._preserved_exc = None
def push(self) -> None:
""" 将请求上下文入栈
"""
top = _request_ctx_stack.top
if top is not None and top.preserved:
# 记录异常错误信息
top.pop(top._preserved_exc)
# 入栈存在的是整个的RequestContext的对象,但是在调用我们的代理的时候取的是self.request的属性
_request_ctx_stack.push(self)
def pop(self, exc: t.Optional[BaseException] = object()) -> None:
""" 将请求出栈
"""
rv = _request_ctx_stack.pop()
if rv is not self:
raise RuntimeError("Popped wrong request context")
def __enter__(self) -> "RequestContext":
self.push()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.pop(exc_val)
1.4 定义g.py
这里主要是定义我们需要代理的对象的模块,因为我们的上下文根据上下文来进行获取的,所以我们的需要代理其实是我们的上面提到的Request类实例对象,但是这个实例对象又是不同的请求的,所以是从栈里面进行拿的。
所以整个的代理过程就是:
- 1:定义个_request_ctx_stack = LocalStack()
- 2:定义一个_lookup_req_object方法,这个该方式从_request_ctx_stack栈顶里面获取一个RequestContext对象,然后返回的时候是返回RequestContext对象的request实例对象【这样就可以获取到我们具体上下文中的请求提信息】
- 3:使用偏函数来帮忙自动传参,定义需要代理的方法,和对应的属性值。
完整代码:
from grpcframe.grpclocal.local import LocalStack,LocalProxy
from functools import partial
from grpcframe.grpclocal.request import Request
# 请求上下文的
_request_ctx_stack = LocalStack()
def _lookup_req_object(name)->Request:
# 返回栈顶原生对象
top = _request_ctx_stack.top
if top is None:
raise RuntimeError("Working outside of request context")
# "request"---返回RequestContext对象的request属性
return getattr(top, name)
# 创建一个request的代理对象
# request作为一个代理对象,其指向的是当前的请求的请求上下文RequestContext对象的request属性,
# 即在请求上下文初始化的时候创建的请求对象,它是程序处理逻辑频繁需要的对象,
# 存储着几乎所有的请求数据信息,本质上是Request对象。生命周期为一次请求期间,当请求处理完成后被销毁;
#>>>---"request"---指向的RequestContext对象的request属性---为出现方法提示:Request
request:Request = LocalProxy(partial(_lookup_req_object, "request"))
1.5 定义实例化方法
def request_context(self, request, context):
""" 使用类似falask的自定义的方式封装的-请求上下文
"""
return RequestContext(request, context)
1.6 进行入栈和出栈实例化
使用截图:
在默认固定的基础的中间件里面进行实例化(额外定义一个比较好,避免多次实例化)
完整代码:
# 需要先继承ServerInterceptor再继承ABCMeta
class BaseMiddleware(ServerInterceptor, BaseAppMiddleware, metaclass=abc.ABCMeta):
@abc.abstractmethod
def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
raise NotImplementedError()
@abc.abstractmethod
def after_handler(self, context: grpc.ServicerContext, response):
raise NotImplementedError()
def intercept(self, method: Callable, request: Any, context: grpc.ServicerContext, method_name: str, ) -> Any:
# 创建上下文对象
ctx = None
if self.app:
ctx = self.app.request_context(request, context)
try:
# 上下文入栈
if self.app:
ctx.push()
# 回调调用必要的前置的函数处理
self.before_handler(request, context, method_name)
response = method(request, context)
# 调用必要的后置的函数处理
self.after_handler(context, response)
return response
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
if self.app:
ctx.pop()
1.7 导入request随意使用
request:Request = LocalProxy(partial(_lookup_req_object, "request"))
如:
2: 基于contextvars实现类似flaswk中的全局代理模
2.1 定义我们的全局的管理对象
@dataclass
class GManager(metaclass=Singleton):
# 记录当前启动的app
current_app_server = contextvars.ContextVar('current_app_server', default=None)
# 记录当前请求的请求
current_app_request = contextvars.ContextVar('current_app_request', default=None)
# 记录当前请求的请求
current_app_context = contextvars.ContextVar('current_app_context', default=None)
# 记录当前被激活的追踪链路的span
active_tracer_span = contextvars.ContextVar('current_active_tracer_span', default=None)
g = GManager()
2.2 在对应的中间件实例化上下文
from grpcframe.core.middlewares import BaseMiddleware
from typing import Any, Callable, NamedTuple, Tuple
from grpcframe.core.app import g
import grpc
# @app.add_middleware
class CxtgMiddleware(BaseMiddleware):
def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
pass
# 设置上下文信息处理
g.current_app_request.set(request)
g.current_app_context.set(context)
# 创建类似flask的上下文的对象
def after_handler(self, context: grpc.ServicerContext, response):
pass
2.3 定义代理操作对象
其实这里代理的就是.current_app_request.get()
def get_current_app_request(g=None):
# 代理里面的get方法直接的返回,这样就不需要显示的调用get()
return g.current_app_request.get()
#定义 一个全局代理g里面的current_app_request 的对象,这样也可以实现类似flask上线文的效果
grequest=LocalProxy(partial(get_current_app_request, g))
2.4 导入grequest使用
类似的方法:
def parse_request_action(self,request) -> Dict:
'''
获取当前RPC的请求参数信息,进行字典的序列化
:param request:
:return:
'''
request_dict = json_format.MessageToDict(request, preserving_proto_field_name=True)
return request_dict
传入全局对象进行使用:
总结
以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!
结尾
转载自:https://juejin.cn/post/7041122480768942110