likes
comments
collection
share

python asyncio 概念解释与快速上手

作者站长头像
站长
· 阅读数 4

本文大纲翻译自 medium.com/@superfastp…

Python Asyncio 让我们可以进行基于协程(coroutine)的异步编程。但从python3.4(2014)开始引入,到python3.7的成熟,asyncio其实已经推出许多年了。尽管如此,异步编程仍然是python中最吸引人但又最令人沮丧的领域。为什么呢?因为它是在是太难上手了,尤其对没有js等自带async的编程语言开发经验的人来说。对于python开发者来说,asyncio不是一个添加到python的特性,而是一个完全新的编程范式(paradigm),这需要我们重新架构我们的程序。正是这种不同的思想让开始学习asyncio变的极度沮丧,甚至憎恨asyncio。本文将带你快速上手,趟过这条河。

基本概念

协程

前面我们提到了Asyncio基于协程,我们先简单了解一下协程(Coroutine)的概念,协程往往被称为轻量级线程、纤程,很多编程语言都拥有这个概念,例如go、js包括比较新的java21。我们知道线程之间切换是比较耗时的,需要从用户态切换到内核态,而协程并不涉线程切换,自己维护上下文,一个线程可以维护多个协程。 python asyncio 概念解释与快速上手 但是协程虽好,但是单线程内仍只有一个协程可以获取执行权,对于cpu密集型任务,它甚至可能会更慢,所以协程主要是解决的是io相关的问题。

事件循环 event loop

asyncio 使用了单线程的事件循环队列来执行协程。整个流程就是,将任务提交到事件队列中,会有一个执行线程不停的从队列中获取并执行任务,当遇到需要io的操作时,会指派给操作系统监控状态,然后继续获取任务,直到所有任务都执行完成。当然,执行任务期间,也会产生新的任务,包括io完成也是任务。 python asyncio 概念解释与快速上手 事件循环队列中最重要的是封装了一个对操作系统提供的io多路复用器,不同操作系统实现不同,linux的epoll,windows的iocp等等。简单来说就是,当我们遇到io时(访问文件系统、网络请求等),我们可以把等待io完成的任务交给操作系统,而等待io这期间,我们可以不让线程挂起,去干更多的事情,这对于有着紧箍咒(全局解释锁GIL)的python来说,是非常有用的。

1.如何定义、创建、运行、切换协程

我们使用 async def来定义一个协程,这里可以把async def看作是def的扩张,专门用来定义协程。 普通的def定义的function运行之后会产生一个对应的返回值,而async def运行之后,会返回一个coroutine。 coroutine必须在event loop 内运行,我们可以通过asyncio.run()运行coroutine。

# define a coroutine  
async def custom_coro():  
 print('Hello there')

# create a coroutine  
coro = custom_coro()

# 运行custom_coro()之后返回的是一个coroutine
type(coro)

# run a coroutine in the event loop  
asyncio.run(coro)

我们如何从一个协程切换到另一个协程呢?我们可以使用await关键字。

async def custom_coro():
	# 中断当前协程,然后运行other_coro 
	await other_coro()

当协程遇到await 修饰的函数时,会中断,直到await的协程执行完毕,然后会到当前状态继续继续执行。

2.task与coroutine

task是coroutine的一个封装,相对于coroutine,task拥有查看状态、暂停任务等更丰富的功能。通常来说,我们最好将协程转成task,很多地方目前支持coroutine,实际上是python偷偷帮你转为task,将来的某个版本可能不再支持。

# create and schedule a task  
task = asyncio.create_task(other_coro())

在使用asyncio.create_task()后,会将协程封装成一个task,并且会安排它进入事件循环,并且只要机会,就让它马上执行。 这个调用也返回了一个task object,通过这个返回值,我们可以回去返回值结果、查看任务状态等等。 我们可以使用await来暂停当前线程获取结果,也可根据task的方法来获取结果。

 # 等待,直到task执行完,并获取返回值
 value=await task
 
 # check task,如果task没有完成直接获取result会抛异常
 if task.done():
	 value=task.result()

