Python 10_Python并发编程之协程

一、协程概述

1.1 协程的概念

协程(Coroutines) 是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。当协程调度切换时,将寄存器上下文和栈保存到其它地方,在切回来的时就恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程,协程程序是在线程里面跑的,因此协程又称微线程和纤程等。最重要的是,协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。这样带来的好处就是性能得到了很大的提升,不会像线程切换那样消耗资源。由于协程是用户调度的,所以不会出现执行一半的代码片段被强制中断了,因此无需原子操作锁。

Tips:协程没有线程的上下文切换消耗,协程的调度切换是用户(程序员)手动切换的,因此更加灵活,因此又叫用户空间线程。

协程并不是并行,而是类似于并发,即使在多核CPU上。

协程就是一种用户态内的上下文切换技术,又称微线程,纤程,一种用户态的轻量级线程。所谓协程,就是同时开启多个任务,但一次只顺序执行一个。等到所执行的任务遭遇阻塞,就切换到下一个任务继续执行,以期节省下阻塞所占用的时间。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。

可以简单的理解为:一个可以暂停的函数,并且可以向暂停的地方传入值。进程和线程的切换都是由操作系统控制,切换过程都是 用户态 ——> 内核态 ——> 用户态。而协程则完全由程序控制,切换过程就在用户态中完成,它也不需要锁,所以协程的开销远远小于线程。

为什么会有协程的出现?

  • 采用常规的同步编程去实现异步效果难度极高,因为函数无法中断,如果一个函数中有阻塞的方法,那么这个函数所在的任务就会全部阻塞。
  • 采用多线程的方式,线程相对于进程来说已经轻量化了,但它仍然是由操作系统来创建和切换,因为同步锁和线程状态的频繁切换,导致消耗大并发也上不去。
  • 所以总的来说,函数写法简单,但是满足不了多任务的并发。线程消耗大,并发上不去。所以这个时候就有人提出一种概念:有没有一种方法可以兼顾这两者的优点,又可以解决两者的缺点呢?既能用同步的方式编写异步代码,又不需要多线程那样那么大的开销,那么这个东西就是协程了。

进程和线程是抢占式的调度,而协程是协同式的调度,也就是说,协程需要自己做调度。协程看上去也是子程序,但执行过程中,在子程序内部可中断(不是函数调用,有点类似CPU的中断,实际上是程序员控制的中断),然后转而执行别的子程序,在适当的时候再返回来接着执行。

1.2 协程的实现方式简介

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。

如果改用协程,生产者生产消息后,通过协程直接通过跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。

Python中的协程有多种实现方式,包括但不限于生成器、greenlet、asyncio库等。Python中的协程经历了很长的一段发展历程。最初的生成器yield和send()语法,然后在Python3.4中加入了asyncio模块,引入@asyncio.coroutine装饰器和yield from语法,在Python 3.5之后,Python添加了新的语法 async 和 await ,使得协程的编写更加简单。

实现方式:

  • greenlet:一个第三方模块,需要提前安装 pip3 install greenlet才能使用;
  • yield生成器:借助生成器的特点亦可以实现协程代码;
  • asyncio:在python3.4 种引入的模块,用于编写协程代码;

说明:主要通过装饰器 @asyncio.coroutine 来实现协程函数定义;Python3.8之后 @asyncio.coroutine 装饰器会被移除,推荐使用async & awit 关键字实现协程代码。

  • async & awiat:在python3.5中引入的两个关键字,结合asyncio模块使用;

二、Python协程的实现

2.1 yield/send 实现的协程

在Python中,可以通过 yield 实现协程。因为当一个函数中有 yield 存在的时候,这个函数是生成器,那么当调用这个函数的时候,在函数体中写的代码并没有被执行,而是只返回了一个生成器对象,这个需要特别注意。然后,代码将会在每次使用这个生成器的时候被执行。

yield表达式有两个关键作用:1. 返回一个值;2. 接收调用者的参数。“调用者”与“被调用者”之间的通信是通过send()进行联系的,正是因为yield实现的生成器具备“中断等待的功能”,才使得yield可以实现协程。

Python中利用协程实现生产者-消费者模式: 代码中创建了一个叫做consumer的协程,并且在主线程中生产数据,协程中消费数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def consume():
    while True:
        number = yield      # consumer 协程等待接收数据
        print (f"Start consume {number}")

consumer = consume()

next(consumer)  # 让初始化状态的 consumer 协程开始运行,在 yield 处等待数据

