跳转至

asyncio 教程🔗

本章摘自 YiriMirai 的文档。

YiriMirai 是一个轻量, 低耦合的 Python QQ Bot 框架.

Python 从 3.4 开始引入了 asyncio 库,提供异步协程支持,从 Python 3.4 到 3.7,对异步的支持一直在不断地进步。可以说,异步就是 Python 未来的趋势之一。

在了解异步之前,你可能已经接触过或听说过很多类似的概念,比如“并发”“并行”“多线程”等等,这么多名词,再加上一个“异步”,让人有些混乱。

不过,虽然看起来很复杂,但是,通过这篇教程,我相信你能够对“异步”和“协程”有一个清晰的把握。

以下是这篇教程涵盖的内容:

  • 异步:一种编程范式,独立于编程语言,可以具有多种语言的不同实现。
  • async/await:两个新的 Python 关键字,用于规定异步逻辑。
  • asyncio:一个 Python 内置的包,提供协程的基础功能和管理协程的方法。

如果你心急了,请直接跳到 使用协程

什么是异步?🔗

在讲异步之前,需要先知道一个与之相对的概念:同步。

其实同步对我们而言并不陌生。在接触到异步之前,我们认知的一切都是同步的。我们可以想象代码一句一句地执行,遇到函数就进入,执行完函数再执行函数后面的东西,所有的执行顺序都严格地和代码书写的顺序一致——这就是同步。

比如这样的代码:

import time

print(1)
time.sleep(10)
print(2)

我们可以清晰地预知它的行为:先打印 1,然后等待 10 秒,最后打印 2。

同步最大的优点是 思路清晰你永远可以知道你的程序是按照怎样的顺序执行的,不用担心任何意料之外的情况

但同时,同步也有不可避免的缺点:在某些情境下,它会带来性能的浪费。

以一个生活中的场景为例:你在网上下载一个文件,但是文件很大,需要很长时间才能下完。这时候,你肯定不会盯着进度条一点点走,而是去做点别的事情,读一会书,看一会电影,之类的。

但如果有一个完全“同步”的人,他为自己设定的程序是下载这个文件,然后运行它,这样的话,他为了不让自己的运行逻辑乱掉,就会一直守在电脑前,直到文件下载完成才做下一步的工作。正常人肯定不能忍受这样无聊的等待,也不会希望自己的程序在这种无聊的事上浪费时间。

为了能充分利用这些可能被浪费掉的时间,人们想出过很多方法,比如 多线程 ,比如 回调

多线程是最容易想到的一种方式了。我们可以创建多个线程,每个线程内部是同步的,再让这些线程同时运行。这样,在一个线程等待的时候,其余的线程可以继续运行,不会因此停止。

这看上去十分美好,然而现实并不像我们想象得那么简单。虽然线程各自内部的代码是同步的,但不同线程之间的代码执行顺序已经无从确定。换言之,我们引入了“异步”。 代码的执行顺序与书写顺序不一致的现象就是异步

熟悉多线程的人应该了解过“锁”的各种花样。两个线程共享资源时,如果同时对同一个变量进行修改,就有可能导致出乎预料的情况。所以人们发明了“锁”,来保证同一时刻只有唯一的线程对某一个变量进行修改。可以说,“锁”是为了制服异步而发明的。

graph TD;
    subgraph 执行顺序
    H{{b = 1}} --> F[a = 1]
    F --> G{{a += b}}
    G --> I["print(a)"]
    end
    subgraph Thread1
    A[a = 1] --> B["print(a)"]
    B --> E([Output: 2])
    end
    subgraph Thread2
    C{{b = 1}} --> D{{a += b}}
    D -.->|Breaking!| E
    end

如此看来,异步似乎是一件不太好的事情。它让程序的运行平添了一种不可控的随意性,我们不再能看到代码就直接推演出结果,因为即使是相邻书写的僵局代码,之间也可能会有别的代码执行。

但是 异步相较于同步而言,带来的性能提升实在是太诱人了 。一个网站的服务器,如果是同步的,那么它同时只能让一个用户访问——这简直是不可理喻的事情。异步就像几万年前蹲踞在草原清冷的月光下,用锐利的眼神眺望人类营地篝火的一只秀美的狼,而猎手们则思考着如何给它套上项圈,变成自己忠实的猎犬。

