Skip to content

Latest commit

 

History

History
634 lines (448 loc) · 35.1 KB

File metadata and controls

634 lines (448 loc) · 35.1 KB

十、使用 Python 实现异步编程

本章将向您介绍 Python 中的asyncio模块。它将涵盖这个新并发模块背后的思想,该模块利用事件循环和协同路由,并提供一个与同步代码一样可读的 API。在本章中,除了通过concurrent.futures模块进行线程和多处理外,我们还将讨论异步编程的实现。在此过程中,我们将通过asyncio最常见的用途介绍异步编程的应用,包括异步输入/输出和避免阻塞任务。

本章将介绍以下主题:

  • 使用asyncio实现异步编程的基本要素
  • asyncio提供的异步编程框架
  • concurrent.futures模块及其用法,关于asyncio

技术要求

以下是本章的先决条件列表:

异步 IO 模块

正如您在上一章中所看到的,asyncio模块提供了一种将顺序程序转换为异步程序的简单方法。在本节中,我们将讨论异步程序的一般结构,以及如何在 Python 中实现从顺序程序到异步程序的转换。

协同程序、事件循环和未来

大多数异步程序都有一些常见的元素,协同路由、事件循环和未来就是其中的三个元素。它们的定义如下:

  • 事件循环是异步程序中任务的主要协调器。事件循环跟踪要异步运行的所有任务,并决定在给定时刻执行哪些任务。换句话说,事件循环处理异步编程的任务切换方面(或执行流)。
  • 协程是一种特殊类型的函数,它围绕特定的任务运行,因此它们可以异步执行。为了指定任务切换应在函数中的何处进行,需要一个协程;换句话说,它们指定函数何时应该将执行流返回给事件循环。协同路由的任务通常存储在任务队列中或在事件循环中创建。
  • 期货是从协程返回的结果的占位符。一旦在事件循环中启动协同路由,这些未来对象就会被创建,因此未来可以表示实际结果、挂起的结果(如果协同路由尚未完成执行),甚至是异常(如果协同路由将返回)。

事件循环、协程及其相应的未来是异步编程过程的核心元素。首先,启动事件循环并与其任务队列交互,以获得第一个任务。然后创建此任务的协同程序及其相应的未来。当任务切换必须在该协同路由内进行时,该协同路由挂起,并调用下一个协同路由;来自第一个协同程序的所有数据和上下文也将被保存。

现在,如果该协程被阻塞(例如,输入/输出处理或休眠),那么执行流将被释放回事件循环,事件循环将继续移动到任务队列中的下一项。事件循环将在切换回第一个协程之前启动任务队列中的最后一项,并从上次挂起的位置继续执行。

当每个任务完成执行时,它将从任务队列中退出队列,其协同路由将终止,相应的 future 将注册从协同路由返回的结果。此过程将继续,直到任务队列中的所有任务都完全执行为止。下图进一步说明了前面描述的异步进程的一般结构:

Asynchronous programming process

异步 API

考虑到异步程序的一般结构,让我们考虑 Apple T0 模块和 Python 为异步程序的实现提供的特定 API。这个 API 的第一个基础是添加到 Python 3.5 中的 Tyl T1 和 Ty2 T2 个关键字。这些关键字用于指定 Python 异步程序的主要元素。