下面给一个完整例子:使用coroutine创建一个task,然后过一段时间后再获取结果,我为大家标出了执行顺序,以便更好的理解。

# example of scheduling an async task  
import asyncio  
  
# coroutine to perform some useful task  
async def task_coro():  
#3. report a message  
	print('The task is running...')  
#4. suspend and sleep for a moment  
	await asyncio.sleep(1)  
#6. report a message  
	print('The task done')  
#7. return a result  
return 'The answer is 100'  
  
# main coroutine  
async def main():  
#1. run a task independently  
	task = asyncio.create_task(task_coro())  
#2. suspend a moment, allow the scheduled task to run  
	await asyncio.sleep(0)  
#5. wait for the async task to complete  
	await task  
#8. report the return value  
	print(f'Got: {task.result()}')  
  
# create the coroutine and run it in the event loop  
asyncio.run(main())

这里又个需要注意的点:从1.2.3步我们可以看到task创建以后并没有直接执行(默认情况下),而是在await asyncio.sleep(0) 后main协程中断之后,task_coro才开始执行。

3.多个协程如何并行

我们可以在aysncio程序内并行的运行多个程序,这对于并行下载多个文件或者高并发场景下都很有用。

我们可以使用asyncio.gather()来运行协程,它可以接受多个coroutine或者task,然后返回一个asyncio.Future。Future是Task的父类,代表着未来将会有一个结果,Task是一个为了包裹协程的Future。

future = asyncio.gather(coro1(), coro2() coro3())

Future 与Task Coroutine一样,他们的对象都可以被await修饰的对象,都是实现了__await__。在这里我们可以await future 来实现等待所有协程或者任务,最终会返回一个迭代器,获取每个协程的运行结果,顺序与添加的顺序一样。

下面的例子中,准备了100个协程,每个协程随机sleep 0-1秒,所有协程并发执行。

# example of running many coroutines concurrently
import random
import asyncio
import time
# coroutine to perform some useful task


async def task_coro(arg) -> str:
    # generate a random value between 0 and 1
    value = random.random()
    # suspend and sleep for a moment
    await asyncio.sleep(value)
    # report the argument and value
    return f'Task {arg} done after {value} seconds'

# main coroutine


async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # suspend and run all coroutines
    now=time.time()
    results=await asyncio.gather(*coros)
    print(f"total taken:{time.time()-now}")
    for er in results:
        print(er)
# create the coroutine and run it in the event loop
asyncio.run(main())

可以看到,总共花费不到1秒就都执行完毕,然后循环输出了,每个协程的返回值。

total taken:0.9943249225616455
Task 0 done after 0.2671076110393307 seconds
Task 1 done after 0.514333336910391 seconds
...

4.如何等待多个tasks

如果我们不是仅仅想等待所有task都完成,而是有更复杂的需求,第一个出现异常就暂停等等操作,我们可以使用asyncio.wait()。 通过函数的定义,我们可以看到我们能够设置超时时间和何时返回,默认时全部完成时ALL_COMPLETED返回,也可以是第一个完成FIRST_COMPLETED,或者是第一个异常返回FIRST_EXCEPTION。

(function) def wait(  
fs: Iterable[Awaitable[_T@wait]],  
*,  
timeout: float | None = None,  
return_when: str = "ALL_COMPLETED")

继续使用上一个的场景进行多个task的等待,只不过这次我们我们使用asyncio.wait(),获取第一个完成的结果。

# SuperFastPython.com  
# example of waiting for a collection of tasks  
import random  
import asyncio  
  
# coroutine to perform some useful task  
async def task_coro(arg):  
# generate a random value between 0 and 1  
value = random.random()  
# suspend and sleep for a moment  
await asyncio.sleep(value)  
# return a value unique for this task  
return arg * value  
  
# main coroutine  
async def main():  
# create and schedule many independent tasks  
tasks = [asyncio.create_task(task_coro(i)) for i in range(100)]  
# suspend and wait for the first task to complete  
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)  
# report the result from the first task  
task = done.pop()  
print(f'First task got: {task.result()}')  
  
# create the coroutine and run it in the event loop  
asyncio.run(main())

5. 按任务完成顺序处理结果

当任务完成时,我们可以处理或者提交结果。与前面按照提交任务顺序返回结果不同,当某些任务很快,某些任务完成很慢,如果等待所有结果,将会浪费更多的时间。我们可以通过asyncio.as_complete()按照任务完成顺序来处理结果。

asyncio.as_complete()接受一个awaitable集合,比如一个List[Task],返回一个迭代器,每次迭代将会yield 一个task,当使用await时,等一个已经完成的task后返回结果。

# get a generator that yields awaitables in completion order then iterate
for task in asyncio.as_completed(tasks):  
	# 获取第一个已经完成的任务
	result = await task

6. 如何实现协程之间共享数据

协程之间数据共享我们可以通过asyncio.Queue,它是一个先进先出、协程安全的队列,我们无需担心Queue的添加、获取的竞争问题。

创建一个队列,可以通过添加maxsize参数设置上限
queue = asyncio.Queue()

我们可以通过put()来添加数据到队列中,同时会返回一个协程。事实上,我们put时,一定要await修饰,因为如果Queue达到上限的时候会阻塞住。同样的获取数据时也需要使用await修饰get()。

# add and retrieve
await queue.put(item)
item = await queue.get()

下面我们使用asyncio.Queue 来写一个经典的生产者消费者程序。生产者往队列里放十个元素,每放进去一个,消费者就获取一个打印出来,直到最后消费到None为止。

from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        value = random()
        await asyncio.sleep(value)
        await queue.put(value)
    await queue.put(None)
    print('Producer: Done')

async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the consumer as an independent task
    asyncio.create_task(producer(queue))
    # consume items from the queue until a None is seen
    while value:=await queue.get():
        # report the value
        print(f'Got: {value}')

asyncio.run(main())

7. 如何写NIO程序

我们可以通过asyncio.open_connection()来创建一个TCP client。调用之后会返回一个协程,等待协程完成之后将会返回一个读流StreamReader和一个写流StreamWriter。

# 如果需要ssl,则需要设置ssl=True
reader, writer = await asyncio.open_connection('www.baidu.com', 443, ssl=True)

我们可以使用encode()将string转为bytes,然后使用将数据写入socket,然后使用await writer.drain()来等待所有的数据已经发送出去。

# encode string data to byte data  
byte_data = string_data.encode()  
# write byte data  
writer.write(bytes_data)
# wait for data to be transmitted  
await writer.drain()

read是与write相反的操作。

# read byte data  
byte_data = await reader.read()  
# decode bytes to strings  
string_data = byte_data.decode()

下面我们进行一下实战,以NIO的方式访问百度首页。

import asyncio

async def fetch_baidu_homepage():
    host = 'www.baidu.com'
    port = 80

    # 构建HTTP请求
    request = (
        f"GET / HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        "Connection: close\r\n"
        "\r\n"
    )
    # 使用asyncio.open_connection建立TCP连接
    reader, writer = await asyncio.open_connection(host, port)

    # 发送HTTP请求
    writer.write(request.encode())

    # 读取响应
    response = b""
    while True:
        data = await reader.read(1024)
        if not data:
            break
        response += data

    # 关闭连接
    writer.close()

    print(response.decode())


if __name__ == '__main__':
    asyncio.run(fetch_baidu_homepage())

最后

这里简单介绍了asyncio主要的一些操作和概念,实际过程中往往更加复杂,或者会有一些框架对其进行了封装。总之,通过这些例子,我们能对python的异步编程有一定的理解,开始向异步编程的世界迈出开始的第一步。