Python AsyncIO 异步编程完全指南 (Python代写)

AsyncIO 是一种并发编程设计,已在Python中获得了专门的支持,从Python 3.4到3.7以及可能以后的版本迅速发展。本教程旨在帮助您回答该问题,从而使您对Python的AsyncIO方法有更深入的了解。

联系我们
微信: biyeprodaixie 欢迎联系咨询

本次CS代写的主要涉及如下领域: Python代写

Python AsyncIO 异步编程完全指南

AsyncIO 是一种并发编程设计,已在Python中获得了专门的支持,从Python 3.4到3.7以及可能以后的版本迅速发展。

您可能会担心“并发,并行性,线程化,多处理。 已经掌握了很多东西。AsyncIO放在哪里?”

本教程旨在帮助您回答该问题,从而使您对Python的AsyncIO方法有更深入的了解。

这是要介绍的内容

  • 异步IO(AsyncIO): 一种与语言无关的范例(模型),具有跨多种编程语言的实现

  • 异步/等待: 两个新的Python关键字,用于定义协程

  • asyncio: Python软件包,为运行和管理协程提供了基础和API

协程(专门的生成器函数)是Python中异步IO的核心,稍后我们将深入探讨它们。

设置环境

您需要Python 3.7或更高版本才能完整阅读本文,以及aiohttp和aiofiles软件包:

$ python3.7 -m venv ./py37async
$ source ./py37async/bin/activate  # Windows: .\py37async\Scripts\activate.bat
$ pip install --upgrade pip aiohttp aiofiles  # Optional: aiodns

异步IO概览

比起多处理和线程处理,异步IO的相关资料要少很多。 本节将为您提供什么是异步IO以及它如何适应其周围环境的完整图谱。

异步IO用在哪里

并发和并行性是不容易涉足的扩展主题。尽管本文着重介绍异步IO及其在Python中的实现,但值得花一点时间将异步IO与同类产品进行比较,以了解异步IO如何适应更大,有时令人头晕的难题。

并行性包括同时执行多个操作。多进程是一种实现并行性的方法,它需要将任务分散到计算机的中央处理单元(CPU或内核)上。多重处理非常适合CPU限制的任务:紧密结合循环和数学计算通常属于此类。

并发是比并行性稍微宽泛的术语。这表明多个任务具有以重叠方式运行的能力。 (有一种说法是并发并不意味着并行。)

线程是并发执行模型,多个线程轮流执行任务。一个进程可以包含多个线程。由于具有GIL,Python与线程之间的关系非常复杂,但这超出了本文的范围。

重要的是要知道线程化对于IO绑定的任务来说更好。尽管CPU密集型任务的特征是计算机内核从头到尾不断地努力工作,但IO受限型工作主要由大量等待输入/输出来完成。

综上所述,并发包括多处理(理想的是CPU绑定任务)和线程处理(适用于IO绑定任务)。多处理是并行性的一种形式,并行性是并发的特定类型(子集)。 Python标准库通过其多处理,线程和parallel.futures包为这两者提供了长期支持。

现在是时候让一个新成员加入了。在过去的几年中,CPython中已经更加全面地构建了一个独立的设计:异步IO,它通过标准库的asyncio包和新的async和await语言关键字启用。需要明确的是,异步IO并不是一个新发明的概念,它已经存在或正在内置到其他语言和运行时环境中,例如Go,C#或Scala。

Python文档将asyncio软件包记为一个用于编写并发代码的库。但是,异步IO不是线程化,也不是多处理。它不是建立在这两个之上的。

实际上,异步IO是一种单线程,单进程设计:它使用协作多任务处理,这个术语您将在本教程结束时充实。换句话说,尽管在单个进程中使用单个线程,但异步IO却带来了并发的感觉。协程(异步IO的主要功能)可以并发进行调度,但是它们并不是天生并发的。

重申一下,异步IO是并发编程的一种形式,但它不是并行性。与多线程处理相比,它与线程处理的关系更加紧密,但两者却截然不同,并且是并发技巧包中的独立成员。

异步是什么意思? 这不是一个严格的定义,但是出于我们此处的目的,我可以想到两个属性:

  • 异步例程可以在等待其最终结果的同时“暂停”,并让其他协程同时运行。
  • 通过上述机制,异步代码有助于并发执行。 换句话说,异步代码给出了并发的外观。

这是将所有内容放在一起的图表。 白色术语代表概念,绿色术语代表其实现或实现的方式:

我将停止并发编程模型之间的比较。 本教程的重点是异步IO的子组件,如何使用它以及围绕它兴起的API。

异步IO解释

异步IO乍看起来似乎违反直觉和自相矛盾。 促进并发代码的事物如何使用单个线程和单个CPU内核? 我从来都不擅长制作示例,所以我想解释一下Miguel Grinberg在2017年的PyCon演讲中的一个,它很好地解释了所有内容:

国际象棋大师朱迪特·波尔加尔(JuditPolgár)举办国际象棋展览,在其中她扮演多个业余玩家。 她有两种举办展览的方式:同步和异步。

假设

  • 24个对手
  • Judit在5秒内使每盘棋移动
  • 对手各花费55秒采取行动
  • 游戏平均30对动作(总共60个动作)

同步版本:Judit一次只能玩一场游戏,决不能一次玩两局,直到游戏完成为止。 每个游戏需要(55 + 5)* 30 == 1800秒或30分钟。 整个展览需要24 * 30 == 720分钟或12个小时。

异步版本:Judit在一个表之间移动,在每个表上移动一个。 她离开桌子,让对手在等待时间内采取下一步行动。 在所有24场比赛中,一动需要Judit 24 * 5 == 120秒或2分钟。 整个展览现在缩短为120 * 30 == 3600秒,或仅1小时。

JuditPolgár只有一只,只有两只手,一次只能动一动。 但是异步播放可以将展览时间从12小时减少到一小时。 因此,协作式多任务处理是一种奇特的方式,可以说程序的事件循环(稍后会详细介绍)与多个任务进行通信,以使每个任务在最佳时间轮流运行。

异步IO需要较长的等待时间,否则功能将被阻塞,并允许其他功能在停机期间运行。 (有效阻止的功能从开始到返回为止一直禁止其他人运行。)

异步IO并不简单

我听说:“尽可能使用异步IO; 必要时使用线程。” 事实是,构建持久的多线程代码可能很困难且容易出错。 异步IO避免了线程设计可能会遇到的某些潜在的速度颠簸。

但这并不是说Python中的异步IO很简单。 请注意:当您冒险进入水平面以下时,异步编程也可能会很困难! Python的异步模型是基于诸如回调,事件,传输,协议和期货之类的概念构建的,只是术语可能令人生畏。 其API不断变化的事实使其变得不那么容易。

幸运的是,asyncio已经发展到其大部分功能不再是临时的程度,而其文档已得到了巨大的改进,与此相关的一些优质资源也开始出现。

asyncio包和async / await

现在,您已经对异步IO作为设计有了一定的了解,下面让我们探讨Python的实现。 Python的asyncio程序包(在Python 3.4中引入)及其两个关键字async和await具有不同的用途,但可以一起帮助您声明,构建,执行和管理异步代码。

async/await 语法和原生协同程序

异步IO的核心是协程。 协程是Python生成器函数的专用版本。 让我们从基线定义开始,然后在此处进行构建:协程是一个函数,可以在到达返回值之前暂停其执行,并且可以将控制权间接传递给另一个协程一段时间。

稍后,您将更深入地研究如何将传统生成器准确地用于协程。 目前,了解协程工作方式的最简单方法是开始制作协程。

让我们采用沉浸式方法并编写一些异步IO代码。 这个简短的程序是异步IO的Hello World,但是在说明其核心功能方面还有很长的路要走:

#!/usr/bin/env python3
# countasync.py

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

执行此文件时,请注意与仅使用def和time.sleep()定义函数的外观有所不同:

$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.

此输出的顺序是异步IO的核心。 与count()的每个调用进行交谈都是一个事件循环或协调器。 当每个任务到达等待asyncio.sleep(1)时,该函数都会大喊事件循环并对其进行控制,并说:“我要睡一秒钟。 继续,让其他有意义的事情同时进行。”

将此与同步版本进行对比:

#!/usr/bin/env python3
# countsync.py

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for _ in range(3):
        count()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

当执行时,顺序和执行时间会有微小但关键的变化:

$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.

虽然使用time.sleep()和asyncio.sleep()似乎很平庸,但它们用作涉及等待时间的所有耗时的过程的替身。 (您可以等待的最普通的事情是基本上不执行任何操作的sleep()调用。)也就是说,time.sleep()可以表示任何耗时的阻塞函数调用,而asyncio.sleep()用于站立 进行非阻塞呼叫(但也需要一些时间才能完成)。

正如您将在下一节中看到的那样,等待某些内容(包括asyncio.sleep())的好处是,周围的函数可以暂时将控制权让给另一个更容易立即执行某项功能的函数。 相反,time.sleep()或任何其他阻塞调用与异步Python代码不兼容,因为它将在睡眠时间内停止轨道中的所有内容。

异步IO规则

此时,对async,await和它们创建的协程函数的更正式定义是有序的。 这部分内容比较繁琐,但是掌握异步/等待功能是有帮助的,因此,如果需要,请返回至此:

  • 语法async def引入了本机协程或异步生成器。 与和异步的表达式也有效,稍后您将看到它们。

  • 关键字await将功能控制传递回事件循环。 (它暂停了周围协程的执行。)如果Python在g()范围内遇到await f()表达式,这就是await告诉事件循环的方式,“暂停g()的执行,直到我等待的是 返回f()的结果。 同时,让其他东西运行。”

