查看原文
其他

python协程系列(五)——asyncio的核心概念与基本架构

草yang年华 机器学习与python集中营 2021-09-10

python进阶教程

机器学习

深度学习

长按二维码关注

进入正文


python协程系列(五)——asyncio的核心概念与基本架构





声明:本文针对的是python3.4以后的版本的,因为从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。比如在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,但是在3.5以后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差异,但是原理是一样的。本文用最通俗的语言解释了pythonasyncio背后的一些核心概念,简要解析了asyncio的设计架构,并给出了使用python进行asyncio异步编程的一般模板。本文较长:阅读全文约30min。


目录

一 一些最重要的概念

     1.1 协程(coroutine)——本质就是一个

          函数

     1.2 事件循环——event_loop

     1.3 什么是awaitable对象——即可暂停

          等待的对象

     1.4 什么是task任务

     1.5 什么是future?

二 asyncio的基本架构

     2.1 常见的一些高层API方法

     2.2 Task 类详解

     2.3 异步函数的结果获取

三 asyncio异步编程的基本模板

    3.1 python3.7之前的版本

      3.1.1 例子一:无参数、无返回值

      3.1.2 例子二:有参数、有返回值

      3.1.3 总结:四步走(针对python3.7

              之前的版本)

    3.2 python3.7版本

      3.2.1 例子一:无参数、无返回值

      3.2.2 例子二:有参数、有返回值

      3.2.3 总结:两步走(针对python3.7)

四 协程编程的优点


01

一些最重要的概念

协程(coroutine)——本质就是一个函数


所谓的“协程”就是一个函数,这个函数需要有两个基本的组成要素,第一,需要使用@asyncio.coroutine进行装饰;第二,函数体内一定要有yield from 返回的的generator,或者是说使用yield from 返回另一个协程对象。
当然,这两个条件并不是硬性规定的,如果没有这两个条件,依然是函数,只不过是普通函数而已。
怎么判断一个函数是不是协程?通过asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判断,返回true,则是。

那么协程函数有什么作用呢?


(1)result = yield from future

作用一:返回future的结果。什么是future?后面会讲到。当协程函数执行到这一句,协程会被悬挂起来,知道future的结果被返回。如果是future被中途取消,则会触发CancelledError异常。由于task是future的子类,后面也会介绍,关于future的所有应用,都同样适用于task


(2)result = yield from coroutine
等候另一个协程函数返回结果或者是触发异常

(3)result= yield from task
返回一个task的结果

(4)return expression
作为一个函数,他本身也是可以返回某一个结果的

(5)raise exception

事件循环——event_loop


协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,然后由事件循环去运行,单独运行协程函数是不会有结果的,看一个简单的例子:
import time
import asyncio
async def say_after_time(delay,what):
        await asyncio.sleep(delay)
        print(what)

async def main():
        print(f"开始时间为: {time.time()}")
        await say_after_time(1,"hello")
        await say_after_time(2,"world")
        print(f"结束时间为: {time.time()}")

loop=asyncio.get_event_loop()    #创建事件循环对象
#loop=asyncio.new_event_loop()   #与上面等价,创建新的事件循环
loop.run_until_complete(main())  #通过事件循环对象运行协程函数
loop.close()

在python3.6版本中,如果我们单独像执行普通函数那样执行一个协程函数,只会返回一个coroutine对象(python3.7)如下所示:

>>> main()
<coroutine object main at 0x1053bb7c8>


(1)获取事件循环对象的几种方式:
下面几种方式可以用来获取、设置、创建事件循环对象loop
loop=asyncio.get_running_loop() 返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的loop=asyncio.get_event_loop() 获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;
loop=asyncio.set_event_loop(loop) 设置一个事件循环为当前线程的事件循环;
loop=asyncio.new_event_loop() 创建一个新的事件循环

(2)通过事件循环运行协程函数的两种方式:

①方式一:创建事件循环对象loop,即asyncio.get_event_loop(),通过事件循环运行协程函数
②方式二:直接通过asyncio.run(function_name)运行协程函数。但是需要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数总是会创建一个新的事件循环并在run结束之后关闭事件循环,所以,如果在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,因为同一个线程不能有两个事件循环,而且这个run函数不能同时运行两次,因为他已经创建一个了。即同一个线程中是不允许有多个事件循环loop的
asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,因为它是高层API,后面会讲到它与asyncio.run_until_complete()的差异性,run_until_complete()是相对较低层的API。