for num in range(0,100):
    print(f"Start product {num}")
    consumer.send(num)  # 生产者协程发送数据 consumer 协程

其中 yield 是python当中的语法,当一个函数中包含 yield 语句时,python会自动将其识别为一个生成器。这时 consume() 调用并不会真正调用函数体执行,而是以函数体生成了一个生成器对象实例 consumer。主进程中调用 next 开始执行consume 函数,遇到 yield 就暂停执行,yield在这里可以保留 consume 函数的计算现场,暂停 consume 函数的计算,直至主进程中调用consumer.send(num)方法发送数据,此时 consumer 协程才会接到数据继续执行。然后下一次遇到 yield 时,重复停止等待。

当协程执行到yield关键字时,会暂停在那一行,等到主线程调用send方法发送了数据,协程才会接到数据继续执行。 但是,yield让协程暂停,和线程的阻塞是有本质区别的。协程的暂停完全由程序控制,线程的阻塞状态是由操作系统内核来进行切换。因此,协程的开销远远小于线程的开销。

yield/send 实现的协程发展过程:

  • 在 Python2.2 中,第一次引入了生成器,生成器实现了一种惰性、多次取值的方法,此时还是通过 next 构造生成迭代链或 next 进行多次取值。
  • 直到在 Python2.5 中,yield 关键字被加入到语法中,这时,生成器有了记忆功能,下一次从生成器中取值可以恢复到生成器上次 yield 执行的位置。
  • 之前的生成器都是关于如何构造迭代器,在 Python2.5 中生成器还加入了 send 方法,与 yield 搭配使用。此时,生成器不仅仅可以 yield 暂停到一个状态,还可以往它停止的位置通过 send 方法传入一个值改变其状态。最初的yield只能返回并暂停函数,并不能实现协程的功能。后来,Python为它定义了新的功能——接收外部发来的值( send 方法),这样一个生成器就变成了协程

另外一个示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def consumer():
    r = ''
    while True:
        n = yield r  # 执行的中断点
        if not n:
            return
        print('[消费者] 正在消费:{0}'.format(n))
        r = '200 人民币'
 
def produce(c):
    c.send(None)  # 启动消费者(生成器)——实际上是函数调用,只不过生成器不是直接象函数那般调用的
    n = 0
    while n < 3:
        n = n + 1
        print('[生产者] 正在生产:{0}'.format(n))
        r = c.send(n) #给消费者传入值——实际上也是函数调用
        print('[生产者] 消费者返回:{0}'.format(r))
        print('-------------------------------------------------')
    c.close()

if __name__ == '__main__':
    c = consumer()#构造一个生成器
    produce(c)
 
'''运行结果为:
[生产者] 正在生产:1
[消费者] 正在消费:1
[生产者] 消费者返回:200 人民币
-------------------------------------------------
[生产者] 正在生产:2
[消费者] 正在消费:2
[生产者] 消费者返回:200 人民币
-------------------------------------------------
[生产者] 正在生产:3
[消费者] 正在消费:3
[生产者] 消费者返回:200 人民币
-------------------------------------------------
'''

示例分析:

第一步:在produce(c)函数中,调用了c.send(None)启动了生成器,这相当于是调用consumer(),但是如果consumer是一个普通函数而不是生成器,就要等到consumer执行完了,主动权才会重新回到producer手里。但就是因为consumer是生成器,所以第一次遇到yield暂停;接着执行produce()中接下来的代码,从运行结果看,确实打印出了[生产者] 正在生产 1 ,当程序运行至c.send(n)时,再次调用生成器并且通过yield传递了参数(n = 1),这个时候,进入consumer()函数先前在yield停下的地方,继续向后执行,所以打印出[消费者] 正在消费 1。

第二步:[消费者] 正在消费 1 这句话被打印出来之后,接下consumer()函数中此时 r 被赋值为’200 人民币’,接着consumer()函数里面的第一次循环结束,进入第二次循环,又遇到yield, 所以consumer()函数又暂停并且返回变量 r 的值,consumer()函数暂停,此时程序又进入produce(c)函数中接着执行。

第三步:由于先前produce(c)函数接着第一次循环中c.send(n)处相当于是调用消费者consumer(),跳入到了consumer()里面去执行,现在consumer暂停,producer重新我有主动权,故而继续往下执行打印出[生产者] 消费者返回: 200 人民币,然后producer的第一次循环结束,并进行第二次循环,打印出[生产者] 正在生产 1,然后,又调用c.send(n) 又调用消费者consumer,将控制权交给consumer,如此循环回到第一步!

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import time
 