在代码中,第二个要点大致如下所示:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

关于何时以及如何以及不可以使用异步/等待,还有一套严格的规则。无论您仍是语法还是已经使用async / await,这些都可以很方便:

  • 您使用async def引入的功能是协程。它可以使用await,return或yield,但是所有这些都是可选的。声明异步def noop():pass有效:

    • 使用等待和/或返回将创建协程函数。要调用协程函数,必须等待它以获得结果。

    • 在异步def块中使用yield的情况不太普遍(并且只有最近才在Python中合法)。这将创建一个异步生成器,您可以使用异步生成器对其进行迭代。暂时不要使用异步生成器,而将重点放在获取协程函数的语法上,协程函数使用等待和/或返回。

    • 用async def定义的任何内容都可能不使用yield from,这将引发SyntaxError。

  • 就像在def函数之外使用yield的SyntaxError一样,在异步def协程之外使用await也是SyntaxError。您只能在协程体内使用await。

以下是一些简短的示例,旨在总结上述几条规则:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

最后,当您使用await f()时,要求f()是可等待的对象。 好吧,那不是很有帮助,是吗? 现在,只知道一个可等待的对象是(1)另一个协程或(2)定义返回迭代器的.__ await __()dunder方法的对象。 如果您正在编写程序,则出于大多数目的,您只需要担心案例1。

这给我们带来了另一个可能会弹出的技术区别:将函数标记为协程的一种较旧的方法是用@ asyncio.coroutine装饰一个普通的def函数。 结果是基于生成器的协程。 自从在Python 3.5中使用async / await语法以来,这种构造已经过时了。