注意:到底什么是事件循环?如何理解?

可以这样理解:线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,然后又走到另外一个方法,依次进行下去,知道事件循环所有的方法执行完毕。实际上loop是BaseEventLoop的一个实例,我们可以查看定义,它到底有哪些方法可调用。


什么是awaitable对象——即可暂停等待的对象


有三类对象是可等待的,即 coroutines, Tasks, and Futures.
coroutine:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;
Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。
三者的关系,coroutine可以自动封装成task,而Task是Future的子类

什么是task任务


如前所述,Task用来 并发调度的协程,即对协程函数的进一步包装?那为什么还需要包装呢?因为单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。

(1)创建任务(两种方法):

方法一:task = asyncio.create_task(coro())   # 这是3.7版本新添加的


方法二:task = asyncio.ensure_future(coro())

也可以使用
loop.create_future()
loop.create_task(coro)
也是可以的。

备注:关于任务的详解,会在后面的系列文章继续讲解,本文只是概括性的说明。

(2)获取某一个任务的方法:

方法一:task=asyncio.current_task(loop=None)
返回在某一个指定的loop中,当前正在运行的任务,如果没有任务正在运行,则返回None;
如果loop为None,则默认为在当前的事件循环中获取,

方法二:asyncio.all_tasks(loop=None)
返回某一个loop中还没有结束的任务

什么是future?


Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。
Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().




02

asyncio的基本架构

前面介绍了asyncio里面最为核心的几个概念,如果能够很好地理解这些概念,对于学习协程是非常有帮助的,但是按照我个人的风格,我会先说asyncio的架构,理解asyncio的设计架构有助于更好地应用和理解。

asyncio分为高层API和低层API,我们都可以使用,就像我前面在讲matplotlib的架构的时候所讲的一样,我们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。当然asyncio所涉及到的功能远不止于此,我们只看这么多。下面是是高层API和低层API的概览:

High-level APIs

    ●Coroutines and Tasks(本文要写的)
    ●Streams
    ●Synchronization Primitives
    ●Subprocesses
    ●Queues
    ●Exceptions

Low-level APIs

    ●Event Loop(下一篇要写的)
    ●Futures
    ●Transports and Protocols
    ●Policies
    ●Platform Support

所谓的高层API主要是指那些asyncio.xxx()的方法


常见的一些高层API方法


(1)运行异步协程
asyncio.run(coro, *, debug=False)  #运行一个一步程序,参见上面

(2)创建任务
task=asyncio.create_task(coro)  #python3.7  ,参见上面
task = asyncio.ensure_future(coro())

