Python协程原理全面分析

Python
263
0
0
2023-08-04
目录
  • 序章
  • 生成器如何进化为协程
  • 用作协程的生成器的基本行为
  • 协程的四个状态
  • 示例-使用协程计算平均值
  • 预激协程的装饰器
  • 终止协程和异常处理
  • 获取协程返回值
  • 使用yield from
  • yield from的意义
  • 使用协程做离散事件仿真

序章

yield item这行代码会产出一个值,提供给next()的调用方;此外还会做出让步,暂停执行生成器,让调用方继续工作,知道需要使用另一个值再调用next()。调用方会从生成器中拉取值。

从语法上来看,协程与生成器类似,都是从定义体中包含yield关键字的函数。可是,在协程中,yield通常出现在表达式的右边(如data = yield),可以产出值,也可以不产出:如果yield关键字后面没有表达式,那么生成器产出None。协程可能会从调用方接收数据,不过调用方把提供数据给协程使用的方式是.send(data)方法。调用方会把值推送给协程。

yield关键字甚至可以不接受或传出数据。不管数据如何流动,yield都是一种流程控制工具,使用它可以实现协作式多任务;协程可以把控制器让步给中心调度程序,从而激活其他的协程。

综上,如果从根本上把yield视为控制流程的方式,这样就好理解协程了

协程可以认为是可以明确标记有某种语法元素(yield from)的阶段“暂停”函数

生成器如何进化为协程

协程的框架是在Python2.5(2006年)实现的。自此之后,yield关键字可以在表达式中使用,并且生成器增加了.send(value)方法。生成器的调用方法可以使用.send()发送数据,发送的数据会成为生成器中yield表达式的值。因此生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作,产出调用方法提供的值。

除了.send()方法,后续还增加了.throw()和.close()方法:前者作用是让调用方抛出异常,在生成器中处理;后者作用是终止生成器。

用作协程的生成器的基本行为

示例,一个简单的协程

def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)
my_coro = simple_coroutine()
print(my_coro)  # 得到的是生成器对象
print(next(my_coro))  # None
print(my_coro.send())
打印
<generator object simple_coroutine atx00D957B0>
-> coroutine started
None
-> coroutine received:
Traceback (most recent call last):
  File "C:/Users/lijiachang/PycharmProjects/collect_demo/test.py", line 13, in <module>
    print(my_coro.send())
StopIteration

知识点:

  • 定义体中x = yield 这种形式,如果协程只需要从客户那接收数据,那么产出值是None,所以next(my_coro)的值为None,这个值是隐式指定的,因为yield关键字右边没有表达式。
  • 首先要调用next(my_coro),因为生成器还没有启动,没在yield语句处暂停,所以一开始是无法发送数据的,首先要调用next(),到yield关键字处暂停。
  • 当执行到my_coro.send(42),协程定义体中的yield表达式会得到42,然后赋值给x;协程继续运行到下一个yield表达式,或者终止。
  • 最后,控制权流动到协程定义体的末尾,导致生成器像往常一样抛出StopIteration异常

协程的四个状态

协程可以身处四个状态中的一个。获取当前状态可以使用inspect.getgeneratorstate()函数确定,该函数会返回下面四种状态的一个:

  • 'GEN_CREATED' :等待开始执行
  • 'GEN_RUNNING' :解释器正在执行 (只有在多线程程序才能看到这个状态)
  • 'GEN_SUSPENDED' :在yield表达式处暂停
  • 'GEN_CLOSED' : 执行结束

因为send方法的参数会成为暂停的yield表达式的值,所以仅当协程处于暂停状态时才能调用send方法,例如my_coro.send(42).。

不过,如果协程还没激活(即状态为GEN_CREATED),始终要调用next(my_coro)激活协程----也可以调用my_coro.send(None)激活。

ps:如果创建协程对象后立即把None之外的值发给它,会出现以下错误,清晰明了:TypeError: can't send non-None value to a just-started generator

最先调用next(my_coro)函数这一步通常称为“预激”(prime)协程,即让协程向前执行到第一个yield表达式,准备好作为活跃的协程使用。

from inspect import getgeneratorstate
def simple_coro(a):
    print('-> started: a=', a)
    b = yield a
    print('->  received: b=', b)
    c = yield a + b
    print('->  received: c=', c)
my_coro = simple_coro2(14)
print(getgeneratorstate(my_coro))
print(next(my_coro))
print(getgeneratorstate(my_coro))
print(my_coro.send(28))
print(my_coro.send(99))
打印
GEN_CREATED
-> started: a=