这两个协程在本质上是等效的(都可以等待),但是第一个协程基于生成器,而第二个是本地协程:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine, older syntax"""
    yield from stuff()

async def py35_coro():
    """Native coroutine, modern syntax"""
    await stuff()

如果您自己编写任何代码,则最好使用本机协程,以使其显式而不是隐式。基于生成器的协程将在Python 3.10中删除。

在本教程的后半部分,我们将仅出于说明的目的触及基于生成器的协程。引入异步/等待的原因是为了使协程成为Python的独立功能,可以很容易地将其与普通的生成器函数区分开,从而减少了歧义。

不要陷入基于发电机的协程中,这些协程已经被异步/等待故意地过时了。它们有自己的一小套规则(例如,不能在基于生成器的协程中使用await),如果您坚持使用async / await语法,则这些规则在很大程度上是不相关的。

事不宜迟,让我们举一些更多的例子。

这是异步IO如何减少等待时间的一个示例:给定一个协程makerandom(),该协程不断产生范围为[0,10]的随机整数,直到其中一个超过阈值为止,您要让该协程多次调用不需要等待彼此相继完成。您可以在很大程度上遵循上述两个脚本的模式,并稍作更改:

#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

如图所示:

该程序使用一个主要协程makemakerandom(),并在3个不同的输入上同时运行它。大多数程序将包含小型模块化协程和一个包装器功能,用于将每个较小的协程链接在一起。 main()然后用于通过在一些可迭代或池中映射中央协程来收集任务(未来)。

在此微型示例中,池为range(3)。在稍后提供的完整示例中,它是需要同时请求,解析和处理的一组URL,并且main()为每个URL封装整个例程。

尽管“制作随机整数”(比CPU绑定更多的东西)可能不是asyncio的最佳选择,但在示例中正是asyncio.sleep()的存在旨在模仿IO绑定的过程等待时间不确定的地方。例如,asyncio.sleep()调用可能表示消息应用程序中两个客户端之间发送和接收非随机整数。

异步IO设计模式

异步IO带有自己的一组可能的脚本设计,本节将介绍它们。

链协程

协程的一个关键特征是它们可以链接在一起。 (请记住,一个协程对象是可以等待的,因此另一个协程可以等待它。)这使您可以将程序分解为较小的,可管理的,可回收的协程:

# chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    import sys
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")

注意输出,part1()睡眠一段可变的时间,part2()在结果可用时开始处理它们:

$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.

在此设置中,main()的运行时间将等于它收集在一起并计划的任务的最大运行时间。

使用队列

asyncio软件包提供的队列类旨在与队列模块的类相似。到目前为止,在我们的示例中,我们实际上并不需要队列结构。在chained.py中,每个任务(未来)都由一组协程组成,这些协程明确地相互等待,并通过每条链上的单个输入。

还有一种可以与异步IO一起使用的替代结构:彼此不相关的许多生产者将项目添加到队列中。每个生产者可以在交错,随机,未通知的时间将多个项目添加到队列中。一群消费者在贪婪地出现时将它们从队列中拉出,而不必等待任何其他信号。

在这种设计中,没有任何个人消费者链接到生产者。消费者不知道生产者的数量,甚至不知道将要添加到队列中的项目的累计数量。

每个生产者或消费者花费可变的时间分别从队列中放入和提取项目。队列用作可以与生产者和消费者进行通信的吞吐量,而无需他们彼此直接交谈。

注意:尽管由于queue.Queue()的线程安全性,所以队列经常在线程程序中使用,但异步IO时您不必担心线程安全。 (当您将两者结合在一起时是个例外,但是本教程中没有这样做。)

队列的一个用例(如此处的情况)是队列充当生产者和消费者的发送者,而这些生产者和消费者在其他情况下并没有直接链接或关联。

该程序的同步版本看起来非常令人沮丧:一组阻塞的生产者将项目串行添加到队列中,一次添加一个生产者。只有在所有生产者都完成之后,才能由一个消费者逐项处理队列。此设计存在大量延迟。物品可能闲置地排在队列中,而不是立即拿起并处理。

下面是一个异步版本asyncq.py。此工作流程中具有挑战性的部分是,需要向消费者发出生产已完成的信号。否则,await q.get()将无限期地挂起,因为队列已被完全处理,但是消费者不会知道生产已经完成。

(非常感谢StackOverflow用户提供的一些帮助,帮助他们整理了main():关键是等待q.join(),该操作将阻塞直到队列中的所有项目都已被接收并处理,然后取消使用方)任务,否则将挂断并无休止地等待其他队列项目出现。)

这是完整的脚本:

#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

前几个协程是辅助函数,它们返回一个随机字符串,一个小数秒性能计数器和一个随机整数。 生产者将1到5个项目放入队列中。 每个项目都是(i,t)的元组,其中i是随机字符串,t是生产者尝试将元组放入队列的时间。

消费者将商品拉出时,它仅使用放入商品的时间戳来计算该商品在队列中的经过时间。

请记住,asyncio.sleep()用于模仿其他一些更复杂的协程,如果这是常规的阻止函数,则会消耗时间并阻止所有其他执行。

这是由两个生产者和五个消费者进行的测试:

$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added <377b1e8f82> to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added <413b8802f8> to queue.
Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element <413b8802f8> in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <06c055b3ab> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added <17a8613276> to queue.
Consumer 4 got element <17a8613276> in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.

在这种情况下,项目将在几分之一秒内完成处理。延迟可能有两个原因:

  • 标准,在很大程度上是不可避免的开销

当项目出现在队列中时所有消费者都在睡觉的情况

  • 关于第二个原因,幸运的是,扩展到成百上千的消费者是完全正常的。 python3 asyncq.py -p 5 -c 100应该没有问题。这里的要点是,从理论上讲,您可以在不同的系统上使用不同的用户来控制生产者和消费者的管理,而队列则作为中心吞吐量。

到目前为止,您已经陷入困境,并看到了三个相关的示例,这些示例显示了异步调用用async和await定义的协程。如果您不完全遵循或只是想更深入地了解现代协程在Python中的使用机理,那么您将从第一节开始下一节。

异步IO的根源

之前,您看到了一个基于生成器的老式协程的示例,这些协程已被更明确的本地协程过时了。该示例值得稍作调整以重新显示:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine"""
    # No need to build these yourself, but be aware of what they are
    s = yield from stuff()
    return s

async def py35_coro():
    """Native coroutine, modern syntax"""
    s = await stuff()
    return s

async def stuff():
    return 0x10, 0x20, 0x30

作为实验,如果不经等待就调用py34_coro()或py35_coro(),而没有等待,也没有对asyncio.run()或其他asyncio“瓷器”函数的任何调用,会发生什么? 孤立地调用协程将返回协程对象:

>>> py35_coro()
<coroutine object py35_coro at 0x10126dcc8>

表面上这不是很有趣。 单独调用协程的结果是一个等待的协程对象。

测验时间:Python还有什么其他功能? (当单独调用Python时,Python的什么功能实际上并没有“做什么”?)

希望您将生成器视为此问题的答案,因为协程是引擎盖下的增强型生成器。 在这方面,行为是相似的:

>>> def gen():
...     yield 0x10, 0x20, 0x30
...
>>> g = gen()
>>> g  # Nothing much happens - need to iterate with `.__next__()`
<generator object gen at 0x1012705e8>
>>> next(g)
(16, 32, 48)

碰巧的是,生成器函数是异步IO的基础(无论您是否使用异步def声明协程,而不是使用旧的@ asyncio.coroutine包装器声明协程)。 从技术上讲,等待比从收益更类似于收益。 (但请记住,x()的收益只是句法糖,可以代替x()中的i:收益i。)

与异步IO有关的生成器的一项关键功能是可以有效地随意停止和重新启动它们。 例如,您可以中断对生成器对象的迭代,然后在以后的其余值上恢复迭代。 当生成器函数达到yield时,它会生成该值,但随后会处于空闲状态,直到被告知要生成其后续值。