(3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而允许其他任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息,注意他们的区别哦。
另外如果提供了参数result,当当前任务(协程)结束的时候,它会返回;
loop参数将会在3.10中移除,这里就不再说了。

(4)并发运行多个任务
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
它本身也是awaitable的。
*coros_or_futures是一个序列拆分操作,如果是以个协程函数,则会自动转换成Task。
当所有的任务都完成之后,返回的结果是一个列表的形式,列表中值的顺序和*coros_or_futures完成的顺序是一样的。
return_exceptions:

False,这是他的默认值,第一个出发异常的任务会立即返回,然后其他的任务继续执行;
True,对于已经发生了异常的任务,也会像成功执行了任务那样,等到所有的任务执行结束一起将错误的结果返回到最终的结果列表里面。

如果gather()本身被取消了,那么绑定在它里面的任务也就取消了。

(5)防止任务取消
await asyncio.shield(*arg, *, loop=None)
它本身也是awaitable的。顾名思义,shield为屏蔽、保护的意思,即保护一个awaitable 对象防止取消,一般情况下不推荐使用,而且在使用的过程中,最好使用try语句块更好。

try:
    res = await shield(something())
except CancelledError:
    res = None

(6)设置timeout——一定要好好理解

await asyncio.wait_for(aw, timeout, *, loop=None)

如果aw是一个协程函数,会自动包装成一个任务task。参见下面的例子:

import asyncio

async def eternity():
    print('我马上开始执行')
    await asyncio.sleep(3600)  #当前任务休眠1小时,即3600秒
    print('终于轮到我了')

async def main():
    # Wait for at most 1 second
    try:
        print('等你3秒钟哦')
        await asyncio.wait_for(eternity(), timeout=3)  #休息3秒钟了执行任务
    except asyncio.TimeoutError:
        print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我马上开始执行
超时了!
'''

为什么?首先调用main()函数,作为入口函数,当输出‘等你3秒钟哦’之后,main挂起,执行eternity,然后打印‘我马上开始执行’,然后eternity挂起,而且要挂起3600秒,大于3,这时候出发TimeoutError。修改一下:‘’

import asyncio

async def eternity():
    print('我马上开始执行')
    await asyncio.sleep(2)  #当前任务休眠2秒钟,2<3
    print('终于轮到我了')

async def main():
    # Wait for at most 1 second
    try:
        print('等你3秒钟哦')
        await asyncio.wait_for(eternity(), timeout=3)  #给你3秒钟执行你的任务
    except asyncio.TimeoutError:
        print('超时了!')

asyncio.run(main())

'''运行结果为:
等你3秒钟哦
我马上开始执行
终于轮到我了
'''


总结:当异步操作需要执行的时间超过waitfor设置的timeout,就会触发异常,所以在编写程序的时候,如果要给异步操作设置timeout,一定要选择合适,如果异步操作本身的耗时较长,而你设置的timeout太短,会涉及到她还没做完,就抛出异常了。

(7)多个协程函数时候的等候

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,比如:
{func(),func(),func3()}
表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是可以的。

注意:该函数的返回值是两个Tasks/Futures的集合:
(done, pending)
其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示还没有完成的任务。
常见的使用方法为:done, pending = await asyncio.wait(aws)

参数解释:
timeout (a float or int), 同上面的含义一样,需要注意的是,这个不会触发asyncio.TimeoutError异常,如果到了timeout还有任务没有执行完,那些没有执行完的tasks和futures会被返回到第二个集合pending里面。
return_when参数,顾名思义,他表示的是,什么时候wait函数该返回值。只能够去下面的几个值。

ConstantDescription
FIRST_COMPLETEDfirst_completes.当任何一个task或者是future完成或者是取消,wait函数就返回
FIRST_EXCEPTION当任何一个task或者是future触发了某一个异常,就返回,.如果是所有的task和future都没有触发异常,则等价与下面的 ALL_COMPLETED.
ALL_COMPLETED当所有的task或者是future都完成或者是都取消的时候,再返回。

如下面例子所示:

import asyncio
import time

a=time.time()

async def hello1():  #大约2秒
    print("Hello world 01 begin")
    yield from asyncio.sleep(2)
    print("Hello again 01 end")

async def hello2():  #大约3秒
    print("Hello world 02 begin")
    yield from asyncio.sleep(3)
    print("Hello again 02 end")

async def hello3():  #大约4秒
    print("Hello world 03 begin")
    yield from asyncio.sleep(4)
    print("Hello again 03 end")

async def main():   #入口函数
    done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
    for i in done:
        print(i)
    for j in pending:
        print(j)

asyncio.run(main()) #运行入口函数

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:\Python学习\基础入门\asyncio3.4详解\test11.py:46> result=None>
<Task pending coro=<hello3() running at e:\Python学习\基础入门\asyncio3.4详解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:\Python学习\基础入门\asyncio3.4详解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''

从上面可以看出,hello1()试运行结束了的,hello2()和hello3()还没结束。
(8)asyncio.as_completed()函数

这个函数我没有找到合适的中文名称去描述,所以哪位大神如果知道,望告知,不胜感激!它的函数原型如下:
asyncio.as_completed(aws, *, loop=None, timeout=None)
第一个参数aws:同上面一样,是一个集合{}集合里面的元素是coroutine、task或者future
第三个参数timeout:意义和上面讲的的一样
那到底什么作用呢?其实很简单,个人感觉有点鸡肋,从一个例子看起:

import asyncio
import time
import threading

a=time.time()

@asyncio.coroutine
def hello1():
    print("Hello world 01 begin")
    yield from asyncio.sleep(5)  #大约5秒
    print("Hello again 01 end")
    return '哈哈1'

@asyncio.coroutine
def hello2():
    print("Hello world 02 begin")
    yield from asyncio.sleep(3#大约3秒
    print("Hello again 02 end")
    return '哈哈2'

@asyncio.coroutine
def hello3():
    print("Hello world 03 begin")
    yield from asyncio.sleep(4#大约4秒
    print("Hello again 03 end")
    return '哈哈3'

async def main():
    s=asyncio.as_completed({hello1(),hello2(),hello3()})
    for f in s:
        result=await f
        print(result)

asyncio.run(main())

b=time.time()
print('---------------------------------------')
print(b-a)

'''运行结果为:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''

结论:asyncio.as_completed()函数返回的是一个可迭代(iterator)的对象,对象的每个元素就是一个future对象,很多小伙伴说,这不是相当于没变吗?其实返回的future集合是对参数的future集合重新组合,组合的顺序就是,最先执行完的协程函数(coroutine、task、future)最先返回,从上面的代码可知,参数为
aws={hello1(),hello2(),hello3()},因为hello1大约花费5秒、hello2大约花费3秒、hello3大约花费4秒。返回的结果为
s={hello2()、hello3()、hello(1)},因为hello2时间最短,故而放在前面,hello1时间最长,故而放在最后面。然后对返回的集合s开始迭代。


Task 类详解


先来看一下Task类的简单介绍(英文原文文档)。

上面的文字描述中推出了几个非常重要的信息,特在此总结如下

(1)他是作为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future所有的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高层API  asyncio.ccreate_task()创建任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()创建任务对象;

(3)相比于协程函数,任务时有状态的,可以使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。

下面介绍Task类常见的一些使用函数

(1)cancel()
Request the Task to be cancelled.

其实前面已经有所介绍,最好是使用他会出发CancelledError异常,所以需要取消的协程函数里面的代码最好在try-except语句块中进行,这样方便触发异常,打印相关信息,但是Task.cancel()没有办法保证任务一定会取消,而Future.cancel()是可以保证任务一定取消的。可以参见下面的一个例子:

import asyncio

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600#模拟一个耗时任务
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    #通过协程创建一个任务,需要注意的是,在创建任务的时候,就会跳入到异步开始执行
    #因为是3.7版本,创建一个任务就相当于是运行了异步函数cancel_me
    task = asyncio.create_task(cancel_me()) 
    #等待一秒钟
    await asyncio.sleep(1)
    print('main函数休息完了')
    #发出取消任务的请求
    task.cancel()  
    try:
        await task  #因为任务被取消,触发了异常
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''运行结果为:
cancel_me(): before sleep
main函数休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''

运行过程分析:

首先run函数启动主函数入口main,在main中,因为第一句话就是调用异步函数cancel_me()函数,所以先打印出了第一句话;
然后进入cancel_me中的try语句,遇到await,暂停,这时候返回main中执行,但是有在main中遇到了await,也会暂停,但是由于main中只需要暂停1秒,而cancel_me中要暂停3600秒,所以等到main的暂停结束后,接着运行main,所以打印出第二句话;
接下来遇到取消任务的请求task.cancel(),然后继续执行main里面的try,又遇到了await,接着main进入暂停,接下来进入到cancel_me函数中,但是由于main中请求了取消任务,所以那个耗时3600秒的任务就不再执行了,直接触发了Cancelled_Error异常,打印出第三句话,接下来又raise一个异常信息;
接下来执行cancel_me的finally,打印出第四句话,此时cancel_me执行完毕,由于他抛出了一个异常,返回到主程序main中,触发异常,打印出第五句话。

(2)done()
当一个被包装得协程既没有触发异常、也没有被取消的时候,意味着它是done的,返回true。

(3)result()
返回任务的执行结果,
当任务被正常执行完毕,则返回结果;
当任务被取消了,调用这个方法,会触发CancelledError异常;
当任务返回的结果是无用的时候,则调用这个方法会触发InvalidStateError;
当任务出发了一个异常而中断,调用这个方法还会再次触发这个使程序中断的异常。

(4)exception()

返回任务的异常信息,触发了什么异常,就返回什么异常,如果任务是正常执行的无异常,则返回None;
当任务被取消了,调用这个方法会触发CancelledError异常;
当任务没有做完,调用这个方法会触发InvalidStateError异常。
下面还有一些不常用的方法,如下:

(5)add_done_callback(callback, *, context=None)
(6)remove_done_callback(callback)
(7)get_stack(*, limit=None)
(8)print_stack(*, limit=None, file=None)
(9)all_tasks(loop=None),这是一个类方法
(10)current_task(loop=None),这是一个类方法


异步函数的结果获取


对于异步编程、异步函数而言,最重要的就是异步函数调用结束之后,获取异步函数的返回值,我们可以有以下几种方式来获取函数的返回值,第一是直接通过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。

(1)直接通过result来获取

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                #第一步:创建事件循环
task=asyncio.ensure_future(coroutine)         #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(task)                  #第三步:通过事件循环运行
print('-------------------------------------')
print(task.result())
loop.close() 

'''运行结果为
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''

(2)通过定义回调函数来获取

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

def callback(future):   #定义的回调函数
    print(future.result())

loop = asyncio.get_event_loop()                #第一步:创建事件循环
task=asyncio.ensure_future(hello1(10,5))       #第二步:将多个协程函数包装成任务
task.add_done_callback(callback)                      #并被任务绑定一个回调函数

loop.run_until_complete(task)                  #第三步:通过事件循环运行
loop.close()                                   #第四步:关闭事件循环


'''运行结果为:
Hello world 01 begin
Hello again 01 end
15
'''

注意:所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象,因为task是future的子类。




03

asyncio异步编程的基本模板

事实上,在使用asyncio进行异步编程的时候,语法形式往往是多样性的,虽然理解异步编程的核心思想很重要,但是实现的时候终究还是要编写语句的,本次给出的模板,是两个不同的例子,例子一是三个异步方法,它们都没有参数,没有返回值,都模拟一个耗时任务;例子二是三个异步方法,都有参数,都有返回值。


python3.7之前的版本


例子一:无参数、无返回值
import asyncio
import time

a=time.time()

async def hello1():
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")

async def hello2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模拟耗时任务2秒
    print("Hello again 02 end")

async def hello3():
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模拟耗时任务4秒
    print("Hello again 03 end")

loop = asyncio.get_event_loop()                #第一步:创建事件循环
tasks = [hello1(), hello2(),hello3()]          #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks))   #第三步:通过事件循环运行
loop.close()                                   #第四步:取消事件循环

'''运行结果为:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''


例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

async def hello2(a,b):
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模拟耗时任务2秒
    print("Hello again 02 end")
    return a-b

async def hello3(a,b):
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模拟耗时任务4秒
    print("Hello again 03 end")
    return a*b

loop = asyncio.get_event_loop()                #第一步:创建事件循环
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3]                    #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks))   #第三步:通过事件循环运行
print(task1.result())                               #并且在所有的任务完成之后,获取异步函数的返回值   
print(task2.result())
print(task3.result())
loop.close()                                   #第四步:关闭事件循环

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''