所以,多线程、回调,包括协程,这一系列设计,与其说是为了充分利用计算资源,不如说是为了 在人类可理解的范畴内,更好地制服异步

锁的“困境”🔗

多线程和锁的结合,至今仍然是异步编程的最佳选择之一。因此,怎么加锁、在何处加锁,就成为了一门重要的学问。

锁是个好东西,但是不能有太多。一方面,反复地获取锁释放锁会占用运行时间;另一方面,当一个线程长时间持有某个锁时,其他的线程如果想要访问这个资源,也必须在原地等待, 极端情况下,同一时间只有一个线程在执行!

人们在优化锁的使用上花了很大工夫,从中衍生出的各种理论此处不再赘述。随着锁的使用方式逐渐变得复杂,人们发现,如果想要完美地控制锁的粒度,就不得不对花大量的代码去精细地控制每一个锁,这让编写代码的难度大大提高了。

有的时候,控制锁带来的复杂度已经超出了人能忍受的范围。于是我们经常见到许多简单粗暴的操作——比如在 CPython 中臭名昭著的 GIL。它将整个 Python 解释器加锁,来彻底解决 Python 代码内的线程冲突问题;后果就是, 所有的 Python 线程都必须等待这个锁,硬生生地让多线程程序几乎退化成了单线程

当然,GIL 的存在也没有完全把多线程的优点抹消掉。比如一个线程在 sleep 的时候,或者在等待网络请求返回的时候,还是会乖乖地释放掉 GIL 锁,让其他线程运行的。不过这时候,释放 GIL 的线程实际上只是在等着,什么都不干,最后还是只有一个线程在运行。

既然还是只有一个线程在运行,那么为什么不干脆用单线程实现呢?

答案是:当然可以。对于这个问题,人们给出了许多答案,其中最著名的是两种:回调和协程

从回调到协程(上)🔗

介绍了这么多,我们终于第一次提到了“协程”这个词。不过先不要着急,要想理解协程的概念,我们还需要一些基础的东西。

回到上面刚刚提出的问题,不过这里要换个说法:怎样在单线程中实现异步

其实在刚才的讨论中我们已经知道了问题的答案,那就是,像带着 GIL 的多线程一样,在某个地方需要等待时,就立马切换到别的任务,等待完成之后,再继续刚才的任务。

graph LR;
    A["任务1(发起请求)"] -.-> C{{等待}}
    A --->|切换到| D[任务2]
    C -..-> E{{等待完成}}
    E -.-> F["任务3(需要请求结果)"]
    D --->|继续执行| F

单线程异步的逻辑看起来就是这么简单,也十分容易理解。但是,易于理解不代表易于实现。当人们真的开始动手写一段单线程异步的代码时,就发现有许多显而易见却难以说明的问题。

“需要等待”是什么?“切换”是怎么完成的?“等待完成”指的是什么?“继续”又是怎么实现的?

这些问题说复杂也复杂,说简单也简单。为了不浪费大家的思考时间,我在这里直接公布答案:

第一个问题的答案其实很直白。哪些任务需要等待,在程序运行之前就能看出来。简单如 sleep,复杂如网络请求,这些消耗时间,但不怎么消耗 CPU 的任务,就是需要等待的任务。

第二个问题的答案有两种,对这个问题的回答的不同也正是回调与协程最初的分歧之处。协程式的答案稍后会说,暂且不表,先看看回调式的答案,非常简单粗暴:不要切换

举个例子:

import requests
# job 1
response = requests.get('http://example.org/very_large_file.txt')
print(response.text.count('e'))
# job 2
for i in range(10):
    print(i)

我们有两个任务,一个是抓取网络上的文件,一个是打印数字。网络上的文件非常大,需要很长时间来加载,这时候,我们希望可以在等待文件下载的时候,去执行打印数字的任务。但是问题来了:前两行代码写的严丝合缝,该怎么把打印数字的任务插进去?