#定义一个消费者,它有名字name
#因为里面有yield,本质上是一个生成器
def consumer(name): 
    print(f'{name}  准备吃包子啦!呼吁店小二')
    while True:
        baozi=yield  #接收send传的值,并将值赋值给变量baozi
        print(f'包子 {baozi+1} 来了,被 {name} 吃了!')
 
#定义一个生产者,生产包子的店家,店家有一个名字name,并且有两个顾客c1 c2
def producer(name,c1,c2):
    next(c1)  #启动生成器c1
    next(c2)  #启动生成器c2
    print(f'{name} 开始准备做包子啦!')
    for i in range(3):
        time.sleep(1)
        print(f'做了第{i+1}包子,分成两半,你们一人一半')
        c1.send(i)
        c2.send(i)
        print('------------------------------------')
 
c1=consumer('张三') #把函数变成一个生成器
c2=consumer('李四')
producer('店小二',c1,c2)
 
'''运行结果为:
张三  准备吃包子啦!呼吁店小二
李四  准备吃包子啦!呼吁店小二
店小二 开始准备做包子啦!
做了第1包子,分成两半,你们一人一半
包子 1 来了,被 张三 吃了!
包子 1 来了,被 李四 吃了!
------------------------------------
做了第2包子,分成两半,你们一人一半
包子 2 来了,被 张三 吃了!
包子 2 来了,被 李四 吃了!
------------------------------------
做了第3包子,分成两半,你们一人一半
包子 3 来了,被 张三 吃了!
包子 3 来了,被 李四 吃了!
------------------------------------
'''

示例分析:

第一步:producer 中依次启动生成器 c1,c2,c1先运行,打印出 “张三 准备吃包子啦!呼吁店小二” 继续运行到第一个循环的 yield 处暂停,然后c2运行,“李四 准备吃包子啦!呼吁店小二”, 也运行到第一个 yield 处暂停。

第二步:现在相当于两个顾客等着吃包子,控制权交给店小二生产包子,于是打印出 “店小二 开始准备做包子啦!",并且进入producer的第一个循环,花了1秒钟,生产第一个包子,然后将其一分为二,打印出:“做了第1包子,分成两半,你们一人一半”。

第三步:此时producer店小二调用send()函数,相当于将包子给两位客人,这个时候先执行c1.send(),即先把包子给c1,然后c1获得了控制权,打印出 “包子 1 来了,被 张三 吃了!” 然后 c1 吃完进入第二次循环遇见了yield,又暂停。控制权重新回到producer手上,再执行c2.send(),将包子给c2,c2掌握控制权,于是打印出 “包子 1 来了,被 李四 吃了!” 它在进入第二次循环,遇到yield,然后又暂停了,控制权重新回到producer店小二手中,店小二打印出一段虚线,然后进入第二次循环,重新花了1秒钟,又做了一个包子,一次这样下去。

yield实现协程的不足之处:

  • 协程函数的返回值不是特别方便获取,只能够通过出发StopIteration异常,然后通过该异常的value属性获取;
  • Python的生成器是协程coroutine的一种形式,但它的局限性在于只能向它的直接调用者每次yield一个值。这意味着那些包含yield的代码不能想其它代码那样被分离出来放到一个单独的函数中。这也正是yield from要解决的。

2.2 yield from 实现的协程

在 Python3.3 中,生成器又引入了 yield from 关键字,它的主要作用有两个:拼接可迭代对象和实现生成器的嵌套。yield from 实现了在生成器内调用另外生成器的功能,可以轻易的重构生成器,比如将多个生成器连接在一起执行。yield from的作用还体现可以像一个管道一样将send信息传递给内层协程,并且处理好了各种异常情况。

简单地说,yield from iterable。实际上就是返回另外一个生成器。而yield只是返回一个元素。从这个层面来说,有下面的等价关系:yield from iterable 本质上等于 for item in iterable: yield item

yield from iterable 是针对 yield 的不足来加以改进的。

  • 针对yield无法获取生成器return的返回值

在使用 yield生成器 的时候,如果使用 for 语句去迭代生成器,则不会显式的发出 StopIteration 异常,而是自动捕获 StopIteration 异常,所以如果遇到return,只是会终止迭代,而不会触发异常,故而也就没办法获取 return 的值。如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def my_generator():
    for i in range(5):
        if i==2:
            return '我被迫中断了'
        else:
            yield i
 