具体来说,async通常在声明函数时放在def关键字前面。前面有async关键字的函数将被 Python 解释为协同程序。正如我们所讨论的,在每个协程中,必须有一个关于任务切换事件何时发生的规范。然后使用await关键字指定何时何地将执行流返回给事件循环;这通常是通过等待另一个协同程序生成结果(await coroutine或通过asyncio模块的辅助函数(如asyncio.sleep()asyncio.wait()函数)来完成的。

需要注意的是,asyncawait关键字实际上是由 Python 提供的,不由asyncio模块管理。这意味着异步编程实际上可以在没有asyncio的情况下实现,但正如您将看到的,asyncio提供了一个框架和基础设施来简化此过程,因此是 Python 中实现异步编程的主要工具。

具体来说,asyncio模块中最常用的 API 是事件循环管理功能。有了asyncio,您就可以开始通过直观、简单的函数调用来操纵任务和事件循环,而无需编写大量的样板代码。这些措施包括:

  • asyncio.get_event_loop():此方法返回当前上下文的事件循环,它是一个AbstractEventLoop对象。大多数时候,我们不需要担心这个类,因为asyncio模块已经提供了一个高级 API 来管理我们的事件循环。

  • AbstractEventLoop. create_task():此方法由事件循环调用。它将其输入添加到调用事件循环的当前任务队列中;输入通常是一个协程(即带有async关键字的函数)。

  • AbstractEventLoop. run_until_complete():此方法也将由事件循环调用。它接收异步程序的主协程并执行它,直到返回该协程的相应未来。当该方法启动事件循环执行时,它也会阻止它后面的所有后续代码,直到所有未来都完成。

  • AbstractEventLoop. run_forever():此方法与AbstractEventLoop.run_until_complete()有点类似,只是根据方法名称的建议,调用事件循环将永远运行,除非调用AbstractEventLoop.stop()方法。因此,即使获得返回的期货,循环也将继续运行,而不是退出。

  • AbstractEventLoop.stop():此方法导致调用事件循环停止执行,并在最近的适当时机退出,而不会导致整个程序崩溃。

除了这些方法之外,我们还使用一些非阻塞函数来促进任务切换事件。这些措施包括:

  • asyncio.sleep():虽然该函数本身是一个协程,但它会创建一个额外的协程,该协程在给定时间后完成(由输入指定,以秒为单位)。通常用作asyncio.sleep(0),以引起即时任务切换事件。
  • asyncio.wait():此功能也是一个协同程序,可以用来切换任务。它接收未来的序列(通常是一个列表),并等待它们完成执行。

asyncio 框架正在运行

正如您所看到的,asyncio提供了一种简单直观的方法,用 Python 的异步编程关键字实现异步程序的框架。这样,让我们考虑在 Python 中应用提供给同步应用的框架的过程,并将其转换为异步应用。

异步倒计时

让我们来看看 AUT0T0 文件,如下:

# Chapter10/example1.py

import time

def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1

start = time.perf_counter()

count_down('A', 1)
count_down('B', 0.8)
count_down('C', 0.5)

print('-' * 40)
print('Done.')

本例的目的是说明独立任务的处理时间和等待时间重叠的异步性质。为此,我们将分析一个倒计时函数(count_down()),该函数接收一个字符串和一个延迟时间。然后,它将以秒为单位从 3 倒计时到 1,同时打印出从函数开始执行到输入字符串所经过的时间(使用当前倒计时数字)。

在我们的主程序中,我们将调用字母ABC上的count_down()函数,具有不同的延迟时间。运行脚本后,您的输出应类似于以下内容:

> python example1.py
----------------------------------------
1.0006 A = 3
----------------------------------------
2.0041 A = 2
----------------------------------------
3.0055 A = 1
----------------------------------------
3.8065         B = 3
----------------------------------------
4.6070         B = 2
----------------------------------------
5.4075         B = 1
----------------------------------------
5.9081                 C = 3
----------------------------------------
6.4105                 C = 2
----------------------------------------
6.9107                 C = 1
----------------------------------------
Done.

行开头的数字表示从程序开始经过的总秒数。您可以看到,程序首先以 1 秒的间隔对字母A倒计时,然后以 0.8 秒的间隔对字母B倒计时,最后以 0.5 秒的间隔对字母C倒计时。这是一个纯粹的顺序同步程序,因为处理时间和等待时间之间没有重叠。此外,运行程序大约需要 6.9 秒,这是所有三个字母的倒计时时间之和:

1 second x 3 (for A) + 0.8 seconds x 3 (for B) + 0.5 seconds x 3 (for C) = 6.9 seconds

记住异步编程背后的思想,我们可以看到,实际上我们可以将这个程序转换为异步程序。具体来说,假设在程序的第一秒钟,当我们等待字母A倒计时时,我们可以切换任务以移动到其他字母。事实上,我们将为count_down()函数中的所有字母实现此设置(换句话说,我们将count_down()转换为一个协程)。

理论上,现在所有的倒计时任务都是异步程序中的协程,我们应该为我们的程序获得更好的执行时间和响应能力。由于这三个任务都是独立处理的,因此倒计时消息应按顺序打印(在不同的字母之间跳跃),异步程序所用的时间应与最大任务所用的时间大致相同(即字母A为 3 秒)。

但首先,让我们让我们的程序异步。要做到这一点,我们首先需要将count_down()放入一个协程,并在函数中指定一个点作为任务切换事件。换言之,我们将在函数前面添加关键字async,而不是time.sleep()函数,我们将使用asyncio.sleep()函数和await关键字;函数的其余部分应保持不变。我们的count_down()合作计划如下:

# Chapter10/example2.py

async def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        await asyncio.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1

至于我们的主程序,我们需要初始化和管理一个事件循环。具体来说,我们将使用asyncio.get_event_loop()方法创建一个空事件循环,使用 AbstractEventLoop. create_task()将所有三个倒计时任务添加到任务队列中,最后使用AbstractEventLoop.run_until_complete()开始运行事件循环。我们的主程序应该如下所示:

# Chapter10/example2.py

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(count_down('A', 1)),
    loop.create_task(count_down('B', 0.8)),
    loop.create_task(count_down('C', 0.5))
]