实际上,我们知道,一切问题的根源都出在 requests.get 的调用上。这是一个同步的调用,不等到下载完成就不会返回。因此,我们需要的是一个异步的方法,能够在下载完成之前就返回。

def get_async(): ...
# job 1
get_async('http://example.org/very_large_file.txt')
# print(response.text.count('e'))
# job 2
for i in range(10):
    print(i)

虽然实际上 Python 并没有哪一个库提供这样的一个 get_async,但在这里我们不妨做一次迷人的假设。我们希望能有一个异步的 get_async,调用后会发起一次网络请求,然后立刻返回,这样,程序的流程就顺理成章地走到了打印数字的地方

只是有一个问题还没有解决——从哪里读取下载的文件呢?

好像有点尴尬。get_async 创建了下载任务之后,就把它丢到一边不管了,下载完的东西也没有办法拿到。我们当然不允许这种买椟还珠的行为,所以还需要一点点的处理,让我们能够以某种方式,拿到下载的文件。

这就涉及到刚才提出的第三和第四个问题的答案了。“等待完成”自然是指文件下载完成,而“继续”的方式才是重头戏—— 回调

回调(callback)是将程序的一部分以函数的形式传递出去,供外部调用的一种模式。这么说有点抽象,我们结合刚才的例子来说明。

在刚才,我们遇到了没办法拿到下载的文件的问题。其实换个角度来看,我们需要的,是一个 能够在下载完成之后,执行一段利用下载的文件中的内容的代码的方法

利用回调,我们可以轻松地完成这一点。

def get_async(): ...
# job 1
get_async('http://example.org/very_large_file.txt', callback=job1_continuation)

def job1_continuation(response):
    print(response.text.count('e'))

# job 2
for i in range(10):
    print(i)

把第一个任务的剩余部分写到回调函数里,然后传给 get_async。当文件下载完成后,回调函数就会以下载的文件的内容作为参数,调用回调函数。这就成功地将一段同步的代码改造成了异步。可喜可贺,可喜可贺。

不过,好像忘了点什么?

我们并没有说这个要怎么实现,实际上,像 Go Kotlin 等语言都使用基于线程的协程,并没有 JavaScript Python 式的基于回调与IO复用的协程。

等下,我刚刚是不是提到了 I/O 复用

没错,这就是 Python 中网络回调的底层实现 select 模块与 selectors 模块,对异步操作提供了最基本的支持。

比如,这是一个简单的异步网络服务器:

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

在 Windows 下基于 I/O Completion Port 的模型比基于 Selector 的模型更可拓展(支持子进程等),当然这是题外话了。

从回调到协程(下)🔗

上一节中,我们已经简单说了基于底层 selector 的回调协程,它都需要通过 socket 创建的原始套接字进行操作,非常笨拙且不友好。

回调的本质,是将一个任务分成两部分,在耗时操作之前的部分,和耗时操作之后的部分,后者是前者的继续,或者叫做续体(continuation)。回调式就是把续体写成回调函数的形式,传递到其他地方,这种操作又叫做 续体传递(continuation-passing) 。从这个层面上看,回调是续体传递风格(Continuation-Passing Style, CPS)的一种。

CPS 其实是很早就被研究过的东西,它最初的应用不是在异步,而是在函数式编程中。用 CPS 书写的 IR 可以方便地实现惰性求值,而且因为续体天生就是 Monad,所以顺便可以解决求值顺序的问题。(话说,为什么到处都是 Monad 呢?)上一句话大可不必理解,毕竟我们不是在讲函数式编程,不过是借用一下 CPS 的术语,提供一个新的视角而已。

从 CPS 的角度看,续体到底是什么形式并不重要,只要他能包含任务中尚未完成的部分就可以。于是我们有了一个大胆的想法:续体能不能是这个任务自身呢

这是一个回调式的任务,它被拆成了两半:

def job1(url):
    get_async(url, callback=continuation)

def continuation(response):
    print(response.text.count('e'))

现在我们要把它拼回去:

from functools import partial

def job1(url, cont=False):
    if not cont:
        get_async(url, callback=partial(job1, cont=True))
    else:
        response = url
        print(response.text.count('e'))

