flask源码分析(三)上下文
前言
本系列将对python几个流行的web框架源码进行分析。主要分析各个框架的特点、特色功能实现原理、以及一个请求在框架中的经过哪些处理。 文章不打算精读每一行代码,而是挑出框架比较有特色的地方深入研究,对于其他内容,采取粗读的形式。 预计会选取flask,fastapi,Django这三个框架。 每个框架各有特色,所以文章没有固定格式和必要篇目。
在一段文字或者对话中,如果不知道上下文,可能就会对这段文字感到疑惑,因为你缺少了一些信息。 比如文字中出现了【他】,【她】,我们得从上下文中才能知道指代的是谁。在程序里面也是如此,所谓的上下文,就是包含了程序的各种变量信息。 例如一次API请求包含了这次请求的请求头、请求地址、参数等等。 一个函数可能需要调用另外一个函数,第二个函数需要知道这次请求的某些信息,这些就是上下文。
通过上篇文章得知,请求最后是进到了用户自定义的试图函数中的:
from flask import Flask
app = Flask(__name__)
@app.get('/get')
def get():
return 'get'
可以看到,对于一个请求来说,它的请求地址【'/get'】是通过get
装饰器绑定好的,也就是不需要额外去获取。对于这个请求,目前没有其他的请求信息需要获取。
但是现在,如果需要获取这个请求的请求头呢?
看一下几种不同的框架如何获请求头的:
# wsgi
def application(environ, start_response):
status = '200 OK'
headers = [('Content-type', 'text/plain')]
# 获取请求头
headers.extend([(key, value) for key, value in environ.items() if key.startswith('HTTP_')])
start_response(status, headers)
return [b"Hello World"]
# django
from django.http import HttpResponse
def test_view(request):
# 获取请求头
headers = request.headers
return HttpResponse("Hello World", headers=headers)
# fastapi
from fastapi import FastAPI, Header
app = FastAPI()
@app.get("/test")
async def test_view(header: str = Header(None)):
# 获取请求头
return {"message": "Hello World", "header": header}
# flask
from flask import Flask, request
app = Flask(__name__)
@app.route('/test')
def test_view():
# 获取请求头
headers = request.headers
return "Hello World", 200, headers.items()
可以看到一个非常明显的区别,django和fastapi需要明显传入一个【request】对象,这个对象包含了请求的所有信息,也就是wsgi里面的【environ】。框架会处理【environ】,封装成【request】对象,然后一路传递下去。后面的文章也会对【请求】【响应】做出分析。
现在主要讨论一下请求上下文。 flask在和其他例子的对比中,可以看到,视图函数不需要接收一个请求对象,而是通过from flask import request
导入的【request】来获取内容。 不得不说这是很棒的设计,实际上,上下文管理也是flask这个框架的一大亮点。
对于这样的使用方法,有几点疑问: 【request】是全局的吗?如何做到不同的请求数据不会互相干扰?不同的线程、协程是如何隔离的?
请求上下文
在上一篇中,我们看到,请求进来之后,会根据【path】找到已经绑定好的视图函数,最后返回视图函数的调用结果。这里不再累述,可以去看一下上一篇。现在主要看一下,在调用视图函数的过程中,发生了什么。
# flask/app.py
def wsgi_app(
self, environ: WSGIEnvironment, start_response: StartResponse
) -> cabc.Iterable[bytes]:
ctx = self.request_context(environ)
error: BaseException | None = None
try:
try:
ctx.push()
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if "werkzeug.debug.preserve_context" in environ:
environ["werkzeug.debug.preserve_context"](_cv_app.get())
environ["werkzeug.debug.preserve_context"](_cv_request.get())
if error is not None and self.should_ignore_error(error):
error = None
ctx.pop(error)
ctx = self.request_context(environ)
是传入environ
,生成一个请求对象。现在不相信分析具体是做什么的,只要知道ctx
包含了一个请求所有的内容,请求地址,请求头,请求参数等等。
接着重点来了,在调用full_dispatch_request
之前,进行了ctx.push()
,在整个过程处理完之后,进行了ctx.pop(error)
。
这个push和pop看起来是入栈和出栈。 也就是在处理请求之前,flask会将包含了请求的上下文ctx
入栈,处理完请求之后,进行出栈。
这样做的目的是什么? 由上面的例子可以看到,只要导入了全局变量request
,那么就可以直接使用这次请求的上下文,所以这个入栈和出栈,或许是将请求上下文存到全局变量request
中,这样就可以全局调用。
# flask/app.py
def dispatch_request(self) -> ft.ResponseReturnValue:
req = request_ctx.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule: Rule = req.url_rule # type: ignore[assignment]
# if we provide automatic options for this URL and the
# request came with the OPTIONS method, reply automatically
if (
getattr(rule, "provide_automatic_options", False)
and req.method == "OPTIONS"
):
return self.make_default_options_response()
# otherwise dispatch to the handler for that endpoint
view_args: dict[str, t.Any] = req.view_args # type: ignore[assignment]
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
在调用dispatch_request
的时候,可以看到,并不是将上面的ctx
传入dispatch_request
,而是通过request_ctx.request
,也就是说,在这里,和在外部视图函数使用的方式是一样的,使用request
全局变量,获取上下文。
push和pop
# flask/globals.py
# ....
_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy( # type: ignore[assignment]
_cv_request, unbound_message=_no_req_msg
)
request: Request = LocalProxy( # type: ignore[assignment]
_cv_request, "request", unbound_message=_no_req_msg
)
这些是flask提供的全局变量。request
和request_ctx
都可以全局使用,只是封装的程度不一样。
上文提到,在wsgi_app
函数中,请求到来的时候,调用了ctx.push
入栈,处理完成之后,进行出栈。 这是push
的源码:
# flask/ctx.py
def push(self) -> None:
# Before we push the request context we have to ensure that there
# is an application context.
app_ctx = _cv_app.get(None)
if app_ctx is None or app_ctx.app is not self.app:
app_ctx = self.app.app_context()
app_ctx.push()
else:
app_ctx = None
self._cv_tokens.append((_cv_request.set(self), app_ctx))
if self.session is None:
session_interface = self.app.session_interface
self.session = session_interface.open_session(self.app, self.request)
if self.session is None:
self.session = session_interface.make_null_session(self.app)
if self.url_adapter is not None:
self.match_request()
进行push
的时候,先获取应用上下文,并且入栈,这里先不分析应用上下文,后面会讲。
关键的是self._cv_tokens.append((_cv_request.set(self), app_ctx))
这里做了两件事,首先是_cv_request.set(self)
,这是把self
也就是RequestContext
实例存入Contextvar
中。
Contextvar是python3.7加入的一个模块,详情看文档:contextvars --- 上下文变量 — Python 3.12.3 文档
这里不展开如何实现,主要知道几个内容就行:
# 协程
import asyncio
from contextvars import ContextVar
# 定义一个 ContextVar
cv = ContextVar('example_var')
async def coroutine(identifier):
# 设置 ContextVar 的值,并获取返回的 Token
token = cv.set(f'value for {identifier}')
print(f'Coroutine {identifier} set value to: {cv.get()}')
# 模拟异步操作
await asyncio.sleep(1)
# 获取当前 ContextVar 的值
print(f'Coroutine {identifier} sees value: {cv.get()}')
# 重置 ContextVar 的值
cv.reset(token)
print(f'Coroutine {identifier} reset value, now: {cv.get(None)}')
async def main():
await asyncio.gather(coroutine('A'), coroutine('B'))
asyncio.run(main())
# 多线程
import threading
from contextvars import ContextVar
# 定义一个 ContextVar
cv = ContextVar('example_var')
def thread_function(identifier):
# 设置 ContextVar 的值,并获取返回的 Token
token = cv.set(f'value for {identifier}')
print(f'Thread {identifier} set value to: {cv.get()}')
# 模拟一些操作
threading.Event().wait(1)
# 获取当前 ContextVar 的值
print(f'Thread {identifier} sees value: {cv.get()}')
# 重置 ContextVar 的值
cv.reset(token)
print(f'Thread {identifier} reset value, now: {cv.get(None)}')
# 创建并启动线程
threads = []
for i in range(2):
thread = threading.Thread(target=thread_function, args=(f'Thread-{i}',))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
set
接收一个值,存入Contextvar
实例,并返回一个token
。token
可以通过reset
来重置,让Contextvar
设置成上一个值。get
获取值。
因此,self._cv_tokens.append((_cv_request.set(self), app_ctx))
的作用是把RequestContext
实例存入_cv_request
,将返回的token
和app_ctx
应用上下文存到self._cv_tokens
。
接着是pop
出栈:
# flask/ctx.py
def pop(self, exc: BaseException | None = _sentinel) -> None: # type: ignore
"""Pops the request context and unbinds it by doing that. This will
also trigger the execution of functions registered by the
:meth:`~flask.Flask.teardown_request` decorator.
.. versionchanged:: 0.9
Added the `exc` argument.
"""
clear_request = len(self._cv_tokens) == 1
try:
if clear_request:
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_request(exc)
request_close = getattr(self.request, "close", None)
if request_close is not None:
request_close()
finally:
ctx = _cv_request.get()
token, app_ctx = self._cv_tokens.pop()
_cv_request.reset(token)
# get rid of circular dependencies at the end of the request
# so that we don't require the GC to be active.
if clear_request:
ctx.request.environ["werkzeug.request"] = None
if app_ctx is not None:
app_ctx.pop(exc)
if ctx is not self:
raise AssertionError(
f"Popped wrong request context. ({ctx!r} instead of {self!r})"
)
这里会判断self._cv_tokens
,如果请求结束了,会执行通过@app.teardown_request
绑定的内容。teardown_request
装饰器会注册一个函数,在请求结束后会执行。通常用来释放资源。
然后通过获取到的token
,进行reset
。
这是push
和pop
的过程。
LocalProxy
# flask/globals.py
_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy( # type: ignore[assignment]
_cv_request, unbound_message=_no_req_msg
)
request: Request = LocalProxy( # type: ignore[assignment]
_cv_request, "request", unbound_message=_no_req_msg
)
session: SessionMixin = LocalProxy( # type: ignore[assignment]
_cv_request, "session", unbound_message=_no_req_msg
)
可以看到_cv_request
是一个ContextVar
,作用如上文所说的,用来储存RequestContext
实例。
request_ctx
和request
都是LocalProxy
的实例,它们的作用是什么?
*这里有一个细节,源码中用了# type: ignore[assignment] 这是为了让ide在不出现warning,因为request
类型推导是'Request',但是值却是LocalProxy
。
看一下RequestContext
的构造:
flask/ctx.py
class RequestContext:
def __init__(
self,
app: Flask,
environ: WSGIEnvironment,
request: Request | None = None,
session: SessionMixin | None = None,
) -> None:
self.app = app
if request is None:
request = app.request_class(environ)
request.json_module = app.json
self.request: Request = request
self.url_adapter = None
try:
self.url_adapter = app.create_url_adapter(self.request)
except HTTPException as e:
self.request.routing_exception = e
self.flashes: list[tuple[str, str]] | None = None
self.session: SessionMixin | None = session
# Functions that should be executed after the request on the response
# object. These will be called before the regular "after_request"
# functions.
self._after_request_functions: list[ft.AfterRequestCallable[t.Any]] = []
self._cv_tokens: list[
tuple[contextvars.Token[RequestContext], AppContext | None]
] = []
RequestContext
有个属性:request
,它是通过environ
构造的。
试着对比两个对象是否相等:
from flask import Flask,request
from flask.globals import request_ctx
app = Flask(__name__)
@app.get('/')
def post():
print(request is request_ctx)
print(request.headers is request_ctx.request.headers)
return 'hello world'
if __name__ == '__main__':
app.run(port='8080')
# 结果:
# False
# True
这是很显然的,因为request
和request_ctx
都是LocalProxy
的实例,并不是同一个对象。 但是内容都是一样的。现在问题来了:LocalProxy做了什么?
# werkzeug/local.py
Class LocalProxy(t.Generic[T]):
__slots__ = ("__wrapped", "_get_current_object")
_get_current_object: t.Callable[[], T]
def __init__(
self,
local: ContextVar[T] | Local | LocalStack[T] | t.Callable[[], T],
name: str | None = None,
*,
unbound_message: str | None = None,
) -> None:
if name is None:
get_name = _identity
else:
get_name = attrgetter(name) # type: ignore[assignment]
if unbound_message is None:
unbound_message = "object is not bound"
if isinstance(local, Local):
if name is None:
raise TypeError("'name' is required when proxying a 'Local' object.")
def _get_current_object() -> T:
try:
return get_name(local) # type: ignore[return-value]
except AttributeError:
raise RuntimeError(unbound_message) from None
elif isinstance(local, LocalStack):
def _get_current_object() -> T:
obj = local.top
if obj is None:
raise RuntimeError(unbound_message)
return get_name(obj)
elif isinstance(local, ContextVar):
def _get_current_object() -> T:
try:
obj = local.get()
except LookupError:
raise RuntimeError(unbound_message) from None
return get_name(obj)
elif callable(local):
def _get_current_object() -> T:
return get_name(local())
else:
raise TypeError(f"Don't know how to proxy '{type(local)}'.")
object.__setattr__(self, "_LocalProxy__wrapped", local)
object.__setattr__(self, "_get_current_object", _get_current_object)
flask使用了werkzeug中的LocalProxy
,代理了RequestContext
和requests
对象。可以看到local
过去是使用了threading.local
实现,现在用的这个版本是Contextvar
实现。
request_ctx: RequestContext = LocalProxy( # type: ignore[assignment]
_cv_request, unbound_message=_no_req_msg
)
request: Request = LocalProxy( # type: ignore[assignment]
_cv_request, "request", unbound_message=_no_req_msg
)
如果local
是Contextvar
,则用get
返回一个RequestContext
对象。二者的差别在于传入了一个name
,如果没有传入name,get_name
返回的是对象本身,也就是request_ctx在这里会返回RequestContext
,而request会返回RequestContext.request
,所以上文提到的,两者内容是一样的。
接下来LocalProxy
把一些魔术方法交给了_ProxyLookup
。有很多魔术方法,这边只关注__getattr__
:
....
__getattr__ = _ProxyLookup(getattr)
....
# werkzeug/local.py
class _ProxyLookup:
__slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name")
def __init__(
self,
f: t.Callable[..., t.Any] | None = None,
fallback: t.Callable[[LocalProxy[t.Any]], t.Any] | None = None,
class_value: t.Any | None = None,
is_attr: bool = False,
) -> None:
bind_f: t.Callable[[LocalProxy[t.Any], t.Any], t.Callable[..., t.Any]] | None
if hasattr(f, "__get__"):
# A Python function, can be turned into a bound method.
def bind_f(
instance: LocalProxy[t.Any], obj: t.Any
) -> t.Callable[..., t.Any]:
return f.__get__(obj, type(obj)) # type: ignore
elif f is not None:
# A C function, use partial to bind the first argument.
def bind_f(
instance: LocalProxy[t.Any], obj: t.Any
) -> t.Callable[..., t.Any]:
return partial(f, obj)
else:
# Use getattr, which will produce a bound method.
bind_f = None
self.bind_f = bind_f
self.fallback = fallback
self.class_value = class_value
self.is_attr = is_attr
def __set_name__(self, owner: LocalProxy[t.Any], name: str) -> None:
self.name = name
def __get__(self, instance: LocalProxy[t.Any], owner: type | None = None) -> t.Any:
if instance is None:
if self.class_value is not None:
return self.class_value
return self
try:
obj = instance._get_current_object()
except RuntimeError:
if self.fallback is None:
raise
fallback = self.fallback.__get__(instance, owner)
if self.is_attr:
# __class__ and __doc__ are attributes, not methods.
# Call the fallback to get the value.
return fallback()
return fallback
if self.bind_f is not None:
return self.bind_f(instance, obj)
return getattr(obj, self.name)
def __repr__(self) -> str:
return f"proxy {self.name}"
def __call__(
self, instance: LocalProxy[t.Any], *args: t.Any, **kwargs: t.Any
) -> t.Any:
"""Support calling unbound methods from the class. For example,
this happens with ``copy.copy``, which does
``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it
returns the proxy type and descriptor.
"""
return self.__get__(instance, type(instance))(*args, **kwargs)
过程是这样的:
- 在调用
request.headers
的时候,调用了LocalProxy.__getattr__
LocalProxy.__getattr__
交给_ProxyLookup
,调用_ProxyLookup.__get__
._ProxyLookup.__get__
内部调用instance._get_current_object()
获取实际对象,也就是这一段:
elif isinstance(local, ContextVar):
def _get_current_object() -> T:
try:
obj = local.get()
except LookupError:
raise RuntimeError(unbound_message) from None
return get_name(obj)
这里将会返回request
。如果是request_ctx将会返回RequestContext
。
- 接着
_ProxyLookup
会返回一个偏函数:
elif f is not None:
# A C function, use partial to bind the first argument.
def bind_f(
instance: LocalProxy[t.Any], obj: t.Any
) -> t.Callable[..., t.Any]:
return partial(f, obj)
这里的f
就是__getattr__ = _ProxyLookup(getattr)
中的getattr
- 最后,这个偏函数接收
headers
,使用getattr
,返回结果。
可以做这样的实验,把这段源码改一下:
if self.bind_f is not None:
# return self.bind_f(instance, obj)
func = self.bind_f(instance, obj)
print(func('headers',obj)) # 这里的obj就是request或者RequestContext对象了。
return func
总结
上下文管理是flask的一个亮点,可以直接导入全局变量来获取上下文的内容。上文主要分析了请求上下文,还有应用上下文current_app
,以及session
和g
。 实现方式都是差不多的。后续遇到再详细展开。
flask通过Contextvar
(旧版本是通过threading.local
)来实现线程/协程安全,让不同的请求互相隔离,又通过全局变量的方式简化上下文管理,不需要一直传参。
那么现在一次请求的过程就是这样的:
- 请求来了,实例化
RequestContext
,实例对象push
,入栈。 - 再
push
中,将RequestContext
对象存入Contextvar。 - 派发请求
full_dispatch_request
、dispatch_request
等等,再函数内部,全局变量,获取请求上下文,还有请求前,请求后的各种处理。这里就体现了flask上下文管理的精妙,不需要传递请求上下文了。 - 请求处理结束,通过
pop
从Contextvar
中重置数据。
过程差不多这样:
下一篇将对Request
对象和Response
展开分析。
转载自:https://juejin.cn/post/7376104996905828387