likes
comments
collection
share

flask源码分析(三)上下文

作者站长头像
站长
· 阅读数 61

前言

本系列将对python几个流行的web框架源码进行分析。主要分析各个框架的特点、特色功能实现原理、以及一个请求在框架中的经过哪些处理。 文章不打算精读每一行代码,而是挑出框架比较有特色的地方深入研究,对于其他内容,采取粗读的形式。 预计会选取flask,fastapi,Django这三个框架。 每个框架各有特色,所以文章没有固定格式和必要篇目。

在一段文字或者对话中,如果不知道上下文,可能就会对这段文字感到疑惑,因为你缺少了一些信息。 比如文字中出现了【他】,【她】,我们得从上下文中才能知道指代的是谁。在程序里面也是如此,所谓的上下文,就是包含了程序的各种变量信息。 例如一次API请求包含了这次请求的请求头、请求地址、参数等等。 一个函数可能需要调用另外一个函数,第二个函数需要知道这次请求的某些信息,这些就是上下文。

通过上篇文章得知,请求最后是进到了用户自定义的试图函数中的:

from flask import Flask
app = Flask(__name__)

@app.get('/get')
def get(): 
    return 'get'

可以看到,对于一个请求来说,它的请求地址【'/get'】是通过get装饰器绑定好的,也就是不需要额外去获取。对于这个请求,目前没有其他的请求信息需要获取。 但是现在,如果需要获取这个请求的请求头呢?

看一下几种不同的框架如何获请求头的:

# wsgi
def application(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    # 获取请求头
    headers.extend([(key, value) for key, value in environ.items() if key.startswith('HTTP_')])
    
    start_response(status, headers)
    return [b"Hello World"]


# django
from django.http import HttpResponse

def test_view(request):
    # 获取请求头
    headers = request.headers
    return HttpResponse("Hello World", headers=headers)
    
# fastapi
from fastapi import FastAPI, Header

app = FastAPI()

@app.get("/test")
async def test_view(header: str = Header(None)):
    # 获取请求头
    return {"message": "Hello World", "header": header}

# flask
from flask import Flask, request

app = Flask(__name__)

@app.route('/test')
def test_view():
    # 获取请求头
    headers = request.headers
    return "Hello World", 200, headers.items()

可以看到一个非常明显的区别,django和fastapi需要明显传入一个【request】对象,这个对象包含了请求的所有信息,也就是wsgi里面的【environ】。框架会处理【environ】,封装成【request】对象,然后一路传递下去。后面的文章也会对【请求】【响应】做出分析。

现在主要讨论一下请求上下文。 flask在和其他例子的对比中,可以看到,视图函数不需要接收一个请求对象,而是通过from flask import request导入的【request】来获取内容。 不得不说这是很棒的设计,实际上,上下文管理也是flask这个框架的一大亮点。

对于这样的使用方法,有几点疑问: 【request】是全局的吗?如何做到不同的请求数据不会互相干扰?不同的线程、协程是如何隔离的?

请求上下文

在上一篇中,我们看到,请求进来之后,会根据【path】找到已经绑定好的视图函数,最后返回视图函数的调用结果。这里不再累述,可以去看一下上一篇。现在主要看一下,在调用视图函数的过程中,发生了什么。

# flask/app.py

def wsgi_app(
    self, environ: WSGIEnvironment, start_response: StartResponse
) -> cabc.Iterable[bytes]:
    ctx = self.request_context(environ)
    error: BaseException | None = None
    try:
        try:
            ctx.push()
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:  # noqa: B001
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if "werkzeug.debug.preserve_context" in environ:
            environ["werkzeug.debug.preserve_context"](_cv_app.get())
            environ["werkzeug.debug.preserve_context"](_cv_request.get())

        if error is not None and self.should_ignore_error(error):
            error = None

        ctx.pop(error)

ctx = self.request_context(environ)是传入environ,生成一个请求对象。现在不相信分析具体是做什么的,只要知道ctx包含了一个请求所有的内容,请求地址,请求头,请求参数等等。

接着重点来了,在调用full_dispatch_request之前,进行了ctx.push(),在整个过程处理完之后,进行了ctx.pop(error)。 这个push和pop看起来是入栈和出栈。 也就是在处理请求之前,flask会将包含了请求的上下文ctx入栈,处理完请求之后,进行出栈。

这样做的目的是什么? 由上面的例子可以看到,只要导入了全局变量request,那么就可以直接使用这次请求的上下文,所以这个入栈和出栈,或许是将请求上下文存到全局变量request中,这样就可以全局调用。

# flask/app.py

def dispatch_request(self) -> ft.ResponseReturnValue:
    req = request_ctx.request
    if req.routing_exception is not None:
        self.raise_routing_exception(req)
    rule: Rule = req.url_rule  # type: ignore[assignment]
    # if we provide automatic options for this URL and the
    # request came with the OPTIONS method, reply automatically
    if (
        getattr(rule, "provide_automatic_options", False)
        and req.method == "OPTIONS"
    ):
        return self.make_default_options_response()
    # otherwise dispatch to the handler for that endpoint
    view_args: dict[str, t.Any] = req.view_args  # type: ignore[assignment]
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)

在调用dispatch_request的时候,可以看到,并不是将上面的ctx传入dispatch_request,而是通过request_ctx.request,也就是说,在这里,和在外部视图函数使用的方式是一样的,使用request全局变量,获取上下文。

push和pop

# flask/globals.py
# ....
_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy(  # type: ignore[assignment]
    _cv_request, unbound_message=_no_req_msg
)
request: Request = LocalProxy(  # type: ignore[assignment]
    _cv_request, "request", unbound_message=_no_req_msg
)

这些是flask提供的全局变量。requestrequest_ctx都可以全局使用,只是封装的程度不一样。

上文提到,在wsgi_app函数中,请求到来的时候,调用了ctx.push入栈,处理完成之后,进行出栈。 这是push的源码:

# flask/ctx.py

def push(self) -> None:
    # Before we push the request context we have to ensure that there
    # is an application context.
    app_ctx = _cv_app.get(None)

    if app_ctx is None or app_ctx.app is not self.app:
        app_ctx = self.app.app_context()
        app_ctx.push()
    else:
        app_ctx = None

    self._cv_tokens.append((_cv_request.set(self), app_ctx))

    if self.session is None:
        session_interface = self.app.session_interface
        self.session = session_interface.open_session(self.app, self.request)

        if self.session is None:
            self.session = session_interface.make_null_session(self.app)
            
    if self.url_adapter is not None:
        self.match_request()

进行push的时候,先获取应用上下文,并且入栈,这里先不分析应用上下文,后面会讲。

关键的是self._cv_tokens.append((_cv_request.set(self), app_ctx))

这里做了两件事,首先是_cv_request.set(self),这是把self也就是RequestContext实例存入Contextvar中。

Contextvar是python3.7加入的一个模块,详情看文档:contextvars --- 上下文变量 — Python 3.12.3 文档

这里不展开如何实现,主要知道几个内容就行:

# 协程 
import asyncio
from contextvars import ContextVar

# 定义一个 ContextVar
cv = ContextVar('example_var')

async def coroutine(identifier):
    # 设置 ContextVar 的值,并获取返回的 Token
    token = cv.set(f'value for {identifier}')
    print(f'Coroutine {identifier} set value to: {cv.get()}')

    # 模拟异步操作
    await asyncio.sleep(1)

    # 获取当前 ContextVar 的值
    print(f'Coroutine {identifier} sees value: {cv.get()}')

    # 重置 ContextVar 的值
    cv.reset(token)
    print(f'Coroutine {identifier} reset value, now: {cv.get(None)}')

async def main():
    await asyncio.gather(coroutine('A'), coroutine('B'))

asyncio.run(main())


# 多线程

import threading
from contextvars import ContextVar

# 定义一个 ContextVar
cv = ContextVar('example_var')

def thread_function(identifier):
    # 设置 ContextVar 的值,并获取返回的 Token
    token = cv.set(f'value for {identifier}')
    print(f'Thread {identifier} set value to: {cv.get()}')

    # 模拟一些操作
    threading.Event().wait(1)

    # 获取当前 ContextVar 的值
    print(f'Thread {identifier} sees value: {cv.get()}')

    # 重置 ContextVar 的值
    cv.reset(token)
    print(f'Thread {identifier} reset value, now: {cv.get(None)}')

# 创建并启动线程
threads = []
for i in range(2):
    thread = threading.Thread(target=thread_function, args=(f'Thread-{i}',))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

set接收一个值,存入Contextvar实例,并返回一个tokentoken可以通过reset来重置,让Contextvar设置成上一个值。get获取值。

因此,self._cv_tokens.append((_cv_request.set(self), app_ctx))的作用是把RequestContext实例存入_cv_request,将返回的tokenapp_ctx应用上下文存到self._cv_tokens

接着是pop出栈:

# flask/ctx.py
def pop(self, exc: BaseException | None = _sentinel) -> None:  # type: ignore
    """Pops the request context and unbinds it by doing that.  This will
    also trigger the execution of functions registered by the
    :meth:`~flask.Flask.teardown_request` decorator.

    .. versionchanged:: 0.9
       Added the `exc` argument.
    """
    clear_request = len(self._cv_tokens) == 1

    try:
        if clear_request:
            if exc is _sentinel:
                exc = sys.exc_info()[1]
            self.app.do_teardown_request(exc)

            request_close = getattr(self.request, "close", None)
            if request_close is not None:
                request_close()
    finally:
        ctx = _cv_request.get()
        token, app_ctx = self._cv_tokens.pop()
        _cv_request.reset(token)

        # get rid of circular dependencies at the end of the request
        # so that we don't require the GC to be active.
        if clear_request:
            ctx.request.environ["werkzeug.request"] = None

        if app_ctx is not None:
            app_ctx.pop(exc)

        if ctx is not self:
            raise AssertionError(
                f"Popped wrong request context. ({ctx!r} instead of {self!r})"
            )

这里会判断self._cv_tokens,如果请求结束了,会执行通过@app.teardown_request绑定的内容。teardown_request装饰器会注册一个函数,在请求结束后会执行。通常用来释放资源。

然后通过获取到的token,进行reset

这是pushpop的过程。

LocalProxy

# flask/globals.py
_cv_request: ContextVar[RequestContext] = ContextVar("flask.request_ctx")
request_ctx: RequestContext = LocalProxy(  # type: ignore[assignment]
    _cv_request, unbound_message=_no_req_msg
)

request: Request = LocalProxy(  # type: ignore[assignment]
    _cv_request, "request", unbound_message=_no_req_msg
)
session: SessionMixin = LocalProxy(  # type: ignore[assignment]
    _cv_request, "session", unbound_message=_no_req_msg
)

可以看到_cv_request是一个ContextVar,作用如上文所说的,用来储存RequestContext实例。 request_ctxrequest都是LocalProxy的实例,它们的作用是什么? *这里有一个细节,源码中用了# type: ignore[assignment] 这是为了让ide在不出现warning,因为request类型推导是'Request',但是值却是LocalProxy

看一下RequestContext的构造:

flask/ctx.py
class RequestContext:
    def __init__(
        self,
        app: Flask,
        environ: WSGIEnvironment,
        request: Request | None = None,
        session: SessionMixin | None = None,
    ) -> None:
        self.app = app
        if request is None:
            request = app.request_class(environ)
            request.json_module = app.json
        self.request: Request = request
        self.url_adapter = None
        try:
            self.url_adapter = app.create_url_adapter(self.request)
        except HTTPException as e:
            self.request.routing_exception = e
        self.flashes: list[tuple[str, str]] | None = None
        self.session: SessionMixin | None = session
        # Functions that should be executed after the request on the response
        # object.  These will be called before the regular "after_request"
        # functions.
        self._after_request_functions: list[ft.AfterRequestCallable[t.Any]] = []

        self._cv_tokens: list[
            tuple[contextvars.Token[RequestContext], AppContext | None]
        ] = []

RequestContext有个属性:request,它是通过environ构造的。

试着对比两个对象是否相等:

from flask import Flask,request
from flask.globals import request_ctx

app = Flask(__name__)


@app.get('/')
def post():
    print(request is request_ctx)
    print(request.headers is request_ctx.request.headers)
    return 'hello world'


if __name__ == '__main__':
    app.run(port='8080')
    
# 结果:
# False
# True

这是很显然的,因为requestrequest_ctx都是LocalProxy的实例,并不是同一个对象。 但是内容都是一样的。现在问题来了:LocalProxy做了什么?

# werkzeug/local.py
Class LocalProxy(t.Generic[T]):
    __slots__ = ("__wrapped", "_get_current_object")

    _get_current_object: t.Callable[[], T]


    def __init__(
        self,
        local: ContextVar[T] | Local | LocalStack[T] | t.Callable[[], T],
        name: str | None = None,
        *,
        unbound_message: str | None = None,
    ) -> None:
        if name is None:
            get_name = _identity
        else:
            get_name = attrgetter(name)  # type: ignore[assignment]

        if unbound_message is None:
            unbound_message = "object is not bound"

        if isinstance(local, Local):
            if name is None:
                raise TypeError("'name' is required when proxying a 'Local' object.")

            def _get_current_object() -> T:
                try:
                    return get_name(local)  # type: ignore[return-value]
                except AttributeError:
                    raise RuntimeError(unbound_message) from None

        elif isinstance(local, LocalStack):

            def _get_current_object() -> T:
                obj = local.top

                if obj is None:
                    raise RuntimeError(unbound_message)

                return get_name(obj)

        elif isinstance(local, ContextVar):

            def _get_current_object() -> T:
                try:
                    obj = local.get()
                except LookupError:
                    raise RuntimeError(unbound_message) from None

                return get_name(obj)

        elif callable(local):

            def _get_current_object() -> T:
                return get_name(local())

        else:
            raise TypeError(f"Don't know how to proxy '{type(local)}'.")

        object.__setattr__(self, "_LocalProxy__wrapped", local)
        object.__setattr__(self, "_get_current_object", _get_current_object)

flask使用了werkzeug中的LocalProxy,代理了RequestContextrequests对象。可以看到local过去是使用了threading.local实现,现在用的这个版本是Contextvar实现。

request_ctx: RequestContext = LocalProxy(  # type: ignore[assignment]
    _cv_request, unbound_message=_no_req_msg
)

request: Request = LocalProxy(  # type: ignore[assignment]
    _cv_request, "request", unbound_message=_no_req_msg
)

如果localContextvar,则用get返回一个RequestContext对象。二者的差别在于传入了一个name,如果没有传入name,get_name返回的是对象本身,也就是request_ctx在这里会返回RequestContext,而request会返回RequestContext.request,所以上文提到的,两者内容是一样的。

接下来LocalProxy把一些魔术方法交给了_ProxyLookup。有很多魔术方法,这边只关注__getattr__:

....
__getattr__ = _ProxyLookup(getattr)
....
# werkzeug/local.py
class _ProxyLookup:
    __slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name")

    def __init__(
        self,
        f: t.Callable[..., t.Any] | None = None,
        fallback: t.Callable[[LocalProxy[t.Any]], t.Any] | None = None,
        class_value: t.Any | None = None,
        is_attr: bool = False,
    ) -> None:
        bind_f: t.Callable[[LocalProxy[t.Any], t.Any], t.Callable[..., t.Any]] | None

        if hasattr(f, "__get__"):
            # A Python function, can be turned into a bound method.

            def bind_f(
                instance: LocalProxy[t.Any], obj: t.Any
            ) -> t.Callable[..., t.Any]:
                return f.__get__(obj, type(obj))  # type: ignore

        elif f is not None:
            # A C function, use partial to bind the first argument.

            def bind_f(
                instance: LocalProxy[t.Any], obj: t.Any
            ) -> t.Callable[..., t.Any]:
                return partial(f, obj)

        else:
            # Use getattr, which will produce a bound method.
            bind_f = None

        self.bind_f = bind_f
        self.fallback = fallback
        self.class_value = class_value
        self.is_attr = is_attr

    def __set_name__(self, owner: LocalProxy[t.Any], name: str) -> None:
        self.name = name

    def __get__(self, instance: LocalProxy[t.Any], owner: type | None = None) -> t.Any:
        if instance is None:
            if self.class_value is not None:
                return self.class_value

            return self

        try:
            obj = instance._get_current_object()
        except RuntimeError:
            if self.fallback is None:
                raise

            fallback = self.fallback.__get__(instance, owner)

            if self.is_attr:
                # __class__ and __doc__ are attributes, not methods.
                # Call the fallback to get the value.
                return fallback()

            return fallback

        if self.bind_f is not None:
            return self.bind_f(instance, obj)

        return getattr(obj, self.name)

    def __repr__(self) -> str:
        return f"proxy {self.name}"

    def __call__(
        self, instance: LocalProxy[t.Any], *args: t.Any, **kwargs: t.Any
    ) -> t.Any:
        """Support calling unbound methods from the class. For example,
        this happens with ``copy.copy``, which does
        ``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it
        returns the proxy type and descriptor.
        """
        return self.__get__(instance, type(instance))(*args, **kwargs)

过程是这样的:

  • 在调用request.headers的时候,调用了LocalProxy.__getattr__
  • LocalProxy.__getattr__交给_ProxyLookup,调用_ProxyLookup.__get__.
  • _ProxyLookup.__get__内部调用instance._get_current_object()获取实际对象,也就是这一段:
elif isinstance(local, ContextVar):
    def _get_current_object() -> T:
        try:
            obj = local.get()
        except LookupError:
            raise RuntimeError(unbound_message) from None

        return get_name(obj)

这里将会返回request。如果是request_ctx将会返回RequestContext

  • 接着_ProxyLookup会返回一个偏函数:
elif f is not None:
    # A C function, use partial to bind the first argument.

    def bind_f(
        instance: LocalProxy[t.Any], obj: t.Any
    ) -> t.Callable[..., t.Any]:
        return partial(f, obj)

这里的f就是__getattr__ = _ProxyLookup(getattr)中的getattr

  • 最后,这个偏函数接收headers,使用getattr,返回结果。

可以做这样的实验,把这段源码改一下:

if self.bind_f is not None:
    # return self.bind_f(instance, obj)
    func = self.bind_f(instance, obj)
    print(func('headers',obj)) # 这里的obj就是request或者RequestContext对象了。
    return func

总结

上下文管理是flask的一个亮点,可以直接导入全局变量来获取上下文的内容。上文主要分析了请求上下文,还有应用上下文current_app,以及sessiong。 实现方式都是差不多的。后续遇到再详细展开。

flask通过Contextvar(旧版本是通过threading.local)来实现线程/协程安全,让不同的请求互相隔离,又通过全局变量的方式简化上下文管理,不需要一直传参。

那么现在一次请求的过程就是这样的:

  • 请求来了,实例化RequestContext,实例对象push,入栈。
  • push中,将RequestContext对象存入Contextvar。
  • 派发请求full_dispatch_requestdispatch_request等等,再函数内部,全局变量,获取请求上下文,还有请求前,请求后的各种处理。这里就体现了flask上下文管理的精妙,不需要传递请求上下文了。
  • 请求处理结束,通过popContextvar中重置数据。

过程差不多这样:

flask源码分析(三)上下文

下一篇将对Request对象和Response展开分析。

转载自:https://juejin.cn/post/7376104996905828387
评论
请登录