看起来怪怪的。这里用了一个参数 cont 来指示调用时进入的是任务的前半部分,还是后半部分。回调函数传入是就是这个函数本身,只是用 partial 规定了一下参数,让回调能进入续体部分。

这种奇怪的写法说不上好,可读性也不是很高。他只是把回调和任务本身强行拼在一起而已。

可是有一点优势,至少 在写代码的顺序上,他看起来和同步代码更像了 。这个优点说大也大,说小也小。如果一个函数中,要有很多次异步调用,如果一个一个全部拆分到回调函数里,就会显得特别杂乱(尤其是在 Python 的匿名函数特别丑陋的情况下)。如果我们能找到一个良好的写法,既能把破碎的回调函数拼回去,又能保持异步的优点,那就再好不过了。

问题的关键在哪里?上面这个函数写的很奇怪,原因是它要 实现同一个函数的两次调用执行不同的代码

……确实是个很奇怪的需求。不过换一个角度是不是就容易理解了呢?表面看是两次调用执行不同的代码,实际上是 第一次调用后,在某一处暂停,然后第二次调用,就从这个地方继续

如果有两个神秘的函数 pauseresume,能让我们实现这一功能,我们立马可以把代码写得十分优雅:

def job1(url):
    get_async(url, callback=resume)
    response = pause()
    print(response.text.count('e'))

执行到 pause 的时候,这个函数暂停,等到 resume 被调用时,才继续执行。

非常好,现在问题只有一个了:怎么才能实现暂停的功能呢

答案就是:协程

async 和 await🔗

按照最简单的方式来理解,协程就是可以暂停的函数

如果你对 Python 的其他部分有一定了解,你一定会想到—— 生成器 (Generator)

实际上,asyncio 刚刚被引入时,协程就是通过 @asyncio.coroutine 包装生成器而成的。

yield 时,函数暂停执行且保留本地变量,直到在其上执行 send()next() .

对于以下 Python 代码:

def generator():
    yield "start"
    return "end"
def main():
    gen = generator()
    start = next(gen)
    try:
        next(gen)
    except StopIteration as e:
        end = e.value
    print(start, end)
main() # 输出 start end

它的执行流程如下 (箭头反映了控制权的交换):

flowchart TD;
    subgraph main
    A["gen = generator()"] --> B["start = next(gen)"]
    C["next(gen)"]
    D["end = e.value"]
    D --> E["输出 start, end"]
    end
    subgraph gen
    B --> Q["gen 执行到 yield,返回 'start'"]
    Q -->|"返回的 'start' 赋值给 start"| C
    C --> R["gen 执行到 return"] --> S["抛出 StopIteration"]
    Q -->|"gen 挂起"| R
    S -->|"返回的 'end' 赋值给 end"| D
    end

Python 的协程就是使用了这个思路,只不过这样创建的协程,控制权不会直接移交给调用者,而是 事件循环

那协程怎么互相调用呢?没关系,通过 yield from 语法,可以进一步转交控制权给下层生成器,也就是协程。

没过多久 (Python 3.5),asyncawait 就被加入 Python 了。

使用协程🔗

通过 async 关键字可以将一个函数变为 定义上 的异步函数,通过 await 关键字调用其他的异步函数。

要在协程内部并发多个协程,我们需要 创建任务(Task)

我们可以通过 asyncio.create_task() 函数来创建一个任务。

有一个几乎等效的函数,叫做 asyncio.ensure_future()

等一下,Future 是什么?

Future 是用于表示 异步运算结果 的对象,它用于将回调式异步编程与 async await 联系在一起。

通过 Future.result() 方法通知事件循环异步运算完成(无论成功还是失败),事件循环便会将结果传递给正在等待的协程。

Task 对象将协程包裹在 Future 中,用于控制协程执行。

再等等,Future 既然可以表示异步运算结果,那么......

通过 AbstractEventLoop.run_in_executor() ,可以将同步函数在线程/进程内执行,再包装成可以被等待的 Future 对象。

async for async with 则是通过实现 __aiter__() __anext__() __aenter__() __aexit__() 等 dunder 方法实现的。