start = time.perf_counter()
loop.run_until_complete(asyncio.wait(tasks))

print('-' * 40)
print('Done.')

完整的脚本也可以在本书的代码库中找到,在名为example2.pyChapter10子文件夹中。运行脚本后,您的输出应类似于以下内容:

> python example2.py
----------------------------------------
0.5029                 C = 3
----------------------------------------
0.8008         B = 3
----------------------------------------
1.0049 A = 3
----------------------------------------
1.0050                 C = 2
----------------------------------------
1.5070                 C = 1
----------------------------------------
1.6011         B = 2
----------------------------------------
2.0090 A = 2
----------------------------------------
2.4068         B = 1
----------------------------------------
3.0147 A = 1
----------------------------------------
Done.

现在,您可以看到异步程序如何提高程序的执行时间和响应速度。我们的程序现在不再按顺序执行单个任务,而是在不同的倒计时之间切换,并重叠它们的处理/等待时间。正如我们所讨论的,这会导致不同的字母在彼此之间或同时打印出来。

在程序开始时,程序不会等待整个第一秒钟打印出第一条消息A = 3,而是切换到任务队列中的下一个任务(在这种情况下,等待字母B的时间为 0.8 秒)。此过程持续到 0.5 秒后打印出C = 3,0.3 秒后(0.8 秒时)打印出B = 3。这一切都发生在A = 3打印出来之前。

异步程序的这种任务切换特性使其响应速度显著提高。在打印第一条消息之前,程序不再挂起一秒钟,现在只需 0.5 秒(最短的等待时间)即可打印出第一条消息。至于执行时间,您可以看到,这次执行整个程序总共只需要 3 秒(而不是 6.9 秒)。这与我们推测的一致:执行时间大约是执行最大任务所需的时间。

关于阻塞函数的一个注记

如您所见,我们必须用asyncio模块的等效函数替换原来的time.sleep()函数。这是因为time.sleep()本质上是一个阻塞函数,这意味着它不能用于实现任务切换事件。为了测试这一点,在我们的Chapter10/example2.py文件(我们的异步程序)中,我们将替换以下代码行:

await asyncio.sleep(delay)

上述代码将替换为以下代码:

time.sleep(delay)

运行此新脚本后,您的输出将与原始顺序同步程序的输出相同。因此,将await asyncio.sleep()替换为time.sleep()实际上会将我们的程序转换回同步,忽略我们实现的事件循环。发生的事情是,当我们的程序进入count_down()函数的那一行时,time.sleep()实际上阻止了执行流的释放,基本上使整个程序再次同步。将time.sleep()恢复为await asyncio.sleep()以解决此问题。

下图举例说明了阻塞和非阻塞文件处理之间执行时间的差异:

Blocking versus non-blocking

这种现象引发了一个有趣的问题:如果一个繁重的、长时间运行的任务被阻塞,那么实际上不可能将该任务作为协同程序来实现异步编程。因此,如果我们真的想在异步应用中实现阻塞函数返回的结果,我们需要实现该阻塞函数的另一个版本,它可以被制作成一个协程,并允许任务切换事件至少在函数内部的一个点发生。

