uvicorn源码分析
「这是我参与11月更文挑战的第5天,活动详情查看:2021最后一次更文挑战」
前记
Uvicorn
是一个基于uvloop
和httptools
的ASGI服务器, 性能比较强劲, 通过它可以与使用ASGI规范的Python
应用程序进行交互。ASGI与WSGI很像, 只不过ASGI原生支持HTTP2.0和WebSocket, 同时更多的是支持Python
的Asyncio
生态的WEB应用程序。通过了解Uvicron
,能知道一个稳定的Web服务器的工作方式以及能更好的去了解其他基于ASGI的WEB应用程序。
最新修订见原文, 关注公众号<博海拾贝diary>可以及时收到新推文通知
Uvicron
通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn
发送和接收信息:
async def app(scope, receive, send):
# 一个最简单的ASGI应用程序
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})
if __name__ == "__main__":
# uvicorn服务
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
其中应用程序的scope
代表有关传入连接信息的字典, receive
是一个从服务器接收传入信息的通道, send
是一个将消息发送到服务器的通道, 不过这不是本文的重点, 更多scope
信息可以访问ASGI interface了解, 接下来将从例子的uvicorn.run
开始, 通过源码分析uvicorn
工作原理。
1.uvicorn主流程源码分析
分析源码之前, 首先是了解它的源码结构, uvicorn
的源码结构如下:
├── lifespan
├── loops
├── protocols
├── middleware
├── supervisors
├── config.py
├── importer.py
├── __init__.py
├── logging.py
├── __main__.py
├── main.py
├── server.py
├── subprocess.py
├── _types.py
└── workers.py
uvicron
做了很好的分类, 每个文件夹/文件都有自己的功能:
- lifespan
告诉基于ASGI的应用程序
uvicorn
即将启动和停止的消息,uvicorn
在启动的时候会初始化,然后发送初始化协议并等待ASGI应用程序返回, 如果ASGI应用程序返回complele
则uvicorn
会继续运行, 返回failed
则报错退出。 - loops
自动加载事件循环, 优先加载
uvloop
, 这将会获得极大的性能提升 - protocols 里面存放着读取连接数据和解析消息体的协议, 如HTTP和WebSockets, 可以把他认为是一个序列化器。
- middleware 存放着一些简单通用的ASGI中间件
- supervisors
uvicorn
本身是以一个进程启动的, 这个文件夹存放着uvicorn
的几种启动方式, 如多进程启动,监控文件变动自动重启的方式等。 - config.py
uvicorn
的配置文件, 它不仅读取用户的配置, 还自动加载上面所述的lifespan
,loops
,protocols
等等 - importer.py
uvicron
中很多地方使用了动态加载配置和动态加载库, 这里是把这个方法进行统一封装 - logging.py 提供了根据日志等级渲染不同颜色的日志以及访问日志(但是很少人用)
- main.py
uvicorn
的入口文件, 包括代码运行和命令行运行两种方式 - server.py
uvicorn
的核心服务, 用于处理进出流量以及处理自身的服务状态 - subprocess.py
给
supervisors/multiprocess.py
使用的, 可能是为了以后拓展需要, 才放在一级目录 - workers.py
其他工作模式的
Uvicorn
, 比如里面有个UvicornWorker
, 就是用于gunicorn
启动uvicorn
结构了解完, 接下来开始正式步入源码之旅, 这里直接忽略掉命令行的启动方式, 从uvicorn.run
开始, 实际上命令行启动方式也是通过获取参数, 然后传入uvicorn.run
方法中, 这个uvicorn.run
方法会接受符合ASGI的app和kwargs
参数, 然后生成对应的配置实例config
, 再生成server
, 接着依靠配置判断执行不同的启动模式, 具体代码如下:
def run(app, **kwargs):
# 加载配置
config = Config(app, **kwargs)
# 加载server
server = Server(config=config)
if (config.reload or config.workers > 1) and not isinstance(app, str):
# 只有命令行模式才可以使用reload
logger = logging.getLogger("uvicorn.error")
logger.warning(
"You must pass the application as an import string to enable 'reload' or "
"'workers'."
)
sys.exit(1)
if config.should_reload:
# 启动reload逻辑
sock = config.bind_socket()
supervisor = ChangeReload(config, target=server.run, sockets=[sock])
supervisor.run()
elif config.workers > 1:
# 多进程方式启动
sock = config.bind_socket()
supervisor = Multiprocess(config, target=server.run, sockets=[sock])
supervisor.run()
else:
# 最普通的方法启动
server.run()
config
很简单, 它负责装填配置, 然后调用configure_logging
配置全局的logger
, 此外还有一个load
的方法, 将会在Server
中调用, 接下来, 先忽略其中涉及到的模板, 到server.run
之中, 看看普通模式下, 服务是怎么启动的。
这个run
方法很简单, 就是设置事件循环, 然后通过事件循环调用serve
来启动服务:
def run(self, sockets=None):
self.config.setup_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.serve(sockets=sockets))
serve
是启动服务的最核心代码, 首先会执行config.load
方法加载一些动态的配置, 如解析http的库, 解析websocket的库, 还有通过用户传过来的app来加载app, 并判断是使用WSGI
,ASGI2
或者是ASGI3
, 并进行配置(uvicorn
在这里是通过ASGI中间件的方式来支持), 最后根据配置启动对应的中间件。
接着会跳转到server.startup
方法, 该方法首先会通过lifaspan.startup
与用户传过来的app
通信, 校验是否是合法的应用程序, 然后初始化变量, 先是初始化一个信号处理函数, 当收到信号时, 会把变量should_exit
设置会True
。
然后会初始化一个名为create_protocol
的变量, 它是继承于asyncio.Protocol
, asyncio.Protocol
主要用于从socket获取数据和写入数据, 同时也有一些TCP相关的调用, create_protocol
的主要作用就是作为socket和应用程序的中间层, 负责把HTTP数据与ASGI数据互转, 如下图:
接着根据用户传过来的变量方式来启动服务, 这些都是
Python
的Asyncio
封装好的, 具体为以下几种:
- 当用户传socket过来的时候: 基于该scoket和
create_protocol
创建服务, 如果是多进程且是Windows系统, 则要显示的共享socket。 - 当用户传文件描述符的时候: 基于该文件描述符获取scoket, 并通过该socket和
create_protocol
创建服务。 - 当用户传unix domain socket的时候: 基于unix domain socket和
create_protocol
创建服务。 - 当用户传host和port参数的时候: 基于host和port和
create_protocol
创建服务。
创建完服务后, socket的处理就转给了应用程序了, 但是采用了事件循环的思路, 需要uvicorn
使用while
使程序一直跑, 防止主程序退出:
async def main_loop(self):
counter = 0
should_exit = await self.on_tick(counter)
while not should_exit:
counter += 1
counter = counter % 864000
await asyncio.sleep(0.1)
should_exit = await self.on_tick(counter)
每次循环执行的时候都会调用on_tick
方法, 该方法主要是进行服务统计以及判断啥时候可以退出服务, 比如请求总数超过配置的限制数, 或者收到信号,把变量should_exit
设置为True
等等, 如果在循环中判断程序需要进行退出, 就会进入退出逻辑shutdown
, 该逻辑比较简单, 注释和代码如下:
async def shutdown(self, sockets=None):
logger.info("Shutting down")
# 关闭socket, 不让有新的连接建立
for server in self.servers:
server.close()
for sock in sockets or []:
sock.close()
for server in self.servers:
await server.wait_closed()
# 关闭已经创建的连接, 并等待他们处理完毕
for connection in list(self.server_state.connections):
connection.shutdown()
await asyncio.sleep(0.1)
# 等待连接关闭或者用户强制关闭
if self.server_state.connections and not self.force_exit:
msg = "Waiting for connections to close. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.connections and not self.force_exit:
await asyncio.sleep(0.1)
# 等待后台任务完成或者用户强制关闭
if self.server_state.tasks and not self.force_exit:
msg = "Waiting for background tasks to complete. (CTRL+C to force quit)"
logger.info(msg)
while self.server_state.tasks and not self.force_exit:
await asyncio.sleep(0.1)
# 通过lifespan告诉ASGI应用程序即将关闭
if not self.force_exit:
await self.lifespan.shutdown()
至此整个主流程分析完毕, 下图是我整理后的一个流程图:
从图中可以很清晰的看清
uvicorn
与ASGI应用程序的关系, 接下来是上面部分没有详细讲过的小组件源码分析。
2.uvicorn.protocols源码分析
在了解了uvicorn
的主流程后只大概的知道uvicorn
是通过uvicorn.protocols
与应用程序进行通信, 但是不明白他们具体是如何通信的, 接下来就开始了解uvicorn
中最核心的uvicorn.protocols
。
从上面的分析中我们可以知道, 作为一个Web服务器, uvicorn
是通过一个socket来接收发发送请求的。 而对于socket来说, 它只关心怎么创建连接,关闭连接以及如何传输内容, 它不会关心传输的字节流的上层协议是如何实现的。
好在Asyncio
提供了几种简单的网络传输模型, 它们都是对于这些传输数据的抽象, 这些抽象会在如loop.create_server
和loop.create_unix_server
方法中使用, 通过这个抽象我们能很方便的使用TCP和UDP连接。
uvicorn
在基于Asyncio
创建服务器时, 把protocol
抽象通过protocols参数传入loop.create_server
和loop.create_unix_server
后, 维护他们返回的server
对象, 剩下的与ASGI应用程序的数据交互全由protocol
对象处理。
uvicorn
封装的对象继承于asyncio.Protocols
, 它是针对TCP协议的封装, 它总共有6个方法,包括启动时的connection_made
, 接收数据时的data_received
, 断开时的eof_received
以及丢失连接时的connection_lost
, 然后还有当TCP连接出现堵塞时的暂停pause_writing
和恢复resume_writing
, 具体的方法传输的参数和使用方法如下:
class Protocol(BaseProtocol):
def connection_made(self, transport):
"""
在建立连接时调用.
参数是表示管道连接的transport,
此时得到的transport需要设置为该类的transport, 方便后续connection_lost控制关闭管道.
"""
def data_received(self, data):
"""
通过该方法可以获取到客户端传输过来的数据
"""
def eof_received(self):
"""
当另一端调用write_eof()或等效函数时调用.
如果返回一个假值(包括None),则传输将关闭自身。
如果它返回true值,则关闭传输取决于协议.
"""
def connection_lost(self, exc):
"""
当连接丢失或关闭时调用, 根据exc判断是否要关闭trnasport.
参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).
"""
def pause_writing(self):
"""
当transport缓冲区超过高水位(high-water mark)时调用,
此时应该能控制外部不再写入数据(通常是一个asyncio.Future),
同时应该通过transport.pause_reading来停止获取数据, 之后TCP就会通过拥塞机制使得客户端减缓发送数据的速度。
"""
def resume_writing(self):
"""
当transport缓冲区排放低于低水位线(low-water mark)时调用.
此时要释放标志, 使得外部可以继续写入数据,
同时通过transport.resume_reading来恢复获取数据,
之后TCP就会通过拥塞机制知道服务端的处理能力上来了, 使客户端加快发送速度。
"""
了解完asycnio.Protocol
后, 可以正式了解uvicorn
对asyncio.Protocol
做了哪些修改来达到跟应用程序进行通信, 由于各个Protocol
的封装差不多, 这里以httptools_impl.HttpToolsProtocol
为例子进行说明。
首先是类的初始化, HttpToolsProtocol
在初始化时会加载config
并配置到日志,对应的websocket协议处理,对应的http解析器以及serve
创建的统计容器等, 其中需要注意的是, 在初始化时, 传入的变量是HttpToolsProtocol
的实例化本身。
接着是Protocol
的几大主要协议接口函数, 这里以源码和注释进行分析:
class HttpToolsProtocol(asyncio.Protocol):
def connection_made(self, transport):
# 添加实例本身到集合, 代表当前还有连接在处理
self.connections.add(self)
self.transport = transport
# 初始化流控制
self.flow = FlowControl(transport)
# 简单的初始化实例trsnaport的相关编列
self.server = get_local_addr(transport)
self.client = get_remote_addr(transport)
self.scheme = "https" if is_ssl(transport) else "http"
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % tuple(self.client) if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sConnection made", prefix)
def connection_lost(self, exc):
# 从集合删除实例本身, 代表当前连接已经处理玩了, 不需要进入统计容器
self.connections.discard(self)
if self.logger.level <= TRACE_LOG_LEVEL:
prefix = "%s:%d - " % tuple(self.client) if self.client else ""
self.logger.log(TRACE_LOG_LEVEL, "%sConnection lost", prefix)
# 设置cycle, 告诉他连接已经断开
if self.cycle and not self.cycle.response_complete:
self.cycle.disconnected = True
if self.cycle is not None:
self.cycle.message_event.set()
if self.flow is not None:
self.flow.resume_writing()
def _unset_keepalive_if_required(self):
"""取消keep alive timeout的任务,
一般来说, 在发送数据后服务端会等待客户端发送数据, 如果超过多少秒没有发送数据则可以判断该客户端已经断开了, 服务端可以主动关闭连接
而uvicorn通过timeout_keep_alive_task来实现
"""
if self.timeout_keep_alive_task is not None:
self.timeout_keep_alive_task.cancel()
self.timeout_keep_alive_task = None
def data_received(self, data):
self._unset_keepalive_if_required()
try:
# 接受字节数据, 并交由http解析器进行解析
self.parser.feed_data(data)
except httptools.HttpParserError as exc:
# 解析失败, 应该不是http协议的数据, 断开连接
msg = "Invalid HTTP request received."
self.logger.warning(msg, exc_info=exc)
self.transport.close()
except httptools.HttpParserUpgrade:
# 已经超过了解析器能解析的协议版本, 应该交由更新的协议解析器处理
self.handle_upgrade()
分析完了几个跟连接相关的主要方法后就会发现分析路线已经断了, 而该类中还有很多on_xxx
的方法, 它们也没有被其他方法调用。
这是因为在初始化HTTP协议解析器的时候,uvicorn.protocol
把自己的实例传入了HTTP解析器中, 解析器会边接收数据边按照url, header, body来顺序解析, 并在执行每种数据解析后, 会通过回调告诉传入的实例, uvicorn
正是通过on_xxx
方法来监听这些回调并处理解析完的HTTP数据:
class HttpToolsProtocol(asyncio.Protocol):
def on_url(self, url):
"""这是收到一个请求后的第一次解析, 可以认为是该请求体的初始化, 此时会根据url和连接数据进行初始化, 并存放在实例的scope中"""
method = self.parser.get_method()
parsed_url = httptools.parse_url(url)
raw_path = parsed_url.path
path = raw_path.decode("ascii")
if "%" in path:
path = urllib.parse.unquote(path)
self.url = url
self.expect_100_continue = False
self.headers = []
self.scope = {
"type": "http",
"asgi": {"version": self.config.asgi_version, "spec_version": "2.1"},
"http_version": "1.1",
"server": self.server,
"client": self.client,
"scheme": self.scheme,
"method": method.decode("ascii"),
"root_path": self.root_path,
"path": path,
"raw_path": raw_path,
"query_string": parsed_url.query if parsed_url.query else b"",
"headers": self.headers,
}
def on_header(self, name: bytes, value: bytes):
"""解析器在解析header时, 是按照header一行一行进行解析的, 所以每即系一行header都会调用一次on_header, 并把他们存在实例的headers中"""
name = name.lower()
if name == b"expect" and value.lower() == b"100-continue":
self.expect_100_continue = True
self.headers.append((name, value))
def on_headers_complete(self):
"""对于大部分前置web框架来说, 一般解析到header后就结束不再解析了, 会开始发送到正真处理的应用程序, uvicorn也是这样的"""
http_version = self.parser.get_http_version()
if http_version != "1.1":
self.scope["http_version"] = http_version
if self.parser.should_upgrade():
# 如果发现当前http版本更加高级(比如websocket), 则不再处理, 在另外一个逻辑会转到websocket处理
return
# Handle 503 responses when 'limit_concurrency' is exceeded.
if self.limit_concurrency is not None and (
len(self.connections) >= self.limit_concurrency
or len(self.tasks) >= self.limit_concurrency
):
# 当前并发数过高, 不再转发给后面的应用程序, 直接返回错误, 这里是一个具有ASGI标准函数签名的函数, 里面实现的功能是发送错误信息到socket
app = service_unavailable
message = "Exceeded concurrency limit."
self.logger.warning(message)
else:
app = self.app
# cycle相当于一个request的处理流程
# 普通的HTTP请求只对应一个cycle就可以了, 这里是兼容Pipelined HTTP请求
existing_cycle = self.cycle
self.cycle = RequestResponseCycle(
scope=self.scope,
transport=self.transport,
flow=self.flow,
logger=self.logger,
access_logger=self.access_logger,
access_log=self.access_log,
default_headers=self.default_headers,
message_event=asyncio.Event(),
expect_100_continue=self.expect_100_continue,
keep_alive=http_version != "1.0",
on_response=self.on_response_complete,
)
if existing_cycle is None or existing_cycle.response_complete:
# 如果上个请求已经处理完了, 则开始处理这个请求(通过run_asgi来运行)
task = self.loop.create_task(self.cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
else:
# 如果上个请求没有处理完, 就先暂停读取数据, 并把该cycle放到pipeline暂存
self.flow.pause_reading()
self.pipeline.insert(0, (self.cycle, app))
def on_body(self, body: bytes):
"""读取到原生的body字节, 如果ASGI处理者还在运行, 且不是websocket, 则转给ASGI处理者
注: 一个请求可能会触发多次on_body"""
if self.parser.should_upgrade() or self.cycle.response_complete:
return
self.cycle.body += body
if len(self.cycle.body) > HIGH_WATER_LIMIT:
# 由于ASGI应用程序会根据调用者需要才来获取body(比如starlette的 await request.body()), 如果应用程序没有需要则会暂缓获取body数据
self.flow.pause_reading()
# 告诉ASGI应用程序, body已经获取结束(通常在cycle的more_body为False的时候, 才会检查message_event)
self.cycle.message_event.set()
def on_message_complete(self):
if self.parser.should_upgrade() or self.cycle.response_complete:
return
# 表示body已经读取结束了
self.cycle.more_body = False
self.cycle.message_event.set()
def on_response_complete(self):
"""返回响应时的回调"""
self.server_state.total_requests += 1
if self.transport.is_closing():
return
# 设置一个keep_alive的机制, 服务端返回响应后会设置一个倒计时future, 该future只有在上面data_received收到请求的时候才会取消
# 如果该future没有取消, 则会调用timeout_keep_alive_handler函数来关闭transport通道
self._unset_keepalive_if_required()
self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)
# 恢复读取数据
self.flow.resume_reading()
if self.pipeline:
# 如果是pipeline请求, 则开始处理刚才暂存的cycle
cycle, app = self.pipeline.pop()
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
在了解解析HTTP数据的时候, 经常会遇到一个cycle对象, 这个对象是基于ASGI负责读写数据转换的对象, 这个对象有send
和receive
两个方法, 这两个方法的命名是站在ASGI应用程序的角度来命名的。
其中send
通过传入的参数message
获取到ASGI应用程序返回的数据, 并依据ASGI协议进行解析, 并拼接成HTTP协议的字节流, 当ASGI应用程序发送结束标记时, send
会把拼接的字节流通过socket返回给客户端, 同时触发on_response_complete
方法。
而receive
比较简单, 它只负责接收获取到已经解析完成的HTTP数据(早前面on_xxx时会把数据传给cycle), 然后发送到ASGI应用程序中。
这两个方法都是通过on_headers_complete
中执行的run_asgi
方法来调用的, 通过该方法, uvicorn
会把数据的处理权转给ASGI应用程序, 如果ASGI应用程序处理异常, 则会返回HTTP状态码为500的响应给客户端并关闭transport。
分析完了cycle对象后, 再次回到protocol的data_received
的方法中, 这里通过获取httptools.HttpParserUpgrade
异常的方式得知当前可能是一个WebSocket请求, 于是进入handle_upgrade
逻辑, 这个逻辑会检查是否加载了解析WebSocket解析器以及请求体是否满足WebSocket条件, 如果不满足就会返回一个响应体告诉客户端当前无法支持该HTTP请求的升级协议, 如果满足则会生成WebSocketProtocols来处理socket的数据, 并把它设置为当前的transport向关联, 不过这个WebsocketProtocols基本上是HTTP protocols和cycle两个对象的融合, 具体处理步骤也差不多, 这里就不多做描述了。
至此, 大体上的uvicorn.protocols
源码就分析完了, 由于uvicorn
是把asyncio.Protocol
, 解析器, cycle
三者结合在一起, 所以分析起来要经常跳转, 因此我把他们的流程转成如下的图:
3.总结
至此, uvicorn
的核心流程已经分析完了, 它先是通过server
来启动一个服务, 并管理服务状态, 然后再通过protocol
负责做双端的序列化, 使ASGI应用程序能够按照ASGI协议读写数据, 其中protocol
还融合了HTTP解析器解析HTTP并通过它来解析数据。
当然, 除了上述主流程外, uvicorn
还包括了中间件, 多进程启动以及监控文件变化重启服务等组件, 这些组件的代码量不大, 分析源码也说不出啥, 这里就简单略过。
转载自:https://juejin.cn/post/7034150387774914597