def main(generator):
    try:
        for i in generator:  #不会显式触发异常,故而无法获取到return的值
            print(i)
    except StopIteration as exc:
        print(exc.value)
 
g=my_generator()  #调用
main(g)
'''运行结果为:
0
1
'''

从上面的例子可以看出,for迭代语句不会显式触发异常,故而无法获取到return的值,迭代到2的时候遇到return语句,隐式的触发了StopIteration异常,就终止迭代了,但是在程序中不会显示出来。

但是如果我是使用 next(g)一次一次迭代,则会显式触发异常,但要获取return的返回值,需要如下操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def my_generator():
    for i in range(5):
        if i==2:
            return '我被迫中断了'
        else:
            yield i
 
def main(generator):
    try:
        print(next(generator))   #每次迭代一个值,则会显式出发StopIteration
        print(next(generator))
        print(next(generator))
        print(next(generator))
        print(next(generator))
    except StopIteration as exc:
        print(exc.value)     #获取返回的值
 
g=my_generator()
main(g)
'''运行结果为:
0
1
我被迫中断了
'''

使用yield from来完成上面的同样的功能:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def my_generator():
    for i in range(5):
        if i==2:
            return '我被迫中断了'
        else:
            yield i
 
def wrap_my_generator(generator):  #定义一个包装“生成器”的生成器,它的本质还是生成器
    result=yield from generator    #自动触发StopIteration异常,并且将return的返回值赋值给yield from表达式的结果,即result
    print(result)
 
def main(generator):
    for j in generator:
        print(j)
 
g=my_generator()
wrap_g=wrap_my_generator(g)
main(wrap_g)  #调用
'''运行结果为:
0
1
我被迫中断了
'''

从上面的比较可以看出,yield from具有以下几个特点:

  • 上面的my_generator是原始的生成器,main是调用方,使用yield的时候,只涉及到这两个函数,即“调用方”与“生成器(协程函数)”是直接进行交互的,不涉及其它方法,即“调用方——>生成器函数(协程函数)”;

  • 在使用yield from的时候,多了一个对原始my_generator的包装函数,然后调用方是通过这个包装函数(后面会讲到它专有的名词)来与生成器进行交互的,即“调用方——>生成器包装函数——>生成器函数(协程函数)”;

  • yield from iteration结构会在内部自动捕获 iteration生成器的StopIteration 异常。这种处理方式与 for 循环处理 StopIteration 异常的方式一样。而且对 yield from 结构来说,解释器不仅会捕获 StopIteration 异常,还会把return返回的值或者是StopIteration的value 属性的值变成 yield from 表达式的值,即上面的result。

  • yield from所实现的数据传输通道

前面总结的几个特点里面已经介绍了yield和yield from的数据交互方式,yield涉及到“调用方与生成器两者”的交互,生成器通过next()的调用将值返回给调用者,而调用者通过send()方法向生成器发送数据;

但是 yield from 还有一个第三者函数,下面将先从相关的概念说起

委派生成器:包含 yield from 表达式的生成器函数;即上面的wrap_my_generator生成器函数

子生成器:从 yield from 表达式中 部分获取的生成器;即上面的my_generator生成器函数

调用方:调用委派生成器的客户端代码;即上面的main生成器函数

委派生成器在 yield from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出的值发给调用方。子生成器返回之后,解释器会抛出StopIteration 异常,并把返回值附加到异常对象上,此时委派生成器会恢复。

yield from 主要设计用来向子生成器委派操作任务,但yield from可以向任意的可迭代对象委派操作;

**委派生成器(group)**相当于管道,所以可以把任意数量的委派生成器连接在一起—一个委派生成器使用yield from 调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另。

  • 拼接可迭代对象 yield from 是yield的升级改进版本,如果将yield理解成“返回”,那么yield from就是“从什么(生成器)里面返回”,这就构成了yield from的一般语法,即:yield from iterable

表达式对 iterable 对象所做的第一件事就是,调用 iter(iterable),从中获取迭代器,所以 yield from 后面可以跟的可以是 生成器 、元组、 列表、range()函数产生的序列等可迭代对象

来看一个例子,python 中有一个内置函数 itertools.chain(),它可以将多个可迭代对象组合在一起,并生成一个可迭代对象输出,我们可以自己来实现一个功能类似的函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 使用 yield 实现
my_list = [7, 3, 5]
my_dict = {'name': 'jaye', 'age': 18}