幸运的是,在将asyncio作为 Python 的官方特性之一实现之后,Python 核心开发人员一直在努力开发最常用的 Python 阻塞函数的协同程序版本。这意味着,如果您发现阻止程序真正异步的阻塞函数,您很可能会找到这些函数的协同路由版本,以便在程序中实现。

然而,Python 中存在异步版本的传统阻塞函数,这些函数可能具有不同的 API,这意味着您需要从不同的函数中熟悉这些 API。处理阻塞函数而不必实现其协同程序版本的另一种方法是使用执行器在单独的线程或单独的进程中运行函数,以避免阻塞主事件循环的线程。

异步素数检查

从开始倒计时的例子开始,让我们重新考虑上一章的例子。作为复习,以下是程序同步版本的代码:

# Chapter09/example1.py

from math import sqrt

def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return

        print('%i is a prime number.' % x)

if __name__ == '__main__':

    is_prime(9637529763296797)
    is_prime(427920331)
    is_prime(157)

正如我们在上一章中所讨论的,这里有一个简单的素数检查函数is_prime(x),它打印出消息,指示输入的整数x是否是素数。在我们的主程序中,我们对三个素数按降序依次调用is_prime()。此设置再次创建了一个重要的时间段,在此期间,程序在处理大输入时似乎处于挂起状态,从而导致程序的响应速度较低。

程序产生的输出将类似于以下内容:

Processing 9637529763296797...
9637529763296797 is a prime number.
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.

为了实现此脚本的异步编程,首先,我们必须创建第一个主要组件:事件循环。为此,我们将不使用'__main__'作用域,而是将其转换为单独的函数。这个函数和我们的is_prime()素数检查函数将是我们最终异步程序中的协同程序。

现在,我们需要将is_prime()main()函数都转换为协程;同样,这意味着将async关键字放在def关键字前面,将await关键字放在每个函数中,以指定任务切换事件。对于main(),我们只需使用aysncio.wait()在等待任务队列时实现该事件,如下所示:

# Chapter09/example2.py

async def main():

    task1 = loop.create_task(is_prime(9637529763296797))
    task2 = loop.create_task(is_prime(427920331))
    task3 = loop.create_task(is_prime(157))

    await asyncio.wait([task1, task2, task3])

is_prime()函数中,事情更加复杂,因为没有明确的点,在此期间,执行流应该释放回事件循环,就像我们前面的倒计时示例一样。回想一下,异步编程的目标是获得更好的执行时间和响应能力,为了实现这一点,任务切换事件应该发生在繁重、长时间运行的任务中。但是,此要求取决于程序的具体情况,特别是协同程序、程序的任务队列以及队列中的各个任务。

例如,我们程序的任务队列由三个数字组成:9637529763296797427920331157;因此,我们可以把它们看作是一项大任务、一项中等任务和一项小任务。为了提高响应能力,我们希望在大任务期间切换任务,而不是在小任务期间切换任务。此设置允许在执行大任务期间启动、处理和可能完成中小型任务,即使大任务位于程序的任务队列的前面。

然后,我们将考虑我们的 Oracle T0 协同程序。在检查一些特定的边缘情况后,它在for循环中迭代输入整数平方根下的每个奇数,并测试输入相对于当前奇数的可除性。在这个长时间运行的for循环中,是切换任务的最佳位置,也就是说,将执行流释放回事件循环。

但是,我们仍然需要决定在for循环中的哪些特定点实现任务切换事件。同样,考虑到任务队列中的各个任务,我们正在寻找一个在大任务中相当常见、在中等任务中不太常见、在小任务中不存在的点。我已经确定这一点是每 100000 个数字周期一次,这确实满足了我们的要求,并且我已经使用了await asyncio.sleep(0)命令来促进任务切换事件,如下所示:

# Chapter09/example2.py

from math import sqrt
import asyncio

async def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return
            elif i % 100000 == 1:
                await asyncio.sleep(0)

        print('%i is a prime number.' % x)

最后,在我们的主程序中(不要与main()协程混淆),我们创建事件循环并使用它运行main()协程,直到它完成执行:

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
except Exception as e:
    print('There was a problem:')
    print(str(e))
finally:
    loop.close()