总结:四步走(针对python3.7之前的版本)

第一步·:构造事件循环

loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的

loop=asyncio.get_event_loop() #获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;

loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;

loop=asyncio.new_event_loop()  #创建一个新的事件循环

第二步:将一个或者是多个协程函数包装成任务Task

#高层API
task = asyncio.create_task(coro(参数列表))   # 这是3.7版本新添加的
task = asyncio.ensure_future(coro(参数列表)) 

#低层API
loop.create_future(coro)
loop.create_task(coro)

'''需要注意的是,在使用Task.result()获取协程函数结果的时候,使用asyncio.create_task()却会显示错
但是使用asyncio.ensure_future却正确,本人暂时不知道原因,哪位大神知道,望告知,不胜感激!'''

第三步:通过事件循环运行

loop.run_until_complete(asyncio.wait(tasks))  #通过asyncio.wait()整合多个task

loop.run_until_complete(asyncio.gather(tasks))  #通过asyncio.gather()整合多个task

loop.run_until_complete(task_1)  #单个任务则不需要整合

loop.run_forever()  #但是这个方法在新版本已经取消,不再推荐使用,因为使用起来不简洁

'''
使用gather或者wait可以同时注册多个任务,实现并发,但他们的设计是完全不一样的,在前面的2.1.(4)中已经讨论过了,主要区别如下:
(1)参数形式不一样
gather的参数为 *coroutines_or_futures,即如这种形式
      tasks = asyncio.gather(*[task1,task2,task3])或者
      tasks = asyncio.gather(task1,task2,task3)
      loop.run_until_complete(tasks)
wait的参数为列表或者集合的形式,如下
      tasks = asyncio.wait([task1,task2,task3])
      loop.run_until_complete(tasks)
(2)返回的值不一样
gather的定义如下,gather返回的是每一个任务运行的结果,
      results = await asyncio.gather(*tasks) 
wait的定义如下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型
 done, pending = yield from asyncio.wait(fs)
(3)后面还会讲到他们的进一步使用
'''