def my_chain(*args, **kwargs):
    for my_iterable in args:
        for value in my_iterable:
            yield value

res = my_chain(my_list, my_dict, range(9,13))
print(list(res))
# [7, 3, 5, 'name', 'age', 9, 10, 11, 12]

# 使用 yield from 实现
def my_chain1(*args, **kwargs):
    for my_iterable in args:
        yield from my_iterable

res1 = my_chain1(my_list, my_dict, range(9,13))
print(list(res1))
# [7, 3, 5, 'name', 'age', 9, 10, 11, 12]
  • 打开双向通道,实现生成器的嵌套

yield from 还可以实现生成器的嵌套,它能在调用方与子生成器之间建立一个双向通道,把最外层的调用方与最内层的子生成器连接起来,实现消息互通,也就是调用方可以通过 send() 直接发送消息给子生成器,而子生成器 yield 的值,直接返回给调用方。这里首先需要了解三个概念:

委托生成器:包含 yield from 表达式的生成器函数。委托生成器只起一个桥梁作用,它没有办法也没有权力对子生成器 yield 出来的内容做拦截。

调用方:调用委派生成器的一方。

子生成器:yield from 后面跟的生成器函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final_result = {}

# 子生成器
def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        print(f"{pro_name} 总分: {x}")
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums

# 委托生成器
def middle(key):
    while True:
        final_result[key] = yield from sales_sum(key)
        print(f"{key} 成绩统计完成!")


# 调用方
def main():
    data_sets = {
        "小胡": [28, 33, 27, 22],
        "小进": [65, 72, 68, 78],
        "小洋": [97, 98, 98, 97]
    }
    for key, data_set in data_sets.items():
        print(f"开始统计 {key} 成绩:")
        m = middle(key)
        m.send(None) # 预激middle协程
        for value in data_set:
            m.send(value)   # 给协程传递每一组的值
        m.send(None)
    print("final_result:", final_result)

if __name__ == '__main__':
    main()

当然实现生成器嵌套,并不一定要使用yield from,但是 yield from 帮我们做了很多异常处理的情况,如果自己去处理的话难度极高,写出来的代码可读性也差。

  1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
  2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
  3. ield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
  4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其它的异常会向上 “冒泡”;
  5. 传入委托生成器的异常里,除了GeneratorExit之外,其它的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其它的异常全部向上 “冒泡”;
  6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 “冒泡”,否则的话委托生成器会抛出GeneratorExit异常。

2.3 asynico.coroutine 装饰器实现的协程

Python3.4 中新加入了asyncio库,并提供了一个默认的event loop。Python3.4有了足够的基础工具进行异步并发编程。

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。asyncio的异步操作,需要在coroutine中通过yield from完成。

并发编程同时执行多条独立的逻辑流,每个协程都有独立的栈空间,即使它们是都工作在同个线程中的。以下是一个示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@asyncio.coroutine
def test(i):
	print("test_1",i)
	r=yield from asyncio.sleep(1)
	print("test_2",i)