正如您在上一章中所看到的,通过这个异步版本的脚本可以实现更好的响应。具体地说,在处理第一个大任务时,我们的程序不再表现为挂起,而是在完成大任务之前打印出其他较小任务的输出消息。我们的最终结果将类似于以下内容:

Processing 9637529763296797...
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
9637529763296797 is a prime number.

对 Python3.7 的改进

截至 2018 年,Python3.7 刚刚面世,具有几个主要的新特性,如数据类、有保证的有序字典、更好的计时精度等等。异步编程和asyncio模块得到了许多重要的改进。

首先,asyncawait现在是 Python 中官方保留的关键字。虽然我们一直称它们为关键字,但实际上,直到现在,Python 还没有将这些单词视为保留关键字。这意味着在 Python 程序中,asyncawait都不能用于命名变量或函数。如果您使用的是 Python 3.7,请启动 Python 解释器并尝试将这些关键字用于变量或函数名,您将收到以下错误消息:

>>> def async():
 File "<stdin>", line 1
 def async():
 ^
SyntaxError: invalid syntax
>>> await = 0
 File "<stdin>", line 1
 await = 0
 ^
SyntaxError: invalid syntax

Python3.7 的一个主要改进是asyncio模块。具体地说,您可能已经从前面的示例中注意到,主程序通常包含大量用于启动和运行事件循环的样板代码,这在所有异步程序中很可能保持不变:

loop = asyncio.get_event_loop()
asyncio.run_until_complete(main())

由于main()是我们程序中的一个协程,asyncio允许我们使用asyncio.run()方法在事件循环中简单地运行它。这消除了 Python 异步编程中重要的样板代码。

因此,我们可以在 Python 3.7 中将前面的代码转换为更简化的版本,如下所示:

asyncio.run(main())

在 Python 3.7 中实现的异步编程在性能和易用性方面还有其他改进;然而,我们不会在本书中讨论它们。

固有阻塞任务

在本章的第一个示例中,您看到异步编程可以为 Python 程序提供更好的执行时间,但情况并非总是如此。只有在所有处理任务都是非阻塞的情况下,异步编程才能提高速度。然而,与编程任务中并发性和固有顺序性之间的比较类似,Python 中的一些计算任务本质上是阻塞的,因此异步编程无法利用它们。

这意味着,如果异步编程在某些协程中固有地阻塞了任务,那么该程序将不会从异步体系结构中获得任何额外的速度改进。虽然这些程序中仍会发生任务切换事件,这将提高程序的响应能力,但指令不会相互重叠,因此也不会获得额外的速度。事实上,由于在 Python 中实现异步编程有相当大的开销,我们的程序甚至可能比原始的同步程序花费更长的时间来完成它们的执行。

例如,让我们看一下我们的主要检查程序的两个版本之间的速度比较。由于程序的主要处理部分是is_prime()协程,它只包含数字运算,因此我们知道该协程包含阻塞任务。因此,实际上,异步版本的运行速度要比同步版本慢。

导航到代码库的Chapter10子文件夹,查看文件example3.pyexample4.py。这些文件包含与我们看到的同步和异步质数检查程序相同的代码,但除此之外,我们还跟踪运行相应程序所需的时间。以下是我运行程序同步版本example3.py后的输出:

> python example3.py
Processing 9637529763296797...
9637529763296797 is a prime number.
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
Took 5.60 seconds.

下面的代码显示了我在运行异步程序example4.py时的输出:

> python example4.py
Processing 9637529763296797...
Processing 427920331...
427920331 is a prime number.
Processing 157...
157 is a prime number.
9637529763296797 is a prime number.
Took 7.89 seconds.

虽然您收到的输出在运行两个程序所需的特定时间可能不同,但正如我们所讨论的,异步程序实际上比同步(顺序)程序运行的时间更长。同样,这是因为is_prime()协程中的数字处理任务是阻塞的,我们的异步程序在执行时只是在这些任务之间切换,而不是为了获得额外的速度而重叠这些任务。在这种情况下,只有通过异步编程才能实现响应。