GEN_SUSPENDED
->  received: b=

->  received: c=
Traceback (most recent call last):
  File "C:/Users/lijiachang/PycharmProjects/collect_demo/test.py", line 19, in <module>
    print(my_coro.send(99))
StopIteration

知识点:

  • 协程在yield关键字所在的位置暂停执行。
  • 在赋值语句中,=右边的代码在赋值前执行。因此对于b = yield a这行代码,等到客户端代码再激活协程时才会设定b的值。

示例-使用协程计算平均值

示例,一个累积计算平均值的函数,使用协程实现

def averager():
    total =.0
    count =
    average =
    while True:
        num = yield average
        total += num
        count +=
        average = total / count
aver = averager()
print(aver.send(None))  # 预激协程 
print(aver.send())
print(aver.send())
print(aver.send())
打印
0
2.0
3.0
4.0

知识点:

  • 使用协程之前,需要预激协程,除了使用aver.send(None)还可以使用next(aver)
  • 这个示例中,是个无限循环的,仅当调用方在协程上调用.close()方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止。

预激协程的装饰器

如果不预激,那么协程没什么用。

调用my_coro.send(x)之前,一定要先调用next(my_coro)。为了简化协程的用法,有时候会使用一个预激装饰器。

from inspect import getgeneratorstate
from functools import wraps
def coroutine(func):
    """预激协程的装饰器,向前执行到第一个yield表达式"""
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer
@coroutine
def averager():
    total =.0
    count =
    average =
    while True:
        num = yield average
        total += num
        count +=
        average = total / count
aver = averager()
print(getgeneratorstate(aver))
print(aver.send())
print(aver.send())
print(aver.send())
打印
GEN_SUSPENDED
2.0
3.0
4.0

可以看到,使用预激装饰器装饰了averager函数上之后,协助立即处于GEN_SUSPENED状态,因此这个协程已经准备好,可以接收值了。

很多框架都提供了处理协程的特殊装饰器,不过不是所有装饰器都用于预激协程,有些会提供其他服务,例如勾入事件循环。

使用yeild from句法调用协程时,会自动预激,因此使用@coroutine等装饰器不兼容。Python3.4标准库中的asyncio.coroutine装饰器不会预激协程,因此能兼容yeild from句法。

终止协程和异常处理

协程中未处理的异常会向上冒泡,传给触发协程的对象(next函数或send函数的调用方)。

未处理的异常会导致协程终止:

In [4]: aver.send(20)
Out[4]: 20.0
In [5]: aver.send(30)
Out[5]: 25.0
In [6]: aver.send('spam')
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
In [7]: aver.send(40)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
StopIteration:

以上可以联想到,终止协程的一种方式:发送某个哨符值,让协程退出。

比如内置的None和Ellipsis(省略号...)常量经常用作哨符值。Ellipsis的优点是数据流中不太常有这个值。也有人把StopIteration作为哨符值,这样:my_coro.send(StopIteration)

从Python2.5开始,可以在生成器对象调用两个方法,显式的把异常发给协程。

这两个方法是throw抛和close:

generator.throw(exc_type[, exc_value[, traceback]])

使生成器在yeild表达式处抛出指定的异常。

如果生成器处理了抛出的异常,代码会向前执行到下一个yeild表达式处,而产生的值会成为调用generator.throw方法得到的返回值。

如果生成器没有处理抛出的异常,异常会向上冒泡,传给调用方的上下文中。

generator.close()

使生成器在暂停的yield表达式处抛出GeneratorExit异常。

如果生成器没有处理这个异常,或者抛出了StopIteration异常(表示运行到结尾),调用方不会报错。

如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。

生成器抛出的其他异常会向上冒泡,传给调用方。

示例,使用close和throw方法控制协程。

from inspect import getgeneratorstate
class DemoException(Exception):
    pass