简单来说:async.wait会返回两个值:done和pending,done为已完成的协程Task,pending为超时未完成的协程Task,需通过future.result调用Task的result。而async.gather返回的是已完成Task的result。

第四步:关闭事件循环

loop.close()

'''
以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果关闭了,就不能再运行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此处异常
建议调用 loop.close,以彻底清理 loop 对象防止误用
'''



python3.7版本


在最新的python3.7版本中,asyncio又引进了一些新的特性和API


例子一:无参数、无返回值
import asyncio
import time


async def hello1():
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")

async def hello2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模拟耗时任务2秒
    print("Hello again 02 end")

async def hello3():
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模拟耗时任务4秒
    print("Hello again 03 end")

async def main():
    results=await asyncio.gather(hello1(),hello2(),hello3())
    for result in results:
        print(result)     #因为没返回值,故而返回None

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''


例子二:有参数、有返回值

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模拟耗时任务3秒
    print("Hello again 01 end")
    return a+b

async def hello2(a,b):
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模拟耗时任务2秒
    print("Hello again 02 end")
    return a-b

async def hello3(a,b):
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模拟耗时任务4秒
    print("Hello again 03 end")
    return a*b

async def main():
    results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
    for result in results:
        print(result)

asyncio.run(main())

'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''


总结:两步走(针对python3.7)