然而,这并不意味着如果程序包含阻塞函数,异步编程是不可能的。如前所述,异步程序中的所有执行(如果未另行指定)都完全发生在同一线程和进程中,因此阻止 CPU 限制的任务可以防止程序指令相互重叠。但是,如果任务分布到单独的线程/进程,则情况并非如此。换句话说,线程和多处理可以帮助带有阻塞指令的异步程序获得更好的执行时间。

concurrent.futures 作为阻塞任务的解决方案

在本节中,我们将考虑另一种实现线程/多处理的方法:concurrent.futures模块,它被设计为实现异步任务的高级接口。具体而言,concurrent.futures模块与asyncio模块无缝工作,此外,它还提供了一个名为Executor的抽象类,其中包含两个分别实现异步线程和多处理的主要类的框架(如其名称所示):ThreadPoolExecutorProcessPoolExecutor

框架的变化

在我们从concurrent.futures开始讨论 API 之前,让我们先讨论异步线程/多处理的理论基础,以及它如何在asyncio提供的异步编程框架中发挥作用。

提醒一下,我们的异步编程生态系统中有三个主要元素:事件循环、协同路由及其相应的未来。在利用线程/多处理时,我们仍然需要事件循环来协调任务并处理其返回的结果(未来),因此这些元素通常与单线程异步编程保持一致。

至于协同路由,由于将异步编程与线程和多处理相结合的想法涉及到通过在单独的线程和进程中执行来避免阻塞协同路由中的任务,因此协同路由不必再被 Python 解释为实际的协同路由。相反,它们可以只是传统的 Python 函数。

我们需要实现的一个新元素是促进线程或多处理的执行器;这可以是ThreadPoolExecutor类或ProcessPoolExecutor类的实例。现在,每次在事件循环中将任务添加到任务队列中时,我们还需要引用此执行器,以便在单独的线程/进程中执行单独的任务。这是通过AbstractEventLoop.run_in_executor()方法完成的,该方法接收执行器、协同路由(尽管它也不一定是实际的协同路由)以及在单独的线程/进程中执行协同路由的参数。我们将在下一节中看到此 API 的示例。

Python 中的示例

让我们看一下concurrent.futures模块的具体实现。回想一下,在本章的第一个示例(倒计时示例)中,阻塞time.sleep()函数阻止我们的异步程序变得真正异步,因此必须用非阻塞版本asyncio.sleep()替换。现在,我们在单独的线程或进程中执行单独的倒计时,这意味着阻塞time.sleep()函数不会对异步执行程序造成任何问题。

导航到Chapter10/example5.py文件,如下所示:

# Chapter10/example5.py

from concurrent.futures import ThreadPoolExecutor
import asyncio
import time

def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)

        duration = time.perf_counter() - start
        print('-' * 40)
        print('%.4f \t%s%s = %i' % (duration, indents, name, n))

        n -= 1

async def main():
    futures = [loop.run_in_executor(
        executor,
        count_down,
        *args
    ) for args in [('A', 1), ('B', 0.8), ('C', 0.5)]]

    await asyncio.gather(*futures)

    print('-' * 40)
    print('Done.')

start = time.perf_counter()
executor = ThreadPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

请注意,count_down()被声明为一个典型的非协程 Python 函数。在main()中,它仍然是一个协程,我们为事件循环声明我们的任务队列。在这个过程中,我们再次使用了run_in_executor()方法,而不是单线程异步编程中使用的create_task()方法。在我们的主程序中,我们还需要启动一个执行器,在本例中,它是来自concurrent.futures模块的ThreadPoolExecutor类的实例。

正如我们在前几章中讨论的那样,使用线程和多处理之间的决定取决于程序的性质。这里,我们需要在不同的协程之间共享start变量(保存程序开始执行的时间),以便它们可以执行倒计时动作;因此,选择线程而不是多处理。

运行脚本后,您的输出应类似于以下内容:

> python example5.py
----------------------------------------
0.5033                 C = 3
----------------------------------------
0.8052         B = 3
----------------------------------------
1.0052 A = 3
----------------------------------------
1.0079                 C = 2
----------------------------------------
1.5103                 C = 1
----------------------------------------
1.6064         B = 2
----------------------------------------
2.0093 A = 2
----------------------------------------
2.4072         B = 1
----------------------------------------
3.0143 A = 1
----------------------------------------
Done.