def demo_exc_handling():
    """异常处理demo"""
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled')  # 处理异常
        else:
            print('-> coroutine received: {!r}'.format(x))  # 如果没有异常,显示接收的值
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send()
exc_coro.close()  # 正常关闭协程
print(getgeneratorstate(exc_coro))
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.throw(DemoException)  # 把异常传入协程后,如果有处理,不会终止协程
exc_coro.send(22)
print(getgeneratorstate(exc_coro))
exc_coro.throw(ZeroDivisionError)  # 如果把无法处理的异常传入,协程会终止
打印
-> coroutine started
-> coroutine received:
GEN_CLOSED
-> coroutine started
*** DemoException handled
-> coroutine received:
GEN_SUSPENDED
Traceback (most recent call last):
  File "C:/Users/lijiachang/PycharmProjects/collect_demo/test.py", line 33, in <module>
    exc_coro.throw(ZeroDivisionError)
  File "C:/Users/lijiachang/PycharmProjects/collect_demo/test.py", line 13, in demo_exc_handling
    x = yield
ZeroDivisionError

知识点:

  • 使用.close()关闭协程,没有任何异常抛出
  • 如果协程中有对应的异常处理代码,.throw()的异常不会终止协程。如果没有异常处理,协程会终止

如果不管协程如何结束都想要做些清理工作的话,要把协程定义体汇总相关的代码放入try/finally块中

def demo_exc_handling():
    """异常处理demo"""
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled')  # 处理异常
            else:
                print('-> coroutine received: {!r}'.format(x))  # 如果没有异常,显示接收的值
    finally:
        print('-> coroutine ending.')
        print('do something.')

获取协程返回值

下面是averager累积求平均数的另一个版本,这个版本不会随着增加元素返回平均值,而是最后返回一个值。