可以通过一个示例充实一下:

>>> from itertools import cycle
>>> def endless():
...     """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
...     yield from cycle((9, 8, 7, 6))

>>> e = endless()
>>> total = 0
>>> for i in e:
...     if total < 30:
...         print(i, end=" ")
...         total += i
...     else:
...         print()
...         # Pause execution. We can resume later.
...         break
9 8 7 6 9 8 7 6 9 8 7 6 9 8

>>> # Resume
>>> next(e), next(e), next(e)
(6, 9, 8)

关键字await的行为类似,它标记了协程自身暂停并让其他协程工作的断点。在这种情况下,“暂停”是指已暂时放弃控制权但尚未完全退出或结束的协程。请记住,收益率,从广义上讲是等待收益率,是生成器执行过程中的一个断点。

这是函数和生成器之间的根本区别。一个功能是全有还是全无。一旦启动,它不会停止,直到它返回收益,然后将该值推送给调用方(调用它的函数)。另一方面,发电机每次达到产量时都会暂停,并且不再前进。它不仅可以将该值推送到调用堆栈,而且还可以在通过调用next()恢复它时保留其局部变量。

发电机的第二个鲜为人知的功能也很重要。您也可以通过其.send()方法将值发送到生成器中。这允许生成器(和协程)相互调用(等待)而不会阻塞。我不会再赘述此功能了,因为它主要对幕后协程的实现很重要,但是您根本不需要自己直接使用它。

如果您有兴趣探索更多内容,可以从正式引入协程的PEP 342开始。布雷特·坎农(Brett Cannon)的《如何在Python中进行异步等待》也是一本不错的书,关于异步的PYMOTW文章也很不错。最后,还有大卫·比兹利(David Beazley)的有关协程和并发的好奇课程,它深入探讨了协程的运行机制。

让我们尝试将以上所有文章压缩成几句话:这些协程实际上是通过一种特殊的非常规机制运行的。它们的结果是异常对象的属性,该异常对象在调用其.send()方法时被抛出。所有这些还有更多的细节,但这可能无法帮助您在实践中使用这部分语言,因此,让我们继续。

为了将事情联系在一起,以下是协程作为生成器的一些关键点:

  • 协程是经过重新利用的生成器,可以利用生成器方法的独特性。

  • 基于老式生成器的协程使用yield from等待协程结果。本机协程中的现代Python语法仅将yield from与await替换为等待协程结果的方法。等待类似于从中获得收益,通常有助于将其视为收益。

  • 使用await是标记断点的信号。它允许协程暂时中止执行,并允许程序稍后返回。

其他功能:异步和异步生成器+理解

与普通的async / await一起,Python还使async用于在异步迭代器上进行迭代。异步迭代器的目的是使它能够在迭代时在每个阶段调用异步代码。

这个概念的自然扩展是异步生成器。回想一下,您可以在本地协程中使用wait,return或yield。在Python 3.6中(通过PEP 525)可以在协程中使用yield,它引入了异步生成器,目的是允许在同一个协程函数体中使用wait和yield:

>>> async def mygen(u: int = 10):
...     """Yield powers of 2."""
...     i = 0
...     while i < u:
...         yield 2 ** i
...         i += 1
...         await asyncio.sleep(0.1)

最后但并非最不重要的一点是,Python使用async for启用了异步理解。 像它的堂兄一样,这主要是句法糖

>>> async def main():
...     # This does *not* introduce concurrent execution
...     # It is meant to show syntax only
...     g = [i async for i in mygen()]
...     f = [j async for j in mygen() if not (j // 3 % 5)]
...     return g, f
...
>>> g, f = asyncio.run(main())
>>> g
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
>>> f
[1, 2, 16, 32, 256, 512]

这是一个关键的区别:异步生成器和理解都不会使迭代并发。他们所做的只是提供同步对象的外观,但具有使所讨论的循环放弃对事件循环的控制权以便其他协程运行的能力。

换句话说,异步迭代器和异步生成器并未设计为在序列或迭代器上同时映射某些功能。它们只是为了让封闭的协程允许其他任务轮流使用。仅在使用plain for或with会“破坏”协程中await的性质的情况下,才需要async for和async with语句。异步和并发之间的区别是要把握的关键。

事件循环和asyncio.run()

您可以将事件循环想像为while True循环,它监视协程,获取有关空闲状态的反馈并四处寻找可以同时执行的事情。当协程正在等待的东西可用时,它能够唤醒一个空闲的协程。

到目前为止,事件循环的整个管理已由一个函数调用隐式处理:

asyncio.run(main())  # Python 3.7+

Python 3.7中引入的asyncio.run()负责获取事件循环,运行任务直到将其标记为完成,然后关闭事件循环。

使用get_event_loop(),可以更广泛地管理asyncio事件循环。 典型的模式如下所示:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在较早的示例中,您可能会看到loop.get_event_loop()随处可见,但是除非您特别需要微调对事件循环管理的控制,否则asyncio.run()对于大多数程序而言就足够了。

如果确实需要与Python程序中的事件循环进行交互,则循环是一种老式的Python对象,它支持使用loop.is_running()和loop.is_closed()进行内省。 如果您需要获得更精细的控制,则可以进行操作,例如在通过将循环作为参数传递来计划回调的过程中。

更关键的是要对事件循环的机制有一些了解。 以下是有关事件循环的一些要点。

#1:协同程序只有在与事件循环相关联的情况下,才能自行完成很多工作。

您之前在有关发电机的说明中已经看到了这一点,但值得重申。 如果您有一个等待他人的主要协程,则简单地单独调用它几乎没有效果:

>>> import asyncio

>>> async def main():
...     print("Hello ...")
...     await asyncio.sleep(1)
...     print("World!")

>>> routine = main()
>>> routine
<coroutine object main at 0x1027a6150>

请记住使用asyncio.run()通过调度main()协程(未来对象)在事件循环上执行来实际强制执行:

>>> asyncio.run(routine)
Hello ...
World!

(其他协程可以使用await执行。通常只将main()包装在asyncio.run()中,然后从那里调用带有await的链式协程。)

#2:默认情况下,异步IO事件循环在单个线程和单个CPU内核上运行。通常,在一个CPU内核中运行一个单线程事件循环绰绰有余。也可以跨多个内核运行事件循环。请查看约翰·里斯(John Reese)的演讲,以获取更多信息,并被警告您的笔记本电脑可能会自燃。

#3。事件循环是可插入的。也就是说,如果您确实需要,可以编写自己的事件循环实现,并使它运行相同的任务。这在uvloop软件包中得到了很好的演示,该软件包是Cython中事件循环的实现。

这就是术语“可插入事件循环”的含义:您可以使用事件循环的任何可行实现,而与协程本身的结构无关。 asyncio程序包本身带有两种不同的事件循环实现,默认实现基于选择器模块。 (第二种实现仅适用于Windows。)

完整程序:异步请求

到目前为止,您已经做到了,现在该是有趣而轻松的部分了。在本部分中,您将使用aiohttp(一个非常快的异步HTTP客户端/服务器框架)构建一个抓取网址的网址收集器areq.py。 (我们只需要客户端部分。)这样的工具可用于映射站点集群之间的连接,链接形成有向图。

注意:您可能想知道为什么Python的请求包与异步IO不兼容。请求建立在urllib3的顶部,而urllib3则使用Python的http和套接字模块。

默认情况下,套接字操作处于阻塞状态。这意味着Python不会喜欢await request.get(url),因为.get()无法等待。相反,aiohttp中的几乎所有内容都是可等待的协程,例如session.request()和response.text()。否则,它是一个很棒的软件包,但是您通过使用异步代码中的请求来对自己造成损害。

高级程序结构如下所示:

  1. 从本地文件urls.txt中读取URL序列。

  2. 发送对URL的GET请求并解码结果内容。如果失败,请在此处停止输入URL。

  3. 在响应的HTML中的href标记内搜索URL。

  4. 将结果写到foundurls.txt。

  5. 尽可能异步和同时执行上述所有操作。 (将aiohttp用于请求,将aiofiles用于文件附件。这是两个非常适合异步IO模型的IO主要示例。)

这是urls.txt的内容。它并不庞大,并且包含流量最高的网站:

$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

列表中的第二个URL应该返回404响应,您需要对其进行适当处理。 如果您正在运行此程序的扩展版本,则可能需要处理比这更棘手的问题,例如服务器断开连接和无止尽的重定向。

请求本身应该使用单个会话发出,以充分利用会话的内部连接池。

让我们看一下完整的程序。 我们将逐步介绍以下内容:

#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

该脚本比我们最初的玩具程序要长,所以让我们对其进行分解。

常量HREF_RE是一个正则表达式,用于提取我们最终在HTML中搜索的href标签:

>>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
<re.Match object; span=(15, 45), match='href="https://realpython.com/"'>

协程fetch_html()是GET请求的包装,用于发出请求并解码生成的页面HTML。 它发出请求,等待响应,并在非200状态下立即引发:

resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()

如果状态正常,则fetch_html()返回页面HTML(str)。 值得注意的是,此功能没有完成任何异常处理。 逻辑是将该异常传播给调用者,并在那里进行处理:

html = await resp.text()

我们正在等待session.request()和resp.text(),因为它们是等待的协程。否则,请求/响应周期将是应用程序的长尾,耗时的部分,但是对于异步IO,fetch_html()可使事件循环在其他易于使用的作业(例如,解析和写入已获取的URL)上工作。

协同程序链中的下一个是parse(),它在fetch_html()中等待给定的URL,然后从该页面的HTML中提取所有href标记,确保每个标记均有效并将其格式化为绝对路径。

诚然,parse()的第二部分是阻塞的,但是它由快速的正则表达式匹配组成,并确保将发现的链接设置为绝对路径。

在这种特定情况下,此同步代码应该快速而不起眼。但是,请记住,给定协程中的任何行都会阻塞其他协程,除非该行使用yield,await或return。如果解析是一个比较繁琐的过程,则可能需要考虑使用loop.run_in_executor()在其自己的过程中运行此部分。

接下来,协程write()接收一个文件对象和一个URL,然后等待parse()返回一组已解析的URL,通过使用aiofiles(用于打包的包)将每个URL及其源URL异步写入文件中。异步文件IO。

最后,bulk_crawl_and_write()是脚本协程链的主要入口。它使用单个会话,并为最终从urls.txt中读取的每个URL创建一个任务。

以下是一些值得一提的其他要点:

  • 默认的ClientSession具有最多100个打开连接的适配器。要更改此设置,请将asyncio.connector.TCPConnector的实例传递给ClientSession。您还可以基于每个主机指定限制。

  • 您可以为整个会话和单个请求指定最大超时。

  • 该脚本还使用async with,它与异步上下文管理器一起使用。我没有专门讨论这个概念,因为从同步上下文管理器到异步上下文管理器的过渡非常简单。后者必须定义.__ aenter ()和. aexit ()而不是. exit ()和. enter __()。如您所料,async with只能在用async def声明的协程函数中使用。

如果您想了解更多内容,请在GitHub上的本教程随附文件中还附带注释和文档字符串。

这是所有执行情况的结果,因为areq.py在一秒钟之内即可获取,解析并保存9个网址的结果:

$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/

不太破旧! 作为健全性检查,您可以检查输出中的行数。 以我为例,是626,但请注意,这可能会有所波动:

$ wc -l foundurls.txt
     626 foundurls.txt

$ head -n 3 foundurls.txt
source_url  parsed_url
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos

后续步骤:如果您想提高底注,请使此网络爬虫递归。 您可以使用aio-redis跟踪树中已爬网的URL以避免两次请求,并将链接与Python的networkx库连接。

记住要友善。 发送1000个并发请求到一个小的,毫无戒心的网站是不好的,不好的,不好的。 有一些方法可以限制您在一批中发出的并发请求,例如使用asyncio的sempahore对象或使用类似这种模式的方法。 如果您不注意此警告,则可能会收到大量TimeoutError异常,最终只会损害您自己的程序。

上下文中的异步IO

既然您已经看了很多健康的代码,那么让我们退一步,考虑一下什么时候异步IO是理想的选择,以及如何进行比较以得出结论或选择其他并发模型。

什么时候以及为什么异步IO是正确的选择?

本教程不适用于异步IO,线程与多处理的扩展论述。但是,了解异步IO何时可能是三个中的最佳候选者很有用。

异步IO与多处理之间的斗争根本不是一场争斗。实际上,它们可以一起使用。如果您有多个相当统一的CPU约束任务(一个很好的例子是在诸如scikit-learn或keras之类的库中进行网格搜索),那么多处理应该是一个明显的选择。

如果所有函数都使用阻塞调用,则简单地在每个函数之前放置异步是一个坏主意。 (这实际上可能会使您的代码变慢。)但是,如前所述,异步IO和多处理在某些地方可以和谐共处。

异步IO与线程之间的竞争更为直接。我在导言中提到“线程很难实现”。全文是,即使在线程似乎易于实现的情况下,由于竞争条件和内存使用等原因,它仍可能导致臭名昭著的无法跟踪的错误。

由于线程是具有有限可用性的系统资源,因此线程的扩展也往往比异步IO的扩展规模小。在许多计算机上创建数千个线程将失败,因此我不建议您首先尝试。创建数千个异步IO任务是完全可行的。

当您有多个IO绑定任务时,异步IO会发光,否则这些任务将通过阻塞IO绑定等待时间来控制,例如:

  • 网络IO,无论您的程序是服务器端还是客户端

  • 无服务器设计,例如对等,多用户网络(如群组聊天室)

  • 您想要模仿“即弃即忘”风格的读/写操作,而不必担心在阅读和写入内容时锁定任何东西

不使用它的最大原因是,await仅支持定义一组特定方法的一组特定对象。如果您要对某个DBMS执行异步读取操作,则不仅需要查找该DBMS的Python包装器,还需要查找支持async / await语法的包装器。包含同步调用的协程会阻止其他协程和任务运行。

有关使用async / await的库的简短列表,请参阅本教程末尾的列表。

异步IO是,但是哪个?

本教程重点介绍异步IO,异步/等待语法以及如何将异步用于事件循环管理和指定任务。当然,asyncio并不是唯一的异步IO库。纳撒尼尔·史密斯(Nathaniel J.Smith)的观察表明:

在几年内,asyncio可能会沦落为成为精明的开发人员避免使用的stdlib库之一,例如urllib2。

实际上,我要说的是asyncio是其自身成功的受害者:设计时,它使用了可能的最佳方法;但是从那时起,异步的启发下的工作(例如添加异步/等待)就改变了格局,以便我们可以做得更好,现在异步已被其早期的承诺所束缚。 (资源)

为此,尽管使用不同的API和不同的方法,但一些可以做asyncio的大牌替代品是curio和trio。我个人认为,如果您要构建大小适中,简单明了的程序,仅使用asyncio就足够了并且可以理解,并且可以避免在Python标准库之外添加其他大型依赖项。

但是,无论如何,请查看curio和trio,您可能会发现它们以相同的方式完成了对您来说对用户而言更直观的事情。这里介绍的许多与软件包无关的概念也应渗透到其他异步IO软件包中。

其他

在接下来的几节中,您将介绍asyncio和async / await的其他各个部分,这些部分到目前为止还没有很好地适合本教程,但是对于构建和理解完整的程序仍然很重要。

其他顶级异步功能 除了asyncio.run()之外,您还看到了其他一些包级功能,例如asyncio.create_task()和asyncio.gather()。

您可以使用create_task()安排协程对象的执行,然后使用asyncio.run():

>>> import asyncio

>>> async def coro(seq) -> list:
...     """'IO' wait time is proportional to the max element."""
...     await asyncio.sleep(max(seq))
...     return list(reversed(seq))
...
>>> async def main():
...     # This is a bit redundant in the case of one task
...     # We could use `await coro([3, 2, 1])` on its own
...     t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
...     await t
...     print(f't: type {type(t)}')
...     print(f't done: {t.done()}')
...
>>> t = asyncio.run(main())
t: type <class '_asyncio.Task'>
t done: True

这种模式有一个精妙之处:如果您不等待main()中的内容,它可能会在main()本身表示已完成之前就结束了。因为asyncio.run(main())调用loop.run_until_complete(main()),所以事件循环仅关注main()完成(而不等待),而不涉及在main()中创建的任务是否完成。完成。如果不等待,循环的其他任务可能会在完成之前被取消。如果需要获取当前待处理任务的列表,则可以使用asyncio.Task.all_tasks()。

注意:asyncio.create_task()是Python 3.7中引入的。在Python 3.6或更低版本中,请使用asyncio.ensure_future()代替create_task()。

另外,还有asyncio.gather()。尽管它没有做任何特别的事情,但是collect()的目的是将协程(期货)的集合整齐地放入单个未来中。结果,它返回一个未来的对象,并且,如果您等待asyncio.gather()并指定多个任务或协程,则您正在等待所有这些任务或协程完成。 (这与我们前面的示例有点类似queue.join()。)collect()的结果将是输入结果的列表:

>>> import time
>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+
...     print('Start:', time.strftime('%X'))
...     a = await asyncio.gather(t, t2)
...     print('End:', time.strftime('%X'))  # Should be 10 seconds
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...     return a
...
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]

您可能已经注意到,collect()等待传递给它的Future或协程的整个结果集。 或者,您可以遍历asyncio.as_completed()以按完成顺序获取任务完成时的任务。 该函数返回一个迭代器,该迭代器在完成任务时产生任务。 下面,coro([3,2,1])的结果将在coro([10,5,0])完成之前可用,而collect()则不是这样:

>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))
...     print('Start:', time.strftime('%X'))
...     for res in asyncio.as_completed((t, t2)):
...         compl = await res
...         print(f'res: {compl} completed at {time.strftime("%X")}')
...     print('End:', time.strftime('%X'))
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...
>>> a = asyncio.run(main())
Start: 09:49:07
res: [1, 2, 3] completed at 09:49:10
res: [0, 5, 10] completed at 09:49:17
End: 09:49:17
Both tasks done: True

最后,您可能还会看到asyncio.ensure_future()。 您几乎不需要它,因为它是一个较低级的管道API,并在很大程度上被稍后介绍的create_task()所取代。

等待的先例

尽管它们的行为有些相似,但是await关键字的优先级远高于yield。 这意味着,由于绑定更紧密,因此在许多情况下,您需要在yield from语句中使用括号,而在类似的await语句中则不需要。 有关更多信息,请参见PEP 492中的await表达式示例。

总结

现在,您已经可以使用async / await和由此建立的库。 以下是您所涵盖内容的回顾:

  • 异步IO作为与语言无关的模型,以及通过使协程彼此间接通信来实现并发的方法

  • Python新的async和await关键字的细节,用于标记和定义协程

  • asyncio,Python包,提供用于运行和管理协程的API