第一步:构建一个入口函数main
他也是一个异步协程函数,即通过async定义,并且要在main函数里面await一个或者是多个协程,同前面一样,我可以通过gather或者是wait进行组合,对于有返回值的协程函数,一般就在main里面进行结果的获取。

第二步:启动主函数main
这是python3.7新添加的函数,就一句话,即
asyncio.run(main())

注意:
不再需要显式的创建事件循环,因为在启动run函数的时候,就会自动创建一个新的事件循环。而且在main中也不需要通过事件循环去掉用被包装的协程函数,只需要向普通函数那样调用即可 ,只不过使用了await关键字而已。



04

协程编程的优点


1、无cpu分时切换线程保存上下文问题(协程上下文怎么保存)

2、遇到io阻塞切换(怎么实现的)

3、无需共享数据的保护锁(为什么)

4、系列文章下篇预告——介绍低层的API,事件循环到底是怎么实现的以及future类的实现。



推 荐 阅 读

python标准库系列教程(五)——unittest单元测试(上篇)

python标准库系列教程(五)——unittest单元测试(中篇)

python标准库系列教程(五)——unittest单元测试(下篇)

python标准库系列教程(四)——collections库详细教程
python标准库系列教程(三)——operator库详细教程

python标准库系列教程(二)——functools&nbsp;(上篇)

python标准库系列教程(一)——itertools

python协程系列(四)——详解『同步|异步』『并发|并行』『线程|进程』

python协程系列(三)——yield&nbsp;from详解

python协程系列(二)——协程的通俗理解及yield关键字实现协程

Python协程系列(一)——从generator和yield表达式说起




赶紧关注我们吧

您的点赞和分享是我们进步的动力!

↘↘↘

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存