这个输出与我们从异步程序中获得的纯asyncio支持的输出相同。因此,即使使用块处理函数,我们也能够使程序的执行异步,线程由concurrent.futures模块实现。

现在,让我们将相同的概念应用于素数检查问题。我们首先将is_prime()协程转换为其原始的非协程形式,并在单独的进程中再次执行它(这比线程更可取,因为is_prime()函数是一项密集的数字运算任务)。使用原始版本的is_prime()的另一个好处是,我们不必对单线程异步程序中的任务切换条件进行检查:

elif i % 100000 == 1:
    await asyncio.sleep(0)

这也将为我们提供显著的加速。让我们来看看 AUT0T0 文件,如下:

# Chapter10/example6.py

from math import sqrt
import asyncio
from concurrent.futures import ProcessPoolExecutor
from timeit import default_timer as timer

#async def is_prime(x):
def is_prime(x):
    print('Processing %i...' % x)

    if x < 2:
        print('%i is not a prime number.' % x)

    elif x == 2:
        print('%i is a prime number.' % x)

    elif x % 2 == 0:
        print('%i is not a prime number.' % x)

    else:
        limit = int(sqrt(x)) + 1
        for i in range(3, limit, 2):
            if x % i == 0:
                print('%i is not a prime number.' % x)
                return

        print('%i is a prime number.' % x)

async def main():

    task1 = loop.run_in_executor(executor, is_prime, 9637529763296797)
    task2 = loop.run_in_executor(executor, is_prime, 427920331)
    task3 = loop.run_in_executor(executor, is_prime, 157)

    await asyncio.gather(*[task1, task2, task3])

if __name__ == '__main__':
    try:
        start = timer()

        executor = ProcessPoolExecutor(max_workers=3)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())

        print('Took %.2f seconds.' % (timer() - start))

    except Exception as e:
        print('There was a problem:')
        print(str(e))

    finally:
        loop.close()

运行脚本后,我获得了以下输出:

> python example6.py
Processing 9637529763296797...
Processing 427920331...
Processing 157...
157 is a prime number.
427920331 is a prime number.
9637529763296797 is a prime number.
Took 5.26 seconds.

同样,您的执行时间很可能与我的不同,尽管此版本与我们的主要检查程序的其他两个版本之间的比较应始终保持一致:原始同步版本比单线程异步版本花费的时间少,但比多处理异步版本花费的时间多。换句话说,通过将多处理与异步编程相结合,我们可以实现两个方面的最佳效果:异步编程的一致响应性和多处理的速度提高。

总结

在本章中,您学习了异步编程,这是一种编程模型,它利用协调计算任务来重叠等待和处理时间。异步程序有三个主要组件:事件循环、协程和未来。事件循环负责使用其任务队列调度和管理协同路由。协同程序是异步执行的计算任务;每个协同程序必须在其函数内部指定将执行流返回到事件循环(即任务切换事件)的确切位置。未来是占位符对象,包含从协同路由获得的结果。

asyncio模块与 Python 关键字asyncawait一起,提供了一个易于使用的 API 和一个直观的框架来实现异步程序;此外,该框架使异步代码与同步代码一样可读,这在异步编程中通常非常罕见。然而,我们不能单独使用asyncio模块将单线程异步编程应用于阻塞计算任务。解决方案是concurrent.futures模块,它提供了一个高级 API 来实现异步线程和多处理,可以在asyncio模块之外使用。

在下一章中,我们将讨论异步编程最常见的应用之一,传输控制协议TCP),作为服务器-客户端通信的一种方式。您将了解该概念的基础知识、它如何利用异步编程以及如何在 Python 中实现它。

问题

  • 什么是异步编程?它有什么优势?
  • 异步程序中的主要元素是什么?它们如何相互作用?
  • asyncawait关键字是什么?它们有什么用途?
  • 在异步编程的实现方面,asyncio模块提供了哪些选项?
  • 关于 Python 3.7 中提供的异步编程,有哪些改进?
  • 什么是阻塞函数?为什么它们会给传统的异步编程带来问题?
  • concurrent.futures如何为异步编程提供阻塞函数的解决方案?它提供了哪些选择?

进一步阅读

有关更多信息,请参阅以下链接: