likes
comments
collection
share

starlette源码分析

作者站长头像
站长
· 阅读数 12

前记

上一篇分析了uvicorn, 但是uvicorn只是一个ASGI容器, 真正处理请求的是ASGI应用程序,而starlette是最出名也是最标准的ASGI应用程序, 通过了解starlette, 可以了解到每个组件都是ASGI APP的设计理念, 了解ASGI APP的兼容性, 能更完整的理解ASGI生态。

NOTE: 使用了几年的starlette以来, 简单了翻过了几次源码, 觉得starlette堪称工艺品, 设计很完美, 各种逻辑实现起来很简单(也可能是我一开始就使用了sanic框架), 从使用至今, 除了初始化中间件, 在中间件读取body以及官方示例文档比较少这些缺点外, 感觉不出有其他的槽点。

最新修订见原文, 关注公众号<博海拾贝diary>可以及时收到新推文通知

NOTE: 本文偏代码+注释比较多

1.starlette的应用

在之前的文章了解过, Uvicron通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn发送和接收信息:

async def app(scope, receive, send):
    # 一个最简单的ASGI应用程序
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ]
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })


if __name__ == "__main__":
    # uvicorn服务
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

而使用uvicorn启动starlette的方式是:

from starlette.applications import Starlette
from starlette.middleware.gzip import GZipMiddleware


app: Starlette = Starlette()


@app.route("/")
def demo_route() -> None: pass


@app.websocket_route("/")
def demo_websocket_route() -> None: pass


@app.add_exception_handlers(404)
def not_found_route() -> None: pass


@app.on_event("startup")
def startup_event_demo() -> None: pass


@app.on_event("shutdown")
def shutdown_event_demo() -> None: pass


app.add_middleware(GZipMiddleware)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000)

这段代码Starlette先执行了初始化, 然后注册路由,异常处理, 事件,中间件到自身, 然后传给uvicorn.run, uvicorn.run通过调用starlette__call__的方法传递请求数据。

简单的了解完启动后, 先从starlette初始化看是分析:

class Starlette:
    def __init__(
        self,
        debug: bool = False,
        routes: typing.Sequence[BaseRoute] = None,
        middleware: typing.Sequence[Middleware] = None,
        exception_handlers: typing.Dict[
            typing.Union[int, typing.Type[Exception]], typing.Callable
        ] = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
    ) -> None:
        """
        :param debug: 决定是否启用debug功能
        :param route: 一个路由列表, 提供HTTP和WebSocket服务.
        :param middleware: 中间件列表, 应用于每个请求
        :param exception_handler: 存放异常回调的字典, 键为HTTP状态码, 值为回调函数
        :on_startup: 启动时调用的回调函数
        :on_shutdown: 关闭时的回调函数
        :lifespan: ASGI中的lifespan功能
        """

        # 这里表示如果有传入lifespan, 则不可传入on_startup以及on_shutdown
        # 因为本质上starlette的通过把on_start_up和on_shutdown转为一个lifespan来接收uvicorn调用的
        assert lifespan is None or (
            on_startup is None and on_shutdown is None
        ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."

        # 初始化变量
        self._debug = debug
        self.state = State()
        self.router = Router(
            routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
        )
        self.exception_handlers = (
            {} if exception_handlers is None else dict(exception_handlers)
        )
        self.user_middleware = [] if middleware is None else list(middleware)
        # 构建中间件
        self.middleware_stack = self.build_middleware_stack()

通过代码可以看到初始化这里已经满足了大多数功能了, 不过还有一个构建中间件的函数, 需要进一步分析:

class Starlette:
    def build_middleware_stack(self) -> ASGIApp:
        debug = self.debug
        error_handler = None
        exception_handlers = {}

        # 解析异常处理的回调, 分别存放在error_handler和exception_handlers
        # 只有HTTP状态码为500的才会存入到error_handler
        for key, value in self.exception_handlers.items():
            if key in (500, Exception):
                error_handler = value
            else:
                exception_handlers[key] = value

        # 为不同种类的中间件排好顺序
        # 第一层为ServerErrorMiddleware, 它能在发现异常的时候打印错误堆栈, 或者在debug模式的时候展示错误页面, 方便调试
        # 第二层是用户中间件层, 用户自己注册的所有中间件都会存放在这里
        # 第三层是ExceptionMiddleware, 它是异常处理层, 它会处理路由执行时抛出的所有异常
        middleware = (
            [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
            + self.user_middleware
            + [
                Middleware(
                    ExceptionMiddleware, handlers=exception_handlers, debug=debug
                )
            ]
        )

        # 最后把中间件装填到app中
        app = self.router
        for cls, options in reversed(middleware):
            # cls是中间件类本身, options也就是我们传的参数
            # 可以看出中间件本身也是一个ASGIAPP, 装填中间件就是一个ASGI APP套上另外一个ASGI APP, 一直套娃。
            app = cls(app=app, **options)
        
        # 由于中间件的装填方式是不断的套娃, 而调用是不断的通过`call_next`调用装填它的上级ASGI APP, 所以要采用逆序的方法
        return app

构建完中间件后, 初始化就算完成了, 接着就会通过uvicorn.run方法从而调用到__call__方法:

class Starlette:
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope["app"] = self
        await self.middleware_stack(scope, receive, send)

这个方法很简单, 就是通过scope, 把app设置到请求流程中, 方便后续调用, 然后通过调用middleware_stack开始请求的处理。 通过这个方法和中间件的初始化可以看出, starlette中的中间件本身也是一个ASGI APP(也可以看出route是一个ASGI APP, 处于调用栈的最后一层), 同时starlette也把异常的处理也交给了中间件处理, 这在其他的Web应用框架很少见到, 可以看出starlette的设计是每个组件都尽量是ASGI APP。

虽然starlette中间件的设计是非常不错的, 但是它的这种初始化方式我不太喜欢, 因为在编写的时候IDE无法帮你传入的参数做校验, 比如上面示例的GZip中间件, 你知道需要传minimum_size参数, 但是你有可能打错, 只是没到运行时的时候, 压根不知道它是否有异常:

app.add_middleware(GZipMiddleware, minimum_size = 500)

我在设计我的rpc框架rap时也参考了startlette的中间件设计, 但是在这一块进行了优化, 不过与本篇文章关系不大, 有兴趣可以参考:github.com/so1n/rap/bl…

2.中间件

上面说到, 在startlette中, 中间件是一个ASGI APP, 所以在startlette的所有中间件都必定是一个满足如下形式的类:

class BaseMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        pass

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        pass

starlette.middleware中, 有很多的中间件实现, 他们都满足这一点, 不过本章节不会讲所有的中间件, 只会挑选几个有代表性的中间件从最靠近Route到远进行分析。

2.1.异常处理中间件-ExceptionMiddleware

第一个就是ExceptionMiddleware中间件, 这个中间件用户是不会直接接触到的(所以没有放在starlette.middleware里面), 而是通过下面的这个方法间接的接触到:

@app.app_exception_handlers(404)
def not_found_route() -> None: pass

当用户使用这个方法时, startlette会把回调函数挂在对应的字典里面, 这个字典以HTTP状态码为key, 回调函数为value。 当ExceptionMiddleware发现Route请求处理异常时, 可以通过异常的响应HTTP状态码找到对应的回调函数, 并把请求和异常传给用户挂载的对应的回调函数, 最后把用户的回调函数结果抛回上一个ASGI APP。 此外ExceptionMiddleware还支持异常注册, 当Route抛出的异常与注册的异常匹配时, 调用该异常注册的对应的回调函数。 该类的源码和注释如下:

class ExceptionMiddleware:
    def __init__(
        self, app: ASGIApp, handlers: dict = None, debug: bool = False
    ) -> None:
        self.app = app
        self.debug = debug  # TODO: We ought to handle 404 cases if debug is set.
        # startletter是支持HTTP状态码和Exception两种类型
        self._status_handlers = {}  # type: typing.Dict[int, typing.Callable]
        self._exception_handlers = {
            HTTPException: self.http_exception
        }  # type: typing.Dict[typing.Type[Exception], typing.Callable]
        if handlers is not None:
            for key, value in handlers.items():
                self.add_exception_handler(key, value)

    def add_exception_handler(
        self,
        exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
        handler: typing.Callable,
    ) -> None:
        # 用户通过调用startlette app的方法挂载的异常回调最后都是通过该方法挂载到类里面的_status_handlers或者是_exception_handler里面
        if isinstance(exc_class_or_status_code, int):
            self._status_handlers[exc_class_or_status_code] = handler
        else:
            assert issubclass(exc_class_or_status_code, Exception)
            self._exception_handlers[exc_class_or_status_code] = handler

    def _lookup_exception_handler(
        self, exc: Exception
    ) -> typing.Optional[typing.Callable]:
        # 查找注册异常相关的回调函数, 通过mro发现异常的对应回调函数
        # 
        # 用户挂载的可能是一个基类, 后续在遇到挂载异常的子类时, 也会调用基类注册的回调
        # 比如用户注册了一个基类, 然后会有用户异常和系统异常两个异常都继承于这个基类
        # 后续函数抛出用户异常或系统异常时, 都会执行到基类注册的对应回调
        for cls in type(exc).__mro__:
            if cls in self._exception_handlers:
                return self._exception_handlers[cls]
        return None

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # 熟悉的ASGI 调用方法
        if scope["type"] != "http":
            # 不支持websocket请求
            await self.app(scope, receive, send)
            return

        # 防止同一个响应产生多个异常
        response_started = False

        async def sender(message: Message) -> None:
            nonlocal response_started

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            # 调用下一个ASGI APP
            await self.app(scope, receive, sender)
        except Exception as exc:
            handler = None

            if isinstance(exc, HTTPException):
                # 如果是HTTPException异常, 则从注册HTTP回调字典中寻找
                handler = self._status_handlers.get(exc.status_code)

            if handler is None:
                # 如果是普通的异常, 则从异常回调字典去寻找
                handler = self._lookup_exception_handler(exc)

            if handler is None:
                # 找不到对应的异常, 则往上面抛
                raise exc from None

            # 一个响应只接收一次异常处理
            if response_started:
                msg = "Caught handled exception, but response already started."
                raise RuntimeError(msg) from exc

            request = Request(scope, receive=receive)
            if asyncio.iscoroutinefunction(handler):
                response = await handler(request, exc)
            else:
                response = await run_in_threadpool(handler, request, exc)
            # 通过回调函数生成的response处理请求
            await response(scope, receive, sender)

2.2.用户中间件

接着就是用户中间件了, 这个也是我们接触最多的中间件, 在使用starlette.middleware时, 我们都会继承于一个叫BaseHTTPMiddleware的中间件, 然后基于如下代码进行拓展:

class DemoMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        super(DemoMiddleware, self).__init__(app)

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # before
        response: Response = await call_next(request)
        # after
        return response

如果在请求之前进行预处理, 就在before块编写相关代码,如果要在请求之后进行处理的, 就在after块编写代码, 使用非常简单, 而且他们是处于同一个作用域的, 这就意味着该方法里面的变量不用通过上下文或者动态变量来传播(如果你接触了Django或者Flask的类中间件实现, 也就懂得了starlette这种实现的优雅)。

接下来就来看看它是怎么实现的, 代码非常简单, 大概60行左右, 不过我注释写了很多:

class BaseHTTPMiddleware:
    def __init__(self, app: ASGIApp, dispatch: DispatchFunction = None) -> None:
        # 赋值下一级的ASGI app
        self.app = app
        # 如果用户有传dispatch, 就使用用户传的函数, 否则使用自身的dispatch
        # 一般用户都是继承于BaseHTTPMiddleware, 然后复写dispatch方法
        self.dispatch_func = self.dispatch if dispatch is None else dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        ASGI 标准的函数签名函数, 代表着ASGI的请求会从这里进来
        """
        if scope["type"] != "http":
            # 如果类型不是http的, 则不会走中间件(也就是websocket的不支持)
            # 要支持websocket的话, 中间件就很难这样实现了, 我在实现rap框架时, 为了支持类websocket的流量中间件处理, 牺牲了一些功能才可以实现
            await self.app(scope, receive, send)
            return

        # 通过scope生成request对象
        request = Request(scope, receive=receive)
        # 进入dispatch逻辑, 也就是用户的处理逻辑
        # 这个逻辑得到的respone实际上是call_next函数生成的, dispatch函数只做了传递的作用
        response = await self.dispatch_func(request, self.call_next)
        # 根据生成的response, 返回数据到上一层
        await response(scope, receive, send)

    async def call_next(self, request: Request) -> Response:
        loop = asyncio.get_event_loop()
        # 通过queue生产消费模式来获取下一级的消息
        queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

        scope = request.scope
        # 通过request.receive对象把uvicorn的receive对象传过来
        # 这里用到的receive对象还是uvicorn初始化时的receive对象
        receive = request.receive
        send = queue.put

        async def coro() -> None:
            try:
                await self.app(scope, receive, send)
            finally:
                # 这个put操作能确保get那边不会被卡死
                await queue.put(None)

        # 通过loop.create_task, 在另一个协程跑下一个ASGI APP
        task = loop.create_task(coro())
        # 等待下一个ASGI APP的返回
        message = await queue.get()
        if message is None:
            # 如果拿到是空的, 则代表下一个ASGI APP没有返回响应, 这时可能出错, 
            # 通过调用task.result(), 如果该协程出现异常, 则会把该协程的错误抛出来
            task.result()
            # 如果没有异常抛出来, 就可能是用户写错等原因, 返回了一个空响应, 
            # 这时候是没办法返回响应给客户端的, 需要自己制造一个异常, 方便后续生成一个500的响应
            raise RuntimeError("No response returned.")
        
        # ASGI处理响应的时候会分多步走, 正常情况下, 上面的queue.get, 是获取响应的第一步
        assert message["type"] == "http.response.start"

        async def body_stream() -> typing.AsyncGenerator[bytes, None]:
            # 其他的处理会交给body_stream函数处理
            # 这个方法所做的就是一直返回数据流
            while True:
                message = await queue.get()
                if message is None:
                    break
                assert message["type"] == "http.response.body"
                yield message.get("body", b"")
            task.result()

        # 将body_stream函数放到Response方法中
        # response本身也是一个类ASGI APP的类, 用户根据教程, 在dispatch方法中通过call_next获得response对象, 
        # 并在最后返回, 所以这个reponse对象将会交给__call__方法中进行处理。
        response = StreamingResponse(
            status_code=message["status"], content=body_stream()
        )
        response.raw_headers = message["headers"]
        return response

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        raise NotImplementedError()  # pragma: no cover

2.3.ServerErrorMiddleware

ServerErrorMiddleware跟ExceptionMiddleware很像(所以这一part也不做更多的说明), 整个逻辑基本上都是一致的, 不过ExceptionMiddleware负责的是把用户的路由异常进行捕获处理, ServerErrorMiddleware主要负责是做兜底, 确保返回的一定是合法的HTTP响应。

ServerErrorMiddleware的间接调用函数也跟ExceptionMiddleware一样, 不过只有注册的HTTP状态码为500时, 才会把回调注册到ServerErrorMiddleware中:

@app.exception_handlers(500)
def not_found_route() -> None: pass

ServerErrorMiddleware是处于ASGI APP中的最顶级, 它负责异常兜底的工作, 它要做的事情很简单, 如果下一级ASGI APP处理发生异常, 就进入兜底逻辑:

  • 1.如果启用debug, 则返回debug页面
  • 2.如果有注册回调, 则执行注册回调
  • 3.如果都没则返回500响应

3.Route

starlette中, Route分为两部分, 一部分我把它称为Real AppRouter, 它处于中间件的下一层级, 但它负责的是Starlette除中间件外的所有事情, 主要包括路由查找匹配, APP启动关闭处理等, 另外一部分则是注册到Router的路由。

3.1.Router

Router很简单, 他的主要责任就是装载路由和匹配路由, 以下是除装载路由外的源码和注释:

class Router:
    def __init__(
        self,
        routes: typing.Sequence[BaseRoute] = None,
        redirect_slashes: bool = True,
        default: ASGIApp = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
    ) -> None:
        # 装填starlette初始化时的信息
        self.routes = [] if routes is None else list(routes)
        self.redirect_slashes = redirect_slashes
        self.default = self.not_found if default is None else default
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

        async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
            await self.startup()
            yield
            await self.shutdown()

        # 如果初始化lifespan为空, 则把on_startup和on_shuatdown转化为lifespan
        self.lifespan_context = default_lifespan if lifespan is None else lifespan

    async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
        """匹配不到路由执行的逻辑"""
        if scope["type"] == "websocket":
            # websocket匹配失败
            websocket_close = WebSocketClose()
            await websocket_close(scope, receive, send)
            return

        # If we're running inside a starlette application then raise an
        # exception, so that the configurable exception handler can deal with
        # returning the response. For plain ASGI apps, just return the response.
        if "app" in scope:
            # 在starlette.applications的__call__方法可以看到starlette把自身存入scope中
            # 这里抛出异常后, 可以被ServerErrorMiddleware捕获
            raise HTTPException(status_code=404)
        else:
            # 对于不是starlette调用的, 直接返回错误
            response = PlainTextResponse("Not Found", status_code=404)
        await response(scope, receive, send)

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Handle ASGI lifespan messages, which allows us to manage application
        startup and shutdown events.
        """
        # lifespan执行的逻辑, 在执行的时候starlette会与ASGI服务器进行通信, 但目前这样的代码估计还有一些待开发的功能
        first = True
        app = scope.get("app")
        await receive()
        try:
            if inspect.isasyncgenfunction(self.lifespan_context):
                async for item in self.lifespan_context(app):
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
            else:
                for item in self.lifespan_context(app):  # type: ignore
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
        except BaseException:
            if first:
                exc_text = traceback.format_exc()
                await send({"type": "lifespan.startup.failed", "message": exc_text})
            raise
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        The main entry point to the Router class.
        """
        # 匹配以及执行路由的主要函数
        # 目前只支持http, websocket, lifespan三种类型
        assert scope["type"] in ("http", "websocket", "lifespan")

        # 初始化router到scope中
        if "router" not in scope:
            scope["router"] = self

        if scope["type"] == "lifespan":
            # 执行lifespan逻辑
            await self.lifespan(scope, receive, send)
            return

        partial = None

        # 进行路由匹配
        for route in self.routes:
            match, child_scope = route.matches(scope)
            if match == Match.FULL:
                # 如果是完整匹配(url匹配, method匹配)
                # 则进行路由正常处理
                scope.update(child_scope)
                await route.handle(scope, receive, send)
                return
            elif match == Match.PARTIAL and partial is None:
                # 如果是不完整匹配(url匹配, method不匹配)
                # 则保留值, 继续匹配
                partial = route
                partial_scope = child_scope

        if partial is not None:
            # 如果存在不完整匹配的路由, 也继续执行, 但这时路由会响应HTTP 方法不对的错误
            scope.update(partial_scope)
            await partial.handle(scope, receive, send)
            return

        if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
            # 未匹配的情况, 判断重定向
            redirect_scope = dict(scope)
            if scope["path"].endswith("/"):
                redirect_scope["path"] = redirect_scope["path"].rstrip("/")
            else:
                redirect_scope["path"] = redirect_scope["path"] + "/"

            for route in self.routes:
                match, child_scope = route.matches(redirect_scope)
                if match != Match.NONE:
                    # 再次进行匹配, 如果结果不为空, 则发送重定向response
                    redirect_url = URL(scope=redirect_scope)
                    response = RedirectResponse(url=str(redirect_url))
                    await response(scope, receive, send)
                    return

        # 上面流程都没命中时, 则代表没有找到任务路由, 这时候会执行默认路由, 默认的默认路由是404 not found
        await self.default(scope, receive, send)

可以看出Router的代码非常的简单, 主要的代码都集中在__call__中, 但是在这里出现了多次遍历查询路由且每个路由都是执行一遍正则表达式来判断是否匹配。可能会有人觉得这样的执行速度会很慢, 我曾经也觉得这样的路由查找很慢, 然后就实现了一个路由树来代替它详见route_trie.py, 然而在我实现后做了一次性能测试, 发现在路由没超过50个的情况下, 循环匹配性能是优于路由树的, 在没超过100条的情况下, 两者是相当的, 而在正常情况下, 我们指定的路由都不会超过100个, 所以不用去担心这部分路由的匹配性能, 如果还是很担心, 那么可以使用Mount来对路由进行分组, 使匹配的次数减少。

3.2.其他Route

Moute是继承于BaseRoute, 其它的Route, HostRoute, WebsocketRoute也是一样继承于BaseRoute, 它们提供的方法都差不多, 只是具体实现略有差别而已(主要是初始化,路由匹配和反向查找略有区别), 我们先来看看BaseRoute:

class BaseRoute:
    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        # 一个标准的匹配函数签名, 每个Route都要返回一个(Match, Scope)的元祖
        # Match有3种, 分别是
        #   NONE: 没有匹配到
        #   PARTIAL: 部分匹配(url匹配了, method匹配失败)
        #   FULL: 完全匹配(url和method都匹配成功)
        # Scope基本上都会返回如下格式, 不过Mount返回的内容更多:
        #   {"endpoint": self.endpoint, "path_params": path_params}
        raise NotImplementedError()  # pragma: no cover

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        # 根据名字生成反向查找
        raise NotImplementedError()  # pragma: no cover

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        # 被Router匹配后可以调用的函数
        raise NotImplementedError()  # pragma: no cover

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        A route may be used in isolation as a stand-alone ASGI app.
        This is a somewhat contrived case, as they'll almost always be used
        within a Router, but could be useful for some tooling and minimal apps.
        """
        # 如果该路由被当做ASGI APP单独调用, 则自己进行匹配并响应结果
        match, child_scope = self.matches(scope)
        if match == Match.NONE:
            if scope["type"] == "http":
                response = PlainTextResponse("Not Found", status_code=404)
                await response(scope, receive, send)
            elif scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
            return

        scope.update(child_scope)
        await self.handle(scope, receive, send)

可以看到BaseRoute提供的功能不多, 其他的路由则是基于BaseRoute进行拓展:

  • Route: 标准的HTTP路由, 负责通过HTTP URL和HTTP Method进行路由匹配, 然后提供调用HTTP路由的方法
  • WebSocketRoute: 标准的WebSocketRoute, 根据HTTP URL进行路由匹配, 然后通过starlette.websocket的WebSocket生成session再传入对应的函数
  • Mount: 一个路由的套娃封装, 他的匹配方法是URL的前缀匹配, 把请求转发给符合规则的下一级ASGI APP, 当他的下一级ASGI APP是Router时, 他的调用链可能会像这样Router->Mount->Router->Mount->Router->Route, 通过使用Mount可以对路由进行分组, 同时也能加快匹配速度, 推荐使用。 不过, 它还支持把请求分发给其他ASGI APP, 也可以做到如Starlette->ASGI Middleware->Mount->Other Starlette->...
  • Host: 它会根据用户请求的Host分发到对应的ASGI APP, 可以选择Route, Mount, 中间件等等ASGI APP

4.其它组件

从上面可以看到, starlette中的组件基本上都设计成ASGI APP, 可以任意的兼容, 这种设计是非常棒的, 虽然会牺牲一点点性能, 但是它的兼容性非常的强, 而其他的组件也都或多或少的设计得像ASGI APP一样, 在介绍其他组件之前, 先看看整个starlette的整个项目结构:

├── middleware                       # 中间件
├── applications.py                  # 启动的应用程序
├── authentication.py                # 验证相关
├── background.py                    # 封装后台任务, 会在返回响应后执行
├── concurrency.py                   # 一些小的asyncio相关的封装, 在新版本中, 直接使用了anyio库来代替
├── config.py                        # 配置
├── convertors.py                    # 一些类型的转换方法
├── datastructures.py                # 一些数据结构, 比如Url, Header, Form, QueryParam, State等等
├── endpoints.py                     # 支持cbv的路由以及一个稍微高级点的Websocket封装
├── exceptions.py                    # 异常处理
├── formparsers.py                   # Form,File之类的解析
├── graphql.py                       # 负责处理graphql相关的
├── __init__.py
├── py.typed                         # starlette需要用到的TypeHints
├── requests.py                      # 请求, 供用户获取数据
├── responses.py                     # 响应, 负责初始化Header和Cookies, 同时根据不同的Respnose类生成响应数据, 然后有个类ASGI调用接口, 该接口会发送ASGI协议到uvicorn服务, 发送完后如果有backgroud task, 则执行backgroud task, 直到执行完成, 该响应流程才结束。
├── routing.py                       # 路由
├── schemas.py                       # OpenApi相关的Schemas
├── staticfiles.py                   # 静态文件
├── status.py                        # HTTP状态码
├── templating.py                    # 基于jinja的模板响应
├── testclient.py                    # 测试客户端
├── types.py                         # 类型
└── websockets.py                    # websocket

上面的文件有很多, 有些比较简单就直接略过。

4.1.Request

Request非常的简单, 它继承于HttpConnection, 这个类主要是通过ASGI协议传过来的Scope进行解析, 提取如url, method等信息, 而Request增加了读取请求数据和返回数据(HTTP1.1支持服务端push数据给客户端)的功能, 其中, 读取数据都依赖于一个核心函数--stram,它的源码如下:

async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    # 如果已经读取过的, 则从缓存中获取数据
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return

    if self._stream_consumed:
        raise RuntimeError("Stream consumed")

    self._stream_consumed = True
    while True:
        # 从ASGI容器的receive循环获取数据
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if body:
                # 获取的数据不为空就返回数据
                yield body
            if not message.get("more_body", False):
                # 代表body数据已经被获取完
                break
        elif message["type"] == "http.disconnect":
            # 代表与客户端的连接已经关闭了
            self._is_disconnected = True
            # 抛出异常, 用户调用await request.body()   await request.json()之类的会抛出异常
            raise ClientDisconnect()
    # 返回空字节,标记结束
    yield b""

这个实现非常简单, 但是却有一个小bug, 如果有了解Nginx或者其他Web服务的都会知道, 一般的中间服务器是不会处理body数据的, 只做传递。ASGI也是如此, uvicorn在处理完url和header后就开始调用ASGI APP, 并把sendreceive对象传递下去, 这两个对象会在经过多个ASGI APP后,抵达路由这个ASGI APP, 并在函数里供用户使用,, 所以Request接收的receive对象是uvicorn生成的。 而receive的数据源是源自于是一个asyncio.Queue队列, 从中间件的分析可以知道, 每个ASGI APP都依据scope, receive来生成一个Request对象, 意味着每层ASGI APP的Request对象是不一致的, 如果在中间件调用Request对象读取Body的话, 就会提前消费通过receive消费了队列的数据, 导致后续的ASGI APP无法通过Request对象读取Body数据, 该问题示例代码如下:

import asyncio
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

app: Starlette = Starlette()


class DemoMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        print(request, await request.body())
        return await call_next(request)


app.add_middleware(DemoMiddleware)


@app.route("/")
async def demo(request: Request) -> JSONResponse:
    try:
        await asyncio.wait_for(request.body(), 1)
        return JSONResponse({"result": True})
    except asyncio.TimeoutError:
        return JSONResponse({"result": False})


if __name__ == "__main__":
    import uvicorn  # type: ignore

    uvicorn.run(app)

运行后执行请求查看结果:

-> curl http://127.0.0.1:8000
{"result":false} 

可以看到执行的结果是false, 意味着执行request.body超时了, 因为此时receive队列已经空了, 是拿不到数据的, 如果不加超时的话这个请求就会一直卡主。 那么要怎么去解决问题呢, 先看看Request获取是如何获取body的, 因为用户可以同时获取多次body, 但一直都是相同的数据, 它的实现思路是获取数据后, 把数据缓存到一个变量里面, 我们也可以采取这个思路, 由于数据都是通过receive获取的, 那么可以在在读取数据后, 构造一个receive函数, 该函数返回类似于ASGI的通信协议的数据, 并且有完整的body数据(满足Request.stream获取body的构造), 代码如下:

async def proxy_get_body(request: Request) -> bytes:
    async def receive() -> Message:
        return {"type": "http.request", "body": body}

    body = await request.body()
    request._receive = receive
    return body

之后任意层级的ASGI APP如果需要获取Body数据的话, 就可以调用该函数来获取Body数据, 同时又不影响后续的ASGI APP获取Body数据。

4.2.TestClient

在基于TestCLient的测试用例运行时, 没有流量转发, 而是通过请求调用到路由函数, 并根据返回数据转化为一个响应对象。 同时, 它还能会自动运行on_startupon_shutdown挂载的函数以及挂载的中间件, 我在一开始接触时, 我很好奇它是怎么实现的, 因为大多数的测试用例框架都很难做到直接调用到路由函数, 同时又满足于框架的其他中间件, on_startupon_shutdown的功能(特别是Python的gRPC自带的测试用例封装...)。

在了解TestClient的运行原理之前, 先看看TestClient的使用用例如下:

from starlette.testclient import TestClient
from requests import Response  # type: ignore


app: Starlette = Starlette()
with TestClient(app) as client:
    response: Response = client.get("/")

这段代码中, 分为几步走:

  • 1:初始化一个app对象
  • 2:把app对象传入TestClient中, 并通过with语法启动一个上下文
  • 3:通过返回的client进行调用, 最后返回一个requests.Response对象。

其中第一点非常简单, 我们也分析过了, 对于第二点, 很难明白为什么要with上下文, 在官方文档说明是可以这样直接运行:

response: Response = TestClient(app).get("/")

但是没办法执行on_startupon_shutdown这两个事件挂载的函数, 所以初步判定with语法与它们有关, 而至于第三步则很难猜透starlette是怎么实现的, 但是返回的是requests.Respnose的对象, 那么一定跟requests这个框架有一些关联, 具体需要分析源码才能知道。

接下来就开始带着问题分析源码, 首先是类和__init__

class TestClient(requests.Session):
    __test__ = False  # For pytest to not discover this up.

    def __init__(
        self,
        app: typing.Union[ASGI2App, ASGI3App],
        base_url: str = "http://testserver",
        raise_server_exceptions: bool = True,
        root_path: str = "",
    ) -> None:
        super(TestClient, self).__init__()
        if _is_asgi3(app):
            app = typing.cast(ASGI3App, app)
            asgi_app = app
        else:
            app = typing.cast(ASGI2App, app)
            asgi_app = _WrapASGI2(app)  #  type: ignore
        # 使用了request的Adapter功能, 
        adapter = _ASGIAdapter(
            asgi_app,
            raise_server_exceptions=raise_server_exceptions,
            root_path=root_path,
        )
        self.mount("http://", adapter)
        self.mount("https://", adapter)
        self.mount("ws://", adapter)
        self.mount("wss://", adapter)
        self.headers.update({"user-agent": "testclient"})
        self.app = asgi_app
        self.base_url = base_url

从这个可以看出, TestClient继承于requests.Session的方法, 证明可以在编写测试用例时, 直接调用到requests.Session的相关的方法。然后在__init__方法中实例化了一个adapter, 这里是使用了requests的adapter机制, 通过adpater机制, 可以拦截请求的数据和响应的数据。 _ASGIdapter的代码比较多, 但是它的实现逻辑很简单, 它重载了Adaptersend方法, 当执行到send方法时, 它会变成执行app(scope, receive, send), 其中receive是负责把请求的数据转换为ASGI协议,供下一级ASGI APP调用。而send(位于Adapter.send里面的闭包函数)则获取ASGI APP返回的数据并存放到字典中, 当ASGI APP执行完毕的时候, Adaptersend方法会根据执行是否异常以及存放数据的字典转化为一个request.Response的实例返回给用户。

通过_ASGIdapter了解了starlette是如何解决第三个问题的, 接下来是with语法相关的__enter__, __exit__

class TestClient(requests.Session):
    def __enter__(self) -> "TestClient":
        loop = asyncio.get_event_loop()
        self.send_queue = asyncio.Queue()  # type: asyncio.Queue
        self.receive_queue = asyncio.Queue()  # type: asyncio.Queue
        self.task = loop.create_task(self.lifespan())
        loop.run_until_complete(self.wait_startup())
        return self

    def __exit__(self, *args: typing.Any) -> None:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.wait_shutdown())

可以看出, 在使用进入上下文和退出上下文时, 自动调用了lifespan方法, 然后通过lifespan机制来实现on_startupon_shutdown功能, 具体的源码和注释如下:

class TestClient(requests.Session):
    async def lifespan(self) -> None:
        # 构造lifespan的scope
        scope = {"type": "lifespan"}
        try:
            # 发送到starlette, 然后starlette就会根据receive执行对应的事件
            await self.app(scope, self.receive_queue.get, self.send_queue.put)
        finally:
            await self.send_queue.put(None)

    async def wait_startup(self) -> None:
        # 发送lifespan开始信息
        await self.receive_queue.put({"type": "lifespan.startup"})
        # 监听starlette返回的lifespan信息, 并判断信息是否正确
        message = await self.send_queue.get()
        if message is None:
            self.task.result()
        assert message["type"] in (
            "lifespan.startup.complete",
            "lifespan.startup.failed",
        )
        # 如果错误, 则消费task.result
        if message["type"] == "lifespan.startup.failed":
            message = await self.send_queue.get()
            if message is None:
                self.task.result()

    async def wait_shutdown(self) -> None:
        # 发送lifespan关闭信息
        await self.receive_queue.put({"type": "lifespan.shutdown"})
        message = await self.send_queue.get()
        if message is None:
            self.task.result()
        assert message["type"] == "lifespan.shutdown.complete"
        # 等待starlette的lifespan执行结束
        await self.task

5.总结

至此, starlette的几个重要的功能代码都分析完了, starlette是一个非常棒的库, 它的设计思路也是非常的棒, 建议大家自己读一遍starlette的源代码, 对以后自己写框架是有帮助的。