from collections import namedtuple
from functools import wraps
def coroutine(func):
    """预激协程装饰器"""
    @wraps(func)
    def prime(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return prime
Result = namedtuple('Result', 'count average')
@coroutine
def averager():
    count =
    average =.0
    total =
    while True:
        item = yield
        if item is None:
            break  # 为了获取返回值,协程必须正常终止,因此要有个判断,以便退出累计循环
        count +=
        total += item
        average = total / count
    return Result(count, average)  # 最终返回一个namedtuple,包含信息
ave = averager()
ave.send()
ave.send()
print(ave.send(None))
打印
Traceback (most recent call last):
  File "C:/Users/lijiachang/PycharmProjects/collect_demo/test.py", line 39, in <module>
    print(ave.send(None))
StopIteration: Result(count=, average=3.0)

知识点:

  • 使用ave.send(None)发送None终止循环,或者使用next(ave)也可以。结果就是导致协程结束,返回结果
  • 代码中break跳出了while循环,导致运行到定义体结束,也就抛出StopIteration很正常。
  • 异常对象的value保存着return返回的值。return表达式的值会偷偷传给调用方,赋值给StopIteration异常的一个属性。这样做有点不合常理,但是能保留生成器对象的常规行为:耗尽时抛出StopIteration异常。

既然协程定义体中return返回值,是寄托到了异常的value值中,那么就捕获异常:

try:
    print(ave.send(None))
except StopIteration as e:
    result = e.value
print(result)
打印
Result(count=, average=3.0)

获取协程的返回值虽然要绕个圈子,但是这是PEP 380定义的方式。

但是我们平时不需要这样做,因为yield from结构会在内部自动捕获StopIteration异常。这种处理方式和for循环处理StopIteration异常的方式一样:循环机制会让给用户易于理解的方式处理异常。

而且,对yeild from结构来说,不仅可以捕获StopIteration异常,还会把异常的value属性值变为yeild from的值。

所以接下来要介绍yield from结构

使用yield from

要知道yield from是全新的语言结构。它的作用比yield多得多,因此人们认为继续使用yield 关键字多少会引起误解。

在Python3.4之后,被await关键字代替。

在生成器gen中使用yeild from subgen()时,subgen会获得控制权,把产出的值传给gen的调用方,即调用方直接可以控制subgen来得到产出值。以此同时,gen会阻塞,带灯subgen终止。

yield from x表达式对x对象所做的第一件事是,调用iter(x),从中获取迭代器。因此x是任何可迭代对象。

yield from 结构不是简单的替代产出值的嵌套for循环,而是把职责委托给子生成器的句法。

yield from 的主要功能是打开双向通道,把最外层的调用方和最内层的子生成器连接起来,

这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量打异常处理。有了这个结构,协程可以通过以前不可能的方式委托职责。

相关的专业术语:

委派生成器

包含yield from <iterable> 表达式的生成器函数。

子生成器

从yield from表达式中<iterable>部分获取的生成器。

调用方

指代调用委派生成器的客户端代码。

委派生成器在yield from表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产生的值发给调用方。子生成器返回之后,解释器会抛出StopIteration异常,并把返回值附加到异常对象上(异常的value属性),此时委派生成器会恢复。

下面的示例,用于说明yield from结构的用法

from collections import namedtuple
from functools import wraps
Result = namedtuple('Result', 'count average')
# 作为子生成器使用
def averager():
    count =
    average =.0
    total =
    while True:
        item = yield
        if item is None:
            break
        count +=
        total += item
        average = total / count
    return Result(count, average)  # 返回的结果最后会成为grouper函数中的yield from中的值
# 委派生成器
def grouper(results, key):
    while True:  # 每次循环都会产生一个新的averager实例,每个实例都作为协程使用的生成器对象
        results[key] = yield from averager()  # grouper每次接受的到值都通过yield from处理,通过管道传给averager实例
# 客户端代码,即调用方
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # group作为协程使用
        next(group)  # 预激协程
        for value in values:
            group.send(value)  # 把value传给grouper,最终到达的是averager函数的item = yield那行。
        group.send(None)  # 把None传给grouper,让当前averager实例终止,让grouper继续运行。
    print(results)
data = {
    'girls;kg': [.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m': [.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg': [.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m': [.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == "__main__":
    main(data)

知识点:

  • grouper会在yield from表达式处暂停,等待averager实例处理客户端发来的值。averager实例运行完毕后,返回的值绑定到了results[key]上,while循环会不断创建averager实例,处理更多的值
  • 对于委派生成器grouper来说,永远不知道传入的值是什么。因为当value值传给grouper,最终到达的是averager函数的item = yield那行。

group.send(None) 的作用非常重要,它让当前的averager实例终止,然后在创建一个新的averager实例。如果没有它,results结果中不会有任何内容。

下面说明没有group.send(None) 时,最终的运作方式:

  • 外层for循环每次迭代会新建一个grouper实例,赋值给group变量;group是委派生成器
  • 调用next(group),是预激委派生成器grouper,此时进入while Ture循环,调用子生成器averager后,在yield from表达式处暂停
  • 内层for循环调用group.send(value),直接把值传给子生成器averager。同时,当前的grouper实例(group)在yield from表达式处暂停。
  • 内层for循环结束后,group实例依旧在yield from表达式处暂停,因此,grouper函数定义体中为results[key]赋值的语句还没有执行。
  • 如果外层for循环的末尾没有group.send(None),那么averager子生成器永远不会终止,委派生成器group永远不会再次激活,因此永远不会为results[key]赋值
  • 外层for循环重新迭代时会新建一个grouper实例,然后绑定到group变量上。前一个grouper实例以及它创建的未终止的averager子生成器实例,会被垃圾回收程序回收。

上面想说明的关键一点是,如果子生成器不终止,委派生成器会在yield from表达式处永远暂停。

如果这样,程序不会向前进行,因为yield from把控制权交给了客户端代码(即委派生成器的调用方)。

以上的示例和说明,展示了yield from结构最简单的用法,只有委派生成器和一个子生成器。因为委派生成器相当于管道,所以可以把任意数量个委派生成器连接在一起:一个委派生成器使用yield from调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个生成器,以此类推。最终,这个链条要以一个只使用yield 表达式的简单生成器结束,或者以任何可迭代对象结束。

任何yield from链条都必须由客户驱动,在最外层委派生成器上调用next()函数或.send()方法。可以用for循环等隐式调用。

yield from的意义

在PEP 380中有这么一段话,yield from的作者Greg Ewing的草稿,可以粗略的解释:

把迭代器当作生成器使用,相当于 把子生成器的定义体内联在yield from表达式中。此外,子生成器可以执行return语句,返回一个值,而返回的值会成为yield from表达式的值。

批准后的PEP 380,分六点说明了yield from的行为:

  • 子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)
  • 使用send()方法发送给委派生成器的值都直接传给子生成器。如果发送的值是None,那么会调用子生成器的__next__()方法。如果发送的值不是None,那么会调用子生成器的send()方法。如果调用的方法抛出StopIteration异常,那么委派生成器会恢复运行。任何的其他异常会向上冒泡,传给委派生成器。
  • 生成器退出时,生成器或者子生成器中的return expr表达式会触发StopIteration(expr)异常抛出。
  • yield from表达式的值,是子生成器终止时传给StopIteration异常的第一个参数。
  • 传入委派生成器的异常,除了GeneratorExit之外都传给子生成器的throw()方法。如果调用throw()方法时抛出StopIteration异常,委派生成器恢复运行。StopIteration之外的异常会向上冒泡,传给委派生成器。
  • 如果把GeneratorExit异常传给委派生成器,或者在委派生成器上调用close()方法,那么会在子生成器上调用close()方法(前提是有此方法)。如果调用close()方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出GeneratorExit异常。

yield from的具体语义难以理解,尤其是最后两点。

Greg Ewing还使用了伪代码,演示了yield from的行为。下面是把原40行的伪代码简化了的逻辑,因为原40行代码难以理解。假设了客户端代码有在委派生成器上调用.throw()和.close()方法,假设子生成器不会抛出异常,而是一直运行到终止,让解释器抛出StopIteration异常。

下面列出的伪代码,是对这行代码的扩充:RESULT = yield from EXPR

"""RESULT = yield from EXPR 的伪代码"""
_i = iter(EXPR)  # EXPR 为任何可迭代对象,获取迭代器_i使用iter()函数,这里是获取子生成器
try:
    _y = next(_i)  # 预激子生成器,把结果保存在_y中,作为产出的第一个值
except StopIteration as _e:
    _r = _e.value  # 如果抛出StopIteration异常,获取异常对象中的value属性,赋值给_r 这是最简单情况下的返回值
else:
    while: # 运行这个循环时,委派生成器会阻塞,值作为调用方和子生成器之间的通道
        _s = yield _y  # 产出子生成器当前产出的元素;等待调用方发送_s保存的值。
        try:
            _y = _i.send(_s)  # 尝试让子生成器向前执行,转发调用方发送的_s
        except StopIteration as _e: # 如果子生成器抛出StopIteration异常,获取异常对象中的value属性,赋值给_r;然后退出循环,委派生成器恢复运行
            _r = _e.value
            break
RESULT = _r # 返回的结果是_r,即是整个yield from表达式的值

RESULT = _r # 返回的结果是_r,即是整个yield from表达式的值

  • _i : 迭代器,子生成器
  • _y : 产出的值,子生成器产出的值
  • _r : 结果,最终的结果,即子生成器运行结束后yield from表达式的值
  • _s : 发送的值,调用方发给委派生成器的值,这个值会转发给子生成器
  • _e : 异常对象

但是,现实情况会复杂一些,因为客户要对.throw()和.close()方法调用,二者两个方法的执行操作,必须传入子生成器。

子生成器可能只是纯粹的迭代器,不支持.throw和.close()方法,因此yield from结构逻辑必须处理这种情况。

如果子生成器实现了这两个方法,而在子生成器内部,这两个方法都会触发异常抛出,这种情况也必须由yield from机制处理。

调用方可能会无缘无故的让子生成器自己抛出异常,实现yield from结构时要处理这种情况。

最后,为了优化,如果调用方调用next函数或者send方法,都要转交职责,在子生成器上调用next函数,仅当调用方发送的值不是None时,才使用子生成器的send方法。

完整的伪代码,参见《流畅的Python》第401页

使用协程做离散事件仿真

协程能自然地表述很多算法,例如仿真、游戏、异步IO,以及其他事件驱动型变成形式或协作式多任务。--Guido

协程是asyncio包的基础构。通过仿真系统能够说明如何使用协程代替线程实现并发活动。

离散事件仿真(DES:Discrete Event Simulation)是一种把系统建模成一系列事件的仿真类型。

在离散事件仿真中,仿真“钟”向前推进的量不是固定的,而是直到推进到下一个事件模型的模拟时间。假如我们抽象模拟出租车的运营过程,其中一个事件是乘客上车,下一个事件则是乘客下车。使用离散事件仿真可以在不到一秒钟的时间内模拟一年的出租车运营过程。这与连续仿真不同,连续仿真的仿真钟以固定的量不断向前前进。

显然,回合制游戏就是离散事件仿真的例子:游戏的状态只在玩家操作时变化,而且一旦玩家决定下一步怎么走了,仿真钟就会冻结。而实时游戏则是连续仿真,仿真钟一直在运行,游戏的状态在一秒钟内更新很多次。

这两种仿真类型都能使用多线程或在单线程中使用面向事件的编程技术(例如事件循环驱动的回调或协程)实现。可以说,为了实现连续仿真,在多个线程中处理实时并行的操作更自然。而协程恰好为实现离散事件仿真提供了合理的抽象。SimPy是一个实现离散事件仿真的Python包,通过一个协程表示离散事件仿真系统的各个进程。