loop=asyncio.get_event_loop()
tasks=[test(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
 
'''
test_1 3
test_1 4
test_1 0
test_1 1
test_1 2
test_2 3
test_2 0
test_2 2
test_2 4
test_2 1
'''

说明:从运行结果可以看到,跟gevent达到的效果一样,也是在遇到IO操作时进行切换(所以先输出test_1,等test_1输出完再输出test_2)。

在Python3.4中,asyncio.coroutine 装饰器是用来将函数转换为协程的语法,这也是 Python 第一次提供的生成器协程 。只有通过该装饰器,生成器才能实现协程接口。使用协程时,需要使用 yield from 关键字将一个 asyncio.Future 对象向下传递给事件循环,当这个 Future 对象还未就绪时,该协程就暂时挂起以处理其它任务。一旦 Future 对象完成,事件循环将会侦测到状态变化,会将 Future 对象的结果通过 send 方法方法返回给生成器协程,然后生成器恢复工作。

@asyncio.coroutine:asyncio模块中的装饰器,用于将一个生成器声明为协程。可以把一个generator标记为coroutine类型,然后,就把这个coroutine扔到EventLoop中执行。test()会首先打印出test_1,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其它可以执行的coroutine了,因此可以实现并发执行。

从Python3.4开始asyncio模块加入到了标准库,通过asyncio我们可以轻松实现协程来完成异步IO操作。asyncio是一个基于事件循环的异步IO模块,通过yield from,可以将协程asyncio.sleep()的控制权交给事件循环,然后挂起当前协程;之后,由事件循环决定何时唤醒asyncio.sleep,接着向后执行代码。

2.4 async/await 异步编程 和 协程

在Python3.5中引入的 async 和 await ,可以将它们理解成asyncio.coroutine/yield from的完美替身。当然,从Python设计的角度来说,async/await让协程表面上独立于生成器而存在,不再使用yield语法,将细节都隐藏于asyncio模块之下,语法更清晰明了。

几个重要概念:

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

await 的行为类似 yield from,但是它们异步等待的对象并不一致,yield from 等待的是一个生成器对象,而await接收的是定义了__await__方法的 awaitable 对象。在 Python 中,协程也是 awaitable 对象,collections.abc.Coroutine 对象继承自 collections.abc.Awaitable。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。请注意,async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  • 把 @asyncio.coroutine 替换为 async;
  • 把yield from替换为await。
1
2
3
4
5
6
7
8
9
import asyncio
async def test(i):
	print("test_1",i)
	await asyncio.sleep(1)
	print("test_2",i)
loop=asyncio.get_event_loop()
tasks=[test(i) for i in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import asyncio
 
async def execute(x):
    print('Number:', x)
 
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
 
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine) # 将协程注册到事件循环 loop 中,然后启动
print('After calling loop')
 
 '''
Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1
After calling loop
'''

async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。

从 Python 语言发展的角度来说,async/await 并非是多么伟大的改进,只是引进了其它语言中成熟的语义,协程的基石还是在于 eventloop 库的发展,以及生成器的完善。

从结构原理而言,asyncio 实质担当的角色是一个异步框架,async/await 是为异步框架提供的 API,因为使用者目前并不能脱离 asyncio 或其它异步库使用 async/await 编写协程代码。即使用户可以避免显式地实例化事件循环,比如支持 asyncio/await 语法的协程网络库 curio,但是脱离了 eventloop 如心脏般的驱动作用,async/await 关键字本身也毫无作用。

asyncio模块是基于协程的异步IO模块。asyncio的使用可分三步走:

  • 创建事件循环
  • 指定循环模式并运行
  • 关闭循环

通常使用asyncio.get_event_loop()方法创建一个循环。

运行循环有两种方法:一是调用run_until_complete()方法,二是调用run_forever()方法。run_until_complete()内置add_done_callback回调函数,run_forever()则可以自定义add_done_callback()。

  • 使用run_until_complete()方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio
 
async def func(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')
 
if __name__ == '__main__':
 
    loop = asyncio.get_event_loop()
    future = asyncio.Future()
    asyncio.ensure_future(func(future))
    print(loop.is_running())   # 查看当前状态时循环是否已经启动
    loop.run_until_complete(future)
    print(future.result())
    loop.close()
  • 使用run_forever()方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import asyncio
 
async def func(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')
 
def call_result(future):
    print(future.result())
    loop.stop()
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    future = asyncio.Future()
    asyncio.ensure_future(func(future))
    future.add_done_callback(call_result)        # 注意这行
    try:
        loop.run_forever()
    finally:
        loop.close()

在 Python 的异步编程中,生成器和 async、await 等关键字结合使用,用于实现异步操作和协程。比如下面这段异步批量采集某个api数据:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import aiohttp
import asyncio

async def fetch_url(url):
    """
    异步函数,用于发送网络请求并返回响应的内容。
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # 获取响应内容并返回
            content = await response.text()
            return content

async def main():
    """
    主函数,演示如何并发发送多个网络请求。
    """
    # 定义一组要请求的 URL 列表
    urls = [
        'http://xmishu.zhujinhui.net/api/hot_words',
        'http://xmishu.zhujinhui.net/api/hot_words',
        'http://xmishu.zhujinhui.net/api/hot_words',
    ]

    # 创建任务列表,使用列表推导式创建多个异步任务
    tasks = [fetch_url(url) for url in urls]

    # 使用 asyncio.gather 运行所有任务,并等待它们完成
    results = await asyncio.gather(*tasks)

    # 输出所有请求的结果
    for i, result in enumerate(results):
        print(f"Response from URL {urls[i]}:")
        print(result[:100])  # 只打印前100个字符

# 使用 asyncio.run 来运行主函数
if __name__ == '__main__':
    asyncio.run(main())
Licensed under CC BY-NC-SA 4.0