tornado 并发编程系列(8)——生成器与协程 🌪
生成器 🌪
生成器是在 Python 2.2, PEP 255
中首次引入的 , 生成器实现了迭代器协议 , 所以我们可以说生成器是迭代器的构造器 , 通过生成器我们可以在循环中计算下一个值时不会浪费内存 , 也就是可以为我们提供惰性计算
我们来自己实现一个 range
为例 :
非惰性计算 , 一次性生成 , 你需要有足够大的内存存储结果序列
def eager_range(up_to):
"""Create a list of integers, from 0 to up_to, exclusive."""
sequence = []
index = 0
while index < up_to:
sequence.append(index)
index += 1
return sequence
惰性计算 , 生成器方式
def lazy_range(up_to):
"""Generator to return the sequence of integers from 0 to up_to, exclusive."""
index = 0
while index < up_to:
yield index
index += 1
惰性计算 , 闭包方式
def cell_range(up_to):
"""Closure to return the sequence of integers from 0 to up_to, exclusive."""
index = 0
def inner():
nonlocal index
while index < up_to:
index += 1
return index
return inner
对于闭包而言实际上是一次调用完毕的概念 , 而对于生成器而言是暂停代码执行的概念
PyGenObject
在 Python
中 , 生成器的实现就是 PyGenObject
, 我们以 Python 3.6.3
为例 , 来看看它的源代码
Include/genobject.h
, 13~33行
/* _PyGenObject_HEAD defines the initial segment of generator
and coroutine objects. */
#define _PyGenObject_HEAD(prefix) \
PyObject_HEAD \
/* Note: gi_frame can be NULL if the generator is "finished" */ \
/* _frame: PyFrameObject
PyFrameObject 是 Python 对 x86 平台上栈帧的模拟,
同样也是 Python 字节码的执行环境, 也就是当前的上下文
*/
struct _frame *prefix##_frame; \
/* True if generator is being executed. */ \
char prefix##_running; /* 运行状态 */ \
/* The code object backing the generator */ \
PyObject *prefix##_code; /* 字节码 */ \
/* List of weak reference. */ \
PyObject *prefix##_weakreflist; \
/* Name of the generator. */ \
PyObject *prefix##_name; \
/* Qualified name of the generator. */ \
PyObject *prefix##_qualname;
typedef struct {
/* The gi_ prefix is intended to remind of generator-iterator. */
_PyGenObject_HEAD(gi)
} PyGenObject;
_frame (PyFrameObject)
就是生成器的上下文 , Python
在执行时实际上是一条 PyFrameObject
链 , 每个 PyFrameObject
对象中都记录了上一个栈帧对象、字节码对象、字节码执行位置位置
PyGenObject
对象对 PyFrameObject
做了进一层的封装 , 这是由于生成器的特殊性 , 因为 PyFrameObject
对象实际上是一次性的 , 所以必须由其它对象也就是 PyGenObject
来保证生成器的正常运行
send
在 Python 2.5, PEP 342
中 , 添加了将数据发送回暂停的生成器中的功能 , 也就是 send
static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
/* 获取当前的线程环境 */
PyThreadState *tstate = PyThreadState_GET();
/* 照当当前生成器的 PyFrameObject 对象 */
PyFrameObject *f = gen->gi_frame;
PyObject *result;
......
if (f->f_lasti == -1) {
/* 未激活 */
if (arg && arg != Py_None) {
char *msg = "can't send non-None value to a "
"just-started generator";
if (PyCoro_CheckExact(gen)) {
msg = NON_INIT_CORO_MSG;
}
else if (PyAsyncGen_CheckExact(gen)) {
msg = "can't send non-None value to a "
"just-started async generator";
}
PyErr_SetString(PyExc_TypeError, msg);
return NULL;
}
} else {
/* Push arg onto the frame's value stack */
result = arg ? arg : Py_None;
Py_INCREF(result); /* 如果有参数, 就将其压入栈中 */
*(f->f_stacktop++) = result;
}
/* Generators always return to their most recent caller, not
* necessarily their creator. */
Py_XINCREF(tstate->frame);
assert(f->f_back == NULL);
f->f_back = tstate->frame;
gen->gi_running = 1; /* 将生成器设置为运行状态 */
result = PyEval_EvalFrameEx(f, exc); /* 运行生成器 */
gen->gi_running = 0;
/* Don't keep the reference to f_back any longer than necessary. It
* may keep a chain of frames alive or it could create a reference
* cycle. */
assert(f->f_back == tstate->frame);
Py_CLEAR(f->f_back);
/* If the generator just returned (as opposed to yielding), signal
* that the generator is exhausted. */
......
if (!result || f->f_stacktop == NULL) {
/* generator can't be rerun, so release the frame */
/* first clean reference cycle through stored exception traceback */
PyObject *t, *v, *tb;
t = f->f_exc_type;
v = f->f_exc_value;
tb = f->f_exc_traceback;
f->f_exc_type = NULL;
f->f_exc_value = NULL;
f->f_exc_traceback = NULL;
Py_XDECREF(t);
Py_XDECREF(v);
Py_XDECREF(tb);
gen->gi_frame->f_gen = NULL;
gen->gi_frame = NULL;
Py_DECREF(f);
}
return result;
}
通过 send
, 将数据回传到暂停的生成器 , 随后将生成器中的栈帧对象挂载到当前线程上 , 执行完毕后再从当前线程上卸载 , 这样就实现了生成器的调用
send
的出现使我们可以进一步对生成器进行控制
生成器的另一种调用方式 next
实际上就是 send(None)
static PyObject *
gen_iternext(PyGenObject *gen)
{
return gen_send_ex(gen, NULL, 0, 0);
}
yield from
在 Python 3.3, PEP 380
, 增加了 yield from
, 让你可以以一种干净的方式重构生成器 , 或者说构造生成器链
def lazy_range(up_to):
"""Generator to return the sequence of integers from 0 to up_to, exclusive."""
index = 0
def gratuitous_refactor():
nonlocal index
while index < up_to:
yield index
index += 1
yield from gratuitous_refactor()
生成器链
def bottom():
# Returning the yield lets the value that goes up the call stack to come right back
# down.
return (yield 42)
def middle():
return (yield from bottom())
def top():
return (yield from middle())
# Get the generator.
gen = top()
value = next(gen)
print(value) # Prints '42'.
try:
value = gen.send(value * 2)
except StopIteration as exc:
value = exc.value
print(value) # Prints '84'.
协程 🌪
从最初我们使用 yield
构建可中断函数 , 再到 send
可以调度生成器函数 , 再到 yield from
可以构建生成器调用链 (这个调用链至关重要 , 因为 send
只是发送数据 , 而 yield from
可以直接调度其他生成器)
拥有了这些基础 , 协程的实现就变得简单了起来
我们先来看两段代码 :
import queue
def task(name, work_queue):
if work_queue.empty():
print(f"Task {name} nothing to do")
else:
while not work_queue.empty():
count = work_queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = queue.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some synchronous tasks
tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
# Run the tasks
for t, n, q in tasks:
t(n, q)
if __name__ == "__main__":
main()
运行结果 :
Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do
接下来我们使用 yield
来改造一下 :
import queue
def task(name, queue):
while not queue.empty():
count = queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
yield
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = queue.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some tasks
tasks = [task("One", work_queue), task("Two", work_queue)]
# Run the tasks
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
if __name__ == "__main__":
main()
运行结果 :
Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2
同样是两个 task
, 在第二段代码中我们实现了异步执行 , 两个 task
交叉协作完成任务 , 这两个 task
就是两个协程
不难发现 , 即使我们不用 yield
, 我们自己通过函数也可以实现这样的并发效果 , 把上面的代码按照 yield
拆分成几个函数功能上是一样的 , 我们把拆分的函数叫做子例程 , 实际上 , 子例程可以看做是特定状态的协程 , 任何的子例程都可以转写成不使用 yield
的协程
相对于子例程而言 , 协程更加灵活 , 协程更加适合用来实现彼此比较熟悉的程序组件 , 或者说耦合度高一点的组件
协程的切换概念是 "让步" , 而子例程的切换概念是 "出产" , 一个主动 , 一个被动 , 以下摘自 Wiki :
- 子例程可以调用其他子例程 , 调用者等待被调用者结束后继续执行 , 故而子例程的生命期遵循后进先出 , 即最后一个被调用的子例程最先结束返回 , 协程的生命期完全由对它们的使用需要来决定
- 子例程的起始处是惟一的入口点 , 每当子例程被调用时,执行都从被调用子例程的起始处开始 , 协程可以有多个入口点 , 协程的起始处是第一个入口点 , 每个
yield
返回出口点都是再次被调用执行时的入口点 - 子例程只在结束时一次性的返回全部结果值 , 协程可以在
yield
时不调用其他协程 , 而是每次返回一部分的结果值 , 这种协程常称为生成器或迭代器
事件循环
事件循环 是一种程序结构或设计模式 , 用于在程序中等待和分发事件或者消息 , 简单来说就是当某件事情发生时 , 接下来该做什么 , 通常它是一个死循环 , 因为它需要不断的收集事件并处理事件
在上面的代码中其实我们已经实现了一个最简单的事件循环 :
# 永不停歇的收集事件并处理事件
while True:
# 收集就绪的事件列表
ready = selector.select()
# 循环处理事件
for event, mask in ready:
if isfunction(event.data):
event.data()
else:
try:
event.data.send(event.fileobj)
except StopIteration as e:
continue
事件循环上就是一个调度器 , 是我们用户程序之间的调度器 , 就是操作系统调度线程一样 , 事件循环可以用来调度我们的协程 , 所以通常你会发现协程总是和事件循环同时出现 , 所以我们对事件循环的要求一般都比较高 , 因为协程调度的性能直接由事件循环的调度方案决定
在早期的 Python
中 , 由 gevent
提供了事件循环能力 , 而 Python 3.4
时引入 asyncio
标准库来提供事件循环能力
async&await
最后我们来说说 async
和 await
在 Python 3.4
中
# This also works in Python 3.5.
import asyncio.coroutine
@asyncio.coroutine
def py34_coro():
yield from stuff()Copy to clipboardErrorCopied
对应的字节码
>>> dis.dis(py34_coro)
2 0 LOAD_GLOBAL 0 (stuff)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 GET_YIELD_FROM_ITER
7 LOAD_CONST 0 (None)
10 YIELD_FROM
11 POP_TOP
12 LOAD_CONST 0 (None)
15 RETURN_VALUECopy to clipboardErrorCopied
在 Python 3.5
中
async def py35_coro():
await stuff()Copy to clipboardErrorCopied
对应的字节码
>>> dis.dis(py35_coro)
1 0 LOAD_GLOBAL 0 (stuff)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 GET_AWAITABLE
7 LOAD_CONST 0 (None)
10 YIELD_FROM
11 POP_TOP
12 LOAD_CONST 0 (None)
15 RETURN_VALUECopy to clipboardErrorCopied
它们之间的差异仅仅是 GET_YIELD_FROM_ITER
和 GET_AWAITABLE
的差异 , 而这两个函数实际上都是用来标记协程的 , 所以其实 yield from
和 async/await
并无两样
GET_YIELD_FROM_ITER
可以接收生成器或者协程 , 而 GET_AWAITABLE
只接受协程
所以 async/await
并没有做什么特殊的提升 , 这两个关键字也主要是为了将协程规范化 , 明确了协程的意义 , 而不是将生成器和协程混在一起
这些也都是有迹可循的 :
- 3.4:
asyncio
在Python
标准库中引入 , 但是只是临时的 - 3.5:
async/await
成为Python
语法的一部分 , 用于表示和等待协程 , 但它们还不是保留关键字 - 3.6:引入了异步生成器和异步循环 ,
asyncio
不再只是临时的 , 而是稳定的 - 3.7:
async/await
成为保留关键字 , 它们旨在替换asyncio.coroutine()
装饰器
到这里 , 协程的前世今生我们已经理清了 , 不过还有一点 , gevent
是有栈协程的代表 , 而 asyncio
是无栈协程的代表
转载自:https://juejin.cn/post/7273786630267125818