likes
comments
collection
share

python的asyncio原理源码分析-透过asyncio.sleep

作者站长头像
站长
· 阅读数 5

事件循环是什么

本质上,asyncio.run就是使用事件循环去运行一段python函数。那么事件循环是什么呢?

个人简单的理解:在单个线程内,最大化利用cpu的时间, 循环执行任务组里面的任务。

举个栗子:

设无阻塞的任务为c(concurrent), 存在阻塞的任务为b(block)

假设存在某个任务组, 开始的执行顺序是:c1 -> c2 -> b1 -> c3

我们将任务组放到事件循环里面去运行,那么他就会从头到尾执行这个任务组的任务,一直到里面的任务执行完。

我们可以模拟流程,如下

第一轮事件循环:

  • c1 因为不存在阻塞,执行完。第一轮任务组还剩下c2, b1, c3
  • c2 因为不存在阻塞,执行完。第一轮任务组还剩下b1, c3
  • b1 因为存在阻塞,假设b1运行到代码段的2/3处阻塞,那么让出执行权.第一轮任务组还剩下c3.b1等候下一轮事件循环。
  • c3 因为不存在阻塞,执行完。第一轮任务组执行完。

第二轮事件循环:

  • 现在就剩下b1未执行了,检测b1有没有被什么事件唤醒,如果没有唤醒就一直阻塞到b1唤醒。
  • b1唤醒之后,从2/3处开始执行,一直把b执行完

至此,整个事件循环结束。因为任务组所以的任务都执行结束。

用一个简单的python案例来看看事件循环的结果:

async def level1_task1():
    print("start to sleep 1...")
    await asyncio.sleep(4)
    print("weak up 1...")


async def level1_task2():
    print("start to sleep 2...")
    await asyncio.sleep(2)
    print("weak up 2...")


async def level1():
    await asyncio.gather(*[level1_task1(), level1_task2()])

if __name__ == "__main__":
    asyncio.run(level1())
start to sleep 1...
start to sleep 2...
weak up 2...
weak up 1...

可以看到level1_task2并没有因为level1_task1的sleep而一直阻塞,而让出了运行权限。 等到后面轮到他的时候,才进行了唤醒执行,

所以事件循环最大化的利用了cpu的空闲时间!

ascynio.run是怎么运行的呢?

asyncio.sleep是怎么运行的?

为什么优先讨论这个问题?研究一个问题,最好的办法是从一个最简单的case入手,那么asyncio.sleep就是python事件循环中最容易写的一个例子。我们可以这个case从中看看是怎么运行的。

我们先看看asyncio.sleep内部是如何实现的?

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

这个代码里面,有两个比较重要的变量loopfuture,这是后续所有的事件循环中的两个核心的概念。

  • loop = 一个事件循环执行器,你可以理解,所有的事件循环都需要用它来触发。
  • future = 你可以理解现在有个任务,future可以拿到这个任务的完成状态,还有任务完成的时候需要执行一些什么回调。

那么这段代码就比较清晰了:

  1. 先拿到当前环境中正在运行的事件循环执行器
  2. 调用loop.call_later,这个函数的意思是,事件循环将在多少秒后,执行一个函数。在当前这个代码中,要执行的函数就是futures._set_result_unless_cancelled,而且执行的时候需要给这个函数传入两个参数, futureresult。这个执行的函数也很简单
    def _set_result_unless_cancelled(fut, result):
        """Helper setting the result only if the future was not cancelled."""
        if fut.cancelled():
            return
        fut.set_result(result)
    
    核心实现就是调用future.set_result来标记这个future任务完成了。
  3. 然后await future, 意思就是等待future任务完成,future未完成之前会让出控制权,当前任务就停止了,让其他任务执行。

我当时看到这,就产生了一个疑问,那么这个call_later是怎么做到可以在多少s之后设置future完成的呢?然后我继续debugcall_later函数看

def call_later(self, delay, callback, *args, context=None):
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    return timer

继续debugcall_at

def call_at(self, when, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer

看到这里,开始豁然开朗,请注意heapq.heappush(self._scheduled, timer), 那就意味着,是往某个队列里面加入了一个任务, 而且还是个小顶堆。从名字来看self._scheduled是个延迟队列。events.TimerHandle应该也是一个延时任务。 关于TimerHandle不放在这里讨论,第一个自己也还没研究的很透彻(哈哈),其次这并不影响探讨事件循环,我们只需要知道这是一个延迟执行的任务。从这个TimerHandle的构造来看,其实就是把刚刚的call_later的延迟执行的函数,放到TimeHandle里面去了,TimeHandle才是真正执行任务的对象.

看到这里,我们其实大概知道了ascynio.sleep实现的本质:往事件循环里面加入一个延迟任务,然后await future,交出任务使用权, 一直等到n秒之后,事件循环器唤醒这个future,任务重新执行。

那么我们可以尝试去更深层的去讨论这个问题,任务是怎么被调度起来的呢?

asyncio.run 是怎么运行的

勿怂,debug到asyncio.run。后续的代码部分,会省略一些次要代码。

def run(main, *, debug=None):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

这里我们只看loop.run_until_complete即可,其他的都是一些校验还有事件循环执行器的创建。

def run_until_complete(self, future):
    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()

这里就开始有点意思了:

  • 我们先看第三行代码, tasks.ensure_future(future, loop=self), 这个future其实就是asyncio.run(func())我们传入的这个func()。ensure_future就是把这个future任务加入的事件循环队列里面去。
  • 并且还给这个future设置一个完成时候的回调_run_until_complete_cb
    def _run_until_complete_cb(fut):
        if not fut.cancelled():
            exc = fut.exception()
            if isinstance(exc, (SystemExit, KeyboardInterrupt)):
                return
        futures._get_loop(fut).stop()
    
    目的就是说,我们传入的这个函数执行完成之后,就关掉整个事件循环执行器,这个也比较符合常理,任务执行完了,也没必要一直循环了。

然后就是很核心的代码self.run_forever() 此处省略了一些代码。

while True:
    self._run_once()
    if self._stopping:
        break

可以看到,这里就是循环开始的函数,在事件循环执行器未被标记关闭之前,会一直循环执行任务。

我们再看看最终我们要研究的单次循环,具体如何调度任务self._run_once()此处省略一些代码。代码的分析和代码放在一块,这样便于理解。

def _run_once(self):
    timeout = None
    # 首先声明两个概念
    # self._ready 是就绪队列, 当前可以执行的任务的都放在这
    # self._scheduled 是延迟队列,call_later的任务都放在这,比如asyncio.sleep的任务
    if self._ready or self._stopping:
        # 当就绪队列存在任务的时候,会里面返回执行
        timeout = 0
    elif self._scheduled:
        # 当就绪队列不存在任务,延迟队列存在任务的时候,会延迟timeout秒
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    # timeout=None会一直等到epoll有消息为止
    # timeout=0会立马返回
    # timeout > 0 会等到timeout秒后再唤醒
    # 这个_selector后续会讲解,底层就是epoll,这里可以暂时理解成可以中断的sleep
    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # 计算出当前事件循环运行的事件
    end_time = self.time() + self._clock_resolution
    # 如果就绪队列里面有元素
    while self._scheduled:
        # 从队列里面取出第一个元素
        # 这里的队列是一个小顶堆,延迟最小的任务放在第一个
        handle = self._scheduled[0]
        # 判断任务的执行时间是否有到达
        if handle._when >= end_time:
            break
        # 如果事件已经到,把延迟任务绪队列里面移除
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        # 将移除的延迟任务加入到就绪队列中
        self._ready.append(handle)

    # 开始执行一轮就绪队列里面的任务
    # 并且本次事件循环执行完的任务,也都会从队列里面移除。
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

好,如果你刚刚已经认真的看了上面的解释的话,应该大概有一些理解了。我们就刚刚我们运行的代码进行一下分析。

async def level1_task1():
    print("start to sleep 1...")
    await asyncio.sleep(4)
    print("weak up 1...")


async def level1_task2():
    print("start to sleep 2...")
    await asyncio.sleep(2)
    print("weak up 2...")


async def level1():
    await asyncio.gather(*[level1_task1(), level1_task2()])

if __name__ == "__main__":
    asyncio.run(level1())

开始了哦!

第一轮, 就是我们刚调用ensure_future的时候,level1被放入了就绪队列。

python的asyncio原理源码分析-透过asyncio.sleep

第二轮, level1执行的用gather往就绪队列加入了两个任务

python的asyncio原理源码分析-透过asyncio.sleep

第三轮, level1_task1, level1_task2都运行到asyncio.sleep并且让出了时间,并且两个任务都往延迟队列加入了两个延迟任务。

python的asyncio原理源码分析-透过asyncio.sleep

第五轮然后现在就绪队列里面没有元素,只有延迟队列有,那么_selector就开始睡,睡延迟最少的那个任务.睡2s。

第六轮, 2s时间到,level1_task2被唤醒,被放入就绪队列, 处理就绪队列元素

python的asyncio原理源码分析-透过asyncio.sleep

第七轮然后现在就绪队列里面没有元素,只有延迟队列有,那么_selector就开始睡,现在就剩一个任务了,只需要再睡4s-2s了。

第八轮, 2s时间到,level1_task1被唤醒, 被放入就绪队列, 处理就绪队列元素

python的asyncio原理源码分析-透过asyncio.sleep

至此level1函数执行完毕,整个事件循环结束。