asyncio:从原理、源码到实现
选自《源码读Python》
作者:闲谈后
https://zhuanlan.zhihu.com/p/64991670
去年的这个时候吧,我开始找工作,招聘启事上说,需要了解 asyncio。
我回去翻了翻书,orz·····
今年,在一次内部技术分享会上,我要分享的内容涉及到一些 python 协程,我又去看 asyncio 的文档。
结果依旧是 orz ····
无奈,我将 asyncio 的源码通读了一遍,然后自己用 python 实现了一个简化版 asyncio,才确信自己应该了解了协程到底是个什么什么玩意儿。
python 的协程实现并不复杂,好的设计本来也不应该复杂。但吊诡的是,如果你没有真正接触使用过 python 中的协程的话,那么很多概念就会看起来很唬人,很复杂。
比如 最基础的 future 是一个容器 或者占位符,代表一个即将进行或者尚未完成的操作。什么玩意儿啊····
我个人原因,比较喜欢写教程,而非写博客。尽管我自己也是个渣渣·····
写这个系列文章的目的,就是希望各位像我一样,没有机会使用 python asyncio、只懂得最基本概念的小白,也能通过实现一个 asyncio 来弄清楚协程,到底是个什么东西。其中当然少不了一睹 cpython 与 asyncio 的源码。
01. 进程、线程与协程
首先我们要解决的问题 :进程 线程 协程是什么?
多进程、多线程、协程的模型都是为了解决多个任务同时进行的问题。
多任务系统实现的关键在于暂停当前任务, 保存当前任务的现场、 选择下一个任务、恢复下一个任务的的现场,执行下一个任务
在计算机中,一般我们不称之为现场,而是上下文。
在 《深入理解操作系统》 中,开篇就是 信息 = 位 + 上下文,我这里稍微做一个引申
程序 = 指令序列 + 上下文
指令序列的含义是固定的,在计算机中就是 pc 寄存器地址指向的值,也就是 cpu 要执行的指令。但是对于计算机的不同层次,上下文的含义也不尽相同。
对于 cpu 来说,上下文就是,操作数寄存器、栈寄存器、状态寄存器等各类寄存器。
对于进程来说,上下文就是,寄存器、信号、分配的内存空间,文件描述符等各类由 cpu 抽象的出来的硬件资源
对于线程来说,上下文就是,寄存器,线程堆栈···
对于一个函数来说,上下文就是当前的命名空间。
那么对于 协程 来说,上下文又是什么呢?
而进程的切换是,操作系统控制,也由操作系统执行。
python 的线程切换,是由 python 虚拟机控制,通过一个系统调用,来进行线程切换。协程的切换过程完全由程序自身控制。
进程的切换 需要 切换系统资源和指令,消耗时间最长
线程的切换,不需要切换系统资源,只需要切换指令、线程堆栈。但这个过程也需要系统调用。
协程的切换都在用户空间内进行,不需要进行系统调用。
协程优于线程的主要在于
python 线程调度方式是,每执行 100 个字节码或者遇到阻塞就停止当前线程,然后进行一个系统调用,让 os 内核选出下一个线程。但是协程 只会在 阻塞的时候,切换到下一个协程。100个字节码,说多不多,说少不少,你调用两个库函数说不定就没了,因此线程的切换存在很多是无效的切换,当线程数量越大,这种因为调度策略的先天不足带来的性能损耗就越大。
线程需要进行系统调用,协程不需要。系统调用需要进入内核态,无效的调度会让这部分开销显得更大
协程可以自主调度,而线程只能决定合适退出,但是下一个线程是谁则依赖于操作系统。
协程 有两种,一种 无栈协程,python 中 以 asyncio 为代表, 一种有栈协程,python 中 以 gevent 为代表。
两者对如何保存切换上下文的方式大不一样,我们先看看 asyncio ,以后有机会再补上 gevent 相关的内容。
02. yield 与 yield from
最新版的 python 已经不采用基于 yield 的协程了。但我这里则只用yield 和 yield from 来实现协程,而不使用 await 与 async。这样能更好地展示生成器到协程的全过程,原理也都是相通的。
我们先看一个最简单的 yield 例子
def ex():
print("yield 1")
yield 1
print("yield 2")
yield 2
gen = ex()
print("启动生成器")
a = gen.send(None)
print('从生成器中获取一个值', a)
b = gen.send(None)
print('获取到的第二个值', b)
# out
# 启动生成器
# yield 1
# 从生成器中获取一个值 1
# yield 2
# 获取到的第二个值 2
程序序 运行到 第一个 yield 的时候 保存了函数的上下文,并退出了。
接下来 又通过 next 方法再次进入这个 函数,继续运行。在这里,被保存的上下文得到了恢复。
这几行代码里已经展示了一个协程所需要的的全部。保存上下文 切换运行程序 恢复上下文 重新进入程序。
接下来,让我们深入看一下,在 cpython 中 yield 的实现原理,这会为我们之后深入了解 gevent 提供参照物。这里需要对 cpython 有一些简单的认知。如果没有相应的基础,不烦看看我的另一篇文章:源码读python(一)
python 中的上下文,被封装成了一个叫做 PyFrameObject 的结构,又称之为栈帧,看一下他的源码:https://github.com/xianth123/cpython/blob/master/Include/frameobject.h#L17
typedef struct _frame {
PyObject_VAR_HEAD
struct _frame *f_back; /* previous frame, or NULL 上一个栈帧*/
PyCodeObject *f_code; /* code segment 代码段*/
PyObject *f_builtins; /* builtin symbol table (PyDictObject) 内建变量表*/
PyObject *f_globals; /* global symbol table (PyDictObject) 全局变量表*/
PyObject *f_locals; /* local symbol table (any mapping) 局部变量表*/
PyObject **f_valuestack; /* points after the last local 栈底*/
/* Next free slot in f_valuestack. Frame creation sets to f_valuestack.
Frame evaluation usually NULLs it, but a frame that yields sets it
to the current stack top. */
PyObject **f_stacktop; /* 栈顶 */
PyObject *f_trace; /* Trace function */
char f_trace_lines; /* Emit per-line trace events? */
char f_trace_opcodes; /* Emit per-opcode trace events? */
/* Borrowed reference to a generator, or NULL 专为生成器设计的指针*/
PyObject *f_gen;
int f_lasti; /* Last instruction if called 运行的上一个字节码位置*/
/* Call PyFrame_GetLineNumber() instead of reading this field
directly. As of 2.3 f_lineno is only valid when tracing is
active (i.e. when f_trace is set). At other times we use
PyCode_Addr2Line to calculate the line from the current
bytecode index. */
int f_lineno; /* Current line number 运行字节码对应的python源代码的行数*/
int f_iblock; /* index in f_blockstack */
char f_executing; /* whether the frame is still executing */
PyTryBlock f_blockstack[CO_MAXBLOCKS]; /* for try and loop blocks */
PyObject *f_localsplus[1]; /* locals+stack, dynamically sized */
} PyFrameObject;
我做了简单的注释。
在Python实际的执行中,会产生很多PyFrameObject对象,而这些对象会被链接起来,形成一条执行环境链表。这正是对x86机器上栈帧间关系的模拟。在x86上,栈帧间通过esp指针和ebp指针建立了关系,使新的栈帧在结束之后能顺利回到旧的栈帧中,而Python正是利用f_back来完成这个动作。
另外比较重要的两点就是各种环境变量表,以及程序运行必不可少的堆栈。f_lasti 记录了字节码运行的位置,这也就意味着在 PyFrameObject 中,我们可以随时恢复代码的运行。
接下来看一下 cpython 中对 生成器的定义:https://github.com/xianth123/cpython/blob/master/Include/genobject.h#L15
在 python 中,生成器的语法规则比较特殊,长得像个函数,但是调用之后却返回一个生成器对象。所以他的结构体定义也比较特殊,是一个宏。其中最重要的 是 prefix_frame, ## 是连接符。它指向了一个 PyFrameObject 对象,就是该生成器的上下文。
这个结构体中,有三个最重要的成员
指向生成器上下文的 指针
一个指示生成器状态的字符串 未启动,停止,运行,结束
生成器的字节码
也就是 上下文 + 指令序列 + 状态
next 与 send 作用是一致的,只是 send 可以传入一个参数,我们来看一下 send 的实现。
https://github.com/xianth123/cpython/blob/3.7/Objects/genobject.c#L151github.com
static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
PyThreadState *tstate = PyThreadState_GET();
PyFrameObject *f = gen->gi_frame;
PyObject *result;
·······
if (f->f_lasti == -1) {
if (arg && arg != Py_None) {
const char *msg = "can't send non-None value to a "
"just-started generator";
if (PyCoro_CheckExact(gen)) {
msg = NON_INIT_CORO_MSG;
}
else if (PyAsyncGen_CheckExact(gen)) {
msg = "can't send non-None value to a "
"just-started async generator";
}
PyErr_SetString(PyExc_TypeError, msg);
return NULL;
}
} else {
/* Push arg onto the frame's value stack */
result = arg ? arg : Py_None;
Py_INCREF(result); /* 如果有参数 就将其压栈*/
*(f->f_stacktop++) = result;
}
/* Generators always return to their most recent caller, not
* necessarily their creator. */
Py_XINCREF(tstate->frame);
assert(f->f_back == NULL);
f->f_back = tstate->frame;
gen->gi_running = 1; /* 将生成器设置为运行状态 */
gen->gi_exc_state.previous_item = tstate->exc_info;
tstate->exc_info = &gen->gi_exc_state;
result = PyEval_EvalFrameEx(f, exc); /* 正式运行 生成器 得到返回值*/
tstate->exc_info = gen->gi_exc_state.previous_item;
gen->gi_exc_state.previous_item = NULL;
gen->gi_running = 0;
········
return result;
}
我省略了一些代码,不过关键的代码也就这些。如果有传入参数,就将参数入栈,用这种方式来向生成器传递值。然后更改生成器运行状态。通过 PyEval_EvalFrameEx 函数运行生成器保存的栈帧,返回值。
03. yield from
在生成器中,可以用 return 返回值,但如果 send 走到 return 语句的时候会报一个StopIteration。return 返回值的 就在 exception 的 value 中。
yield from 有两重性质,一方面,它是一个表达式,表达式自然是有值的,他的值,就是yield from 后面生成器 return 的返回值。非常关键的一点,生成器的 yield 语句会向外产出值,但是 return 的值并不会向外产出。想要获得 return 的返回值,要么用 try 语句捕获异常要么用 yield from 表达式获取值。
yield from 另外一点就是,能将内层的生成器的返回值,传到外层。
内层生成器 ex2() 通过 yield from,可以在最外层取出来。
关于 yield from,我觉得讲的最清楚的是 流畅的python 16章 协程的有关内容,各位有兴趣可以看看,我这里不做深入展开。
有了 yield from 这个工具,我们便可以将多个生成器串联起来。
我尝试用树形结构来描述一个生成器,也许对协程的理解会有帮助。我们将 yield 的返回值视为 子节点,将生成器本身和 return 的值视为父节点。可以用一个图来描述。
yield from 的意义在于,将这些生成器串联起来形成一颗树,并且提供了一种便捷的方法,将这颗树的叶子节点依次返回。
yield from 将多个生成器连接起来的方式,我们可以使用很简单的方式就可以将所有的 yield 返回值一一提取出来。不断的对根节点的生成器 进行send 操作即可。也就是 gen3.send(None)。
04. 从生成器到协程
(本文的协程,单指 asyncio 中的协程)
生成器可以描述为一颗树,生成器是协程实现的基础,那么协程自然也可以描述为一颗树。
在 asyncio 的实现中,协程与生成器最大的区别,就是生成器的叶节点可以是 数字、函数、字符串等各种对象,但是 asyncio 的协程实现中,叶节点只能是 None 或者 future。
05. future
future 本质上是一个用生成器实现的 回调管理器。
我们之所以使用协程就是为了,在遇到 io、阻塞的时候,将运行的权利交出去,当阻塞事件完成的时候,通过一个回调来唤醒程序继续往下走,并且返回io事件的值。future 就是对这个过程的包装。可以简单写一个伪代码。
def future():
callback = future.send # 回调函数为 生成器的 send 方法,当然这种写法有问题,此时生成器还未形成
do_something(callback) # 进行 io 操作,并将 callback 注册为回调函数
result = yield
return result
我们需要将生成器用 yield 送出去,以便回调函数使用,一个 函数不能满足我们的需求,我们将它扩充为一个类,用 yield 返回 self。
class Future:
_FINISHED = 'finished'
_PENDING = 'pending'
_CANCELLED = 'CANCELLED'
def __init__(self):
self.status = self._PENDING
def set_result(self, result):
# 给future设置结果,并将 future 置为结束状态
self.status = self._FINISHED
self._result = result
def done(self):
return self.status != self._PENDING
def result(self):
# 获取future 的结果
if self.status != self._FINISHED:
raise InvalidStateError('future is not ready')
return self._result
def __iter__(self):
if not self.done():
self._blocking = True
yield self # 返回自身
assert self.done(), 'future not done' # 下一次运行 future 的时候,要确定 future 对应的事件已经运行完毕
return self.result()
最关键的就是这个 __iter__ 方法,第一次启动的时候,将自身设置为 阻塞状态,然后返回 self 。暴露出 set_result 方法让回调函数可以给 future 设置返回值,并且将 future 更改为结束状态。
现在我们可以描绘出协程的树状结构了。所有的叶子节点返回的值 为 self,自然,驱使协程往下走的回调函数,统一变成了最外层的 coro3.send(None)。Coroutine 指代 协程。
那么协程的运行路线就已经很清楚了。coro 通过 coro.send(None) 启动,遇到 io 操作,会用 yield 返回一个 future。io 操作完成之后, 回调函数通过 coro.send(None) 继续往下进行。直到 coro.send(None) 爆出 StopIteration 异常,协程运行完毕。
然而我们不会只运行一个协程,当一个 coro 将自己的控制权交出去之后,谁来接接管呢?
我们需要有一个调度者 也就是 eventloop ,看名字就知道,这是一个事件循环。
所谓的事件驱动模式其实也很简单。事件,就是函数。
事件驱动模式,就是有一个队列,里面存放着一堆函数,从第一个函数开始执行,在函数执行的过程中,可能会有新的函数继续加入到这个队列中。一直到队列中所有的函数被执行完毕,并且再也不会有新的函数被添加到这个队列中。
协程非常适合这种模式,协程的启动就是将 coro.send(None) 加入到 eventloop 的队列中。future 回调完成之后,再将 coro.send(None) 加入到队列中就可以驱使协程继续往下走。
我们来写一个 eventloop.
class Eventloop:
def __init__(self):
self._ready = collections.deque() # 事件队列
self._stopping = False
def stop(self):
self._stopping = True
def call_soon(self, callback, *args):
# 将事件添加到队列里
handle = Handle(callback, self, *args)
self._ready.append(handle)
def add_ready(self, handle):
# 将事件添加到队列里
if isinstance(handle, Handle):
self._ready.append(handle)
else:
raise EventloopError('only handle is allowed to join in ready')
def run_once(self):
# 执行队列里的事件
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
handle._run()
def run_forever(self):
while True:
self.run_once()
if self._stopping:
break
通过 run_forever 不断执行队列里的函数,通过 stop 来停止 eventloop。通过 add_ready 将事件添加到自身的队列里。
handle 是对函数和参数的一个简单封装。展示一下他的代码。
class Handle:
def __init__(self, callback, loop, *args):
self._callback = callback
self._args = args
def _run(self):
self._callback(*self._args)
因为 协程的推动需要将 coro.send(None) 添加到 eventloop 里,所以将 eventloop 设置为一个全局变量,用一个函数来获取他。
_event_loop = None
def get_event_loop():
global _event_loop
if _event_loop is None:
_event_loop = Eventloop()
return _event_loop
class Eventloop:
pass
这样写的坏处是,所有线程的 eventloop 都是同一个,不能支持多线程。如果想支持多线程,那么get_event_loop 获取的应该是一个线程里的全局变量。为了简单起见,我们暂时采用简单的实现,多线程版本后期再加上。
因为有了 eventloop, future 也需要改变一下。主要是增加了 add_done_callback 接口,为 future 增加回调函数。
当为 future 的 set_result 的时候,会执行 _schedule_callbacks。他的作用是将回调函数列表添加到 eventloop 的函数运行队列里,通过 eventloop,来运行回调函数。
class Future:
_FINISHED = 'finished'
_PENDING = 'pending'
_CANCELLED = 'CANCELLED'
def __init__(self, loop=None):
if loop is None:
self._loop = get_event_loop() # 获取当前的 eventloop
else:
self._loop = loop
self._callbacks = []
self.status = self._PENDING
self._blocking = False
self._result = None
def _schedule_callbacks(self):
# 将回调函数添加到事件队列里,eventloop 稍后会运行
for callbacks in self._callbacks:
self._loop.add_ready(callbacks)
self._callbacks = []
def set_result(self, result):
self.status = self._FINISHED
self._result = result
self._schedule_callbacks() # future 完成后,执行回调函数
def add_done_callback(self, callback, *args):
# 为 future 增加回调函数
if self.done():
self._loop.call_soon(callback, *args)
else:
handle = Handle(callback, self._loop, *args)
self._callbacks.append(handle)
def done(self):
return self.status != self._PENDING
def result(self):
if self.status != self._FINISHED:
raise InvalidStateError('future is not ready')
return self._result
def __iter__(self):
if not self.done():
self._blocking = True
yield self
assert self.done(), 'future not done'
return self.result()
我们还需要用 task 来对协程本身做一层封装。task 是 future 的子类。
class Task(Future):
def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self._coro = coro # 协程
self._loop.call_soon(self._step) # 启动协程
def _step(self, exc=None):
try:
if exc is None:
result = self._coro.send(None)
else:
result = self._coro.throw(exc) # 有异常,则抛出异常
except StopIteration as exc: # 说明协程已经执行完毕,为协程设置值
self.set_result(exc.value)
else:
if isinstance(result, Future):
if result._blocking:
self._blocking = False
result.add_done_callback(self._wakeup, result)
else:
self._loop.call_soon(
self._step, RuntimeError('你是不是用了 yield 才导致这个error?')
)
elif result is None:
self._loop.call_soon(self._step)
else:
self._loop.call_soon(self._step, RuntimeError('你产生了一个不合规范的值'))
def _wakeup(self, future):
try:
future.result() # 查看future 运行是否有异常
except Exception as exc:
self._step(exc)
else:
self._step()
task 的 _coro 就是协程。task只有两个方法。_step 实际上就是执行 _coro.send(None),根据协程的产出值来进行下一步。当返回了一个 future,如果是阻塞中的状态 _blocking ,就将唤醒自己作为 future 的回调函数。future 回调完毕之后,就会唤醒协程进行下一步。
如果产出一个 None,那么就无须阻塞,继续往下进行。将self._step 添加到 eventloop 的事件队列里。等待 eventloop 稍后执行。
比较有意思的是他的异常处理方式, _step() 可以接受一个异常,并将其抛出。如果yield 返回了不规范的值,并不会直接爆出异常,而是将异常作为 _step 的参数,在下一次运行的时候抛出来。
06. 写在最后
asyncio 中协程最核心的设计大概就是这样。加起来不到两百行代码。
当然,这些还远远不够。
以此为框架,我会慢慢增添定时任务的处理方式, 就像 asyncio.sleep()就是通过定时任务实现的 , 添加多线程支持(基于 threeding),添加对 socket 事件的监听(基于 selector 模块),我们还可以此基础上实现 http 的 request(get\post等) 功能,原版的 asyncio 模块都没有这个功能。这才算实现了一个简单的异步。
自己在工作中没什么机会使用 asyncio,理解可能多有偏差,有什么错谬的地方还请各位指出,有什么宝贵意见也请说明。还请尽管吐槽。
推荐阅读
喜欢文章,点个在看