likes
comments
collection
share

【从1到∞精通Python】9、asyncio事件循环的创建

作者站长头像
站长
· 阅读数 17

原文链接:【Hard Python】【第二章-异步IO】1、asyncio事件循环的创建

python3中增加的重要特性之一即为asyncio,其提供了异步编程的原语支持,从而能够让python在事件驱动、协程协同等方面的编程场景大杀四方。

事件循环EventLoop是异步编程中的核心概念之一。python的异步IO,就从事件循环的实现开始讲起。

首先看一段示例代码:

async def _test_run_main():
    for i in range(3):
        await asyncio.sleep(1)
        print(f'[test_run] {i}')


def test_run():
    coro = _test_run_main()
    asyncio.run(coro)

通过async def定义的函数,其返回值是一个异步协程coroutine。协程相当于是事件循环里的一个单位任务,通过asyncio.run接口就可以将其运行起来。因此我们先来看asyncio.run的实现:

def run(main, *, debug=None):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")
    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            events.set_event_loop(None)
            loop.close()

asyncio.run中,首先会检查协程合法性以及当前线程是否有已在跑的事件循环,之后会新启动一个事件循环,并设置为当前线程在跑的事件循环,最后等待协程完成。完成后,会关闭事件循环,并取消当前线程事件循环的设置。

事件循环的诞生,便是从new_event_loop方法开始了。以windows为例,我们来看下当创建一个新的事件循环时,会发生哪些调用:

# runners.py
def new_event_loop():
    return get_event_loop_policy().new_event_loop()


# events.py
def _init_event_loop_policy():
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy


# windows_events.py
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = ProactorEventLoop
    
    
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy


# events.py
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    def new_event_loop(self):
        return self._loop_factory()
    

# windows_events.py
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

事件循环创建的策略有多种,在调用new_event_loop时,实质是执行默认事件循环策略的创建方法。以windows为例,默认策略是ProactorEventLoop

proactor模型本身为异步IO而生,其基本工作原理如下:

  • 用户态应用预先设定一组针对不同IO操作完成事件的回调(Handler),同时向内核注册一个完成事件的dispatcher(也就是proactor
  • 用户态线程发起异步IO操作后会即刻返回结果
  • IO操作在内核执行完成后会通知proactorproactor根据完成事件的类型,触发对应的完成事件回调

在windows下ProactorEventLoop实际是使用了IOCP模型,中文翻译叫IO完成端口,其基本工作原理如下:

  • 通过CreateIoCompletionPort创建完成端口
    • 完成端口,实质是一个用于缓存IO完成事件的队列
  • 创建一组worker thread关联完成端口
  • 创建listen server
  • listen serveraccept到客户端连接后,创建PerHandleData实例,将客户端socketPerHandleData实例与完成端口关联起来。执行上述的关联后,可以通过WSARecv发起接收客户端数据的异步IO操作,然后继续accept
  • worker thread中,通过GetQueueCompletionStatus方法获取IO操作的完成结果。如WSARecv完成后,可以直接提取接收到的客户端数据,执行对应的操作
  • listen server退出,通过PostQueuedCompletionStatus向完成端口发送特殊的数据包,用以让worker thread退出

了解了proactoriocp的基本工作原理后,我们就可以看python版ProactorEventLoop的具体实现了。

# windows_events.py
class IocpProactor:
    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()
        
 
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)
        
        
# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
    def __init__(self, proactor):
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()
        if threading.current_thread() is threading.main_thread():
            # wakeup fd can only be installed to a file descriptor from the main thread
            signal.set_wakeup_fd(self._csock.fileno())
            
    def _make_self_pipe(self):
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False

ProactorEventLoop实例初始化时,会先创建IocpProactor实例,里面通过CreateIoCompletionPort创建了一个完成端口。之后再调用BaseProactorEventLoop的初始化函数。BaseProactorEventLoop先初始化BaseEventLoop,然后设置proactor,并创建了一组socketpair

这样,事件循环的实例就被创建出来了。