Skip to content

Latest commit

 

History

History
601 lines (403 loc) · 36.1 KB

File metadata and controls

601 lines (403 loc) · 36.1 KB

十九、测试、调试和调度并发应用

在本章中,我们将讨论在更高级别上使用并发 Python 程序的过程。首先,您将了解如何安排 Python 程序在以后一次或定期并发运行。我们将分析 ApsScheduler,这是一个 Python 库,它允许我们在跨平台的基础上实现这一点。此外,我们还将介绍测试和调试,它们是编程中必不可少但经常被忽略的组件。考虑到并发编程的复杂性,测试和调试比传统应用更加困难。本章将介绍一些有效测试和调试并发程序的策略。

本章将介绍以下主题:

  • APScheduler 库及其在并行调度 Python 应用中的使用
  • Python 程序的不同测试技术
  • Python 编程中的调试实践,以及特定于并发性的调试技术

技术要求

以下是本章的先决条件列表:

使用 APScheduler 进行调度

APScheduler(简称Advanced Python Scheduler)是一个外部 Python 库,支持对稍后执行的 Python 代码进行一次或定期调度。该库为我们提供了高级选项,可以在作业列表中动态添加/删除作业,以便安排和执行作业,还可以决定如何将这些作业分发到不同的线程和进程。

有些人可能会想到芹菜(http://www.celeryproject.org/ )作为 Python 的 go-to 调度工具。然而,虽然芹菜是一种具有基本调度功能的分布式任务队列,但 APScheduler 恰恰相反:它是一种具有基本任务队列选项和高级调度功能的调度器。此外,这两种工具的用户都报告说,APScheduler 更易于设置和实现。

安装 APScheduler

与大多数常见的 Python 外部库一样,通过在终端中运行以下命令,可以通过软件包管理器pip安装 APScheduler:

pip install apscheduler

如果pip命令不起作用,安装此库的另一种方法是从 PyPI 手动下载源代码,可以在PyPI.org/project/APScheduler/上找到。然后,可以通过运行以下命令提取并安装下载的文件:

python setup.py install

一如既往,要测试 APScheduler 发行版是否已正确安装,请打开 Python 解释器并尝试导入库,如下所示:

>>> import apscheduler

如果没有返回错误,则表示库已完全安装并准备好使用。

不是调度服务

由于“调度器”一词可能会误导特定的开发人员群体,让我们澄清一下 ApsScheduler 提供的功能以及它没有提供的功能。首先也是最重要的一点是,该库可以用作也是特定于应用的跨平台调度器,而不是更常见的特定于平台的调度器,如 cron 守护进程(用于 Linux 系统)或 Windows 任务调度器。

需要注意的是,APScheduler 本身并不是一个具有预构建 GUI 或命令行界面的调度服务。它仍然是一个 Python 库,必须在现有应用中导入和使用(这就是为什么它是特定于应用的)。但是,正如您稍后将了解到的,APScheduler 具有许多功能,可以利用这些功能构建实际的调度服务。

例如,计划作业(特别是后台作业)的能力对于当今的 web 应用来说至关重要,因为它们可以包括不同但重要的功能,例如发送电子邮件或备份和同步数据。在这种情况下,APScheduler 可以说是为涉及 Python 指令的云应用(如 Heroku 和 Pythonywhere)调度任务的最常用工具。

APScheduler 功能

让我们来探索 APScheduler 库提供的一些最常见的功能。在执行方面,它提供了三种不同的调度机制,因此可以选择最适合自己的应用的机制(这些机制有时也称为事件触发器):

  • Cron 风格的调度:该机制允许作业具有预先指定的开始和结束时间
  • 基于间隔的执行:此机制以偶数间隔(例如,每两分钟,每天)运行作业,具有可选的开始和结束时间
  • 延迟执行:此机制允许应用在执行作业列表中的项目之前等待一段特定的时间

此外,ApsScheduler 允许我们存储要在各种后端系统中执行的作业,如常规内存、MongoDB、Redis、RejectDB、Spalchemy 或 ZooKeeper。无论是桌面程序、web 应用还是简单的 Python 脚本,ApsScheduler 都很可能能够处理计划作业的存储方式。

除此之外,该库还可以与常见的 Python 并发框架无缝协作,如 AsyncIO、Gevent、Tornado 和 Twisted。这意味着 APScheduler 库中包含的低级代码包含的指令可以一致地调度和执行在这些框架中实现的函数和程序,使库更加动态。

最后,APScheduler 通过指定适当的执行者,提供了不同的选项来实际执行计划代码。具体来说,您可以简单地以阻塞方式或在后台正常执行作业。我们还可以选择使用线程或进程池以并发方式分发工作。稍后,我们将看一个示例,其中我们使用一个进程池来执行计划的作业。

下图列出了 APScheduler 中包含的所有主要类和功能:

APScheduler—main classes and functionalities

APScheduler API

在本节中,我们将通过分析库提供的不同类和方法,了解如何将 APScheduler 实际集成到现有 Python 程序中。我们还将研究在使用并发执行器运行计划的作业时,作业如何分布在不同的线程和进程中。

调度程序类

首先,让我们看一下主调度器的可用选项,它是调度稍后执行的任务过程中最重要的组件:

  • BlockingScheduler:当计划程序是流程中运行的唯一任务时,应使用此类。顾名思义,该类的实例将阻止同一进程中的任何其他指令。
  • BackgroundScheduler:与BlockingScheduler相反,此类允许在现有应用内的后台执行计划作业。

此外,如果您的应用使用特定的并发框架,还可以使用调度程序类:asyncio模块的AsyncIOSchedulerGeventScheduler用于 Gevent;TornadoScheduler用于龙卷风应用;TwistedScheduler用于扭转应用;等等

执行类

在安排稍后执行的作业的过程中,要做出的另一个重要选择是:哪些执行者应该运行作业?通常,建议使用默认执行器ThreadPoolExecutor,在同一进程中跨不同线程分配工作。但是,正如您所了解的,如果计划的作业包含使用 CPU 密集型操作的指令,那么工作负载应该分布在多个 CPU 核上,并且应该使用ProcessPoolExecutor

需要注意的是,这两个 executor 类与我们在前面章节中讨论的concurrent.futures模块交互,以促进并发执行。两个 executor 类的默认最大工作线程数为10,可以在初始化时更改。

触发关键字

在构建调度器的过程中,最后一个决定是将来如何执行已调度的作业;这是我们前面提到的事件触发器选项。APScheduler 提供三种不同的触发机制;以下关键字应作为参数传递给计划程序初始值设定项,以指定事件触发器类型:

  • 'date':当作业在将来某个特定点运行一次时,使用该关键字。
  • 'interval':当作业以固定的时间间隔运行时,使用该关键字。我们将在后面的示例中使用这个关键字。
  • 'cron':当作业要在一天中的某个时间定期运行时,使用此关键字。

此外,还可以混合和匹配多种类型的触发器。我们还可以选择在所有已注册触发器都指定或至少有一个触发器指定时执行计划作业。

常用调度程序方法

最后,让我们考虑在声明调度器时常用的方法,除了前面的类和关键字。具体来说,scheduler对象调用以下方法:

  • add_executor():调用此方法是为了注册执行器,以便将来运行作业。具体来说,我们通常将字符串'processpool'传递给此方法,以使作业分布在多个进程中。否则,如前所述,as 线程池将用作默认执行器。此方法还返回可以进一步操作的 executor 对象。

  • remove_executor():此方法用于 executor 对象,将其从调度程序中移除。

  • add_job():此方法可用于向作业列表中添加额外作业,稍后执行。该方法首先接受作业列表中新作业的可调用项,以及用于指定作业调度和执行方式的各种其他参数。与add_executor()类似,该方法可以返回一个job对象,该对象可以在该方法之外进行操作。

  • remove_job():类似地,此方法可用于job对象,将其从调度程序中移除。

  • start():此方法与已执行的执行器一起启动计划作业,并开始处理作业列表。

  • shutdown():此方法停止调用调度程序对象及其作业列表和实现的执行器。如果在有当前作业运行时调用,则这些作业不会被中断。

Python 中的示例

在本小节中,我们将了解我们讨论的一些 API 是如何在示例 Python 程序中使用的。从 GitHub 页面下载本书的代码,然后继续导航到Chapter19文件夹。

阻塞调度程序

首先,让我们看一个在 OutT0x 文件中的阻塞调度器的例子:

# Chapter19/example1.py

from datetime import datetime

from apscheduler.schedulers.background import BlockingScheduler

def tick():
    print(f'Tick! The time is: {datetime.now()}')

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)

    try:
        scheduler.start()
        print('Printing in the main thread.')
    except KeyboardInterrupt:
        pass

scheduler.shutdown()

在本例中,我们为前面代码中指定的tick()函数实现了一个调度器,它只打印出当前执行时间。在我们的主函数中,我们使用从 APScheduler 导入的BlockingScheduler类中的一个实例作为该程序的调度器。除此之外,上述add_job()方法用于将tick()注册为稍后执行的作业。具体来说,它应该定期执行,以偶数的间隔执行(由传入的'interval'字符串指定)——特别是每三秒执行一次(由参数seconds=3指定)。

回想一下,阻塞调度程序将在其运行的同一进程中阻塞所有其他指令。为了测试这一点,我们还在启动调度器之后插入一个print语句,以查看它是否会被执行。运行脚本后,您的输出应类似于以下内容(打印的特定时间除外):

> python3 example1.py
Tick! The time is: 2018-10-31 17:25:01.758714
Tick! The time is: 2018-10-31 17:25:04.760088
Tick! The time is: 2018-10-31 17:25:07.762981

请注意,此调度程序将永远运行,除非它被KeyboardInterrupt事件或其他潜在异常停止,并且我们放置在主程序末尾附近的打印语句将永远不会执行。由于这个原因,BlockingScheduler类只能在其进程中运行的唯一任务时使用。

后台调度程序

在本例中,我们将研究如果我们希望在后台与其他任务并发地执行调度程序,BackgroundScheduler类的使用是否会有所帮助。本例代码包含在Chapter19/example2.py文件中,如下所示:

# Chapter19/example2.py

from datetime import datetime
import time

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print(f'Tick! The time is: {datetime.now()}')

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()

    try:
        while True:
            time.sleep(2)
            print('Printing in the main thread.')
    except KeyboardInterrupt:
        pass

scheduler.shutdown()

本例中的代码与前面的代码几乎相同。然而,在这里,我们使用这个类作为后台调度器,并且每两秒钟打印一次来自主程序的消息,在一个无限while循环中。理论上,如果scheduler对象确实可以在后台运行计划的作业,我们的输出将包括主程序和tick()函数中的打印语句组合。

以下是我在执行脚本后的输出:

> python3 example2.py
Printing in the main thread.
Tick! The time is: 2018-10-31 17:36:35.231531
Printing in the main thread.
Tick! The time is: 2018-10-31 17:36:38.231900
Printing in the main thread.
Printing in the main thread.
Tick! The time is: 2018-10-31 17:36:41.231846
Printing in the main thread.

同样,调度程序将永远继续运行,直到从键盘触发中断。在这里,我们可以看到我们期望看到的:来自主程序和计划作业的打印语句同时生成,这表明调度程序确实在后台运行。

执行人池

ApsScheduler 提供的另一项功能是能够跨多个 CPU 核心(或进程)分发要执行的计划作业。在本例中,您将学习如何使用后台调度程序来实现这一点。导航到Chapter19/example3.py文件并检查包含的代码,如下所示:

# Chapter19/example3.py

from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler

def task():
    print(f'From process {os.getpid()}: The time is {datetime.now()}')
    print(f'Starting job inside {os.getpid()}')
    time.sleep(4)
    print(f'Ending job inside {os.getpid()}')

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_executor('processpool')
    scheduler.add_job(task, 'interval', seconds=3, max_instances=3)
    scheduler.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

scheduler.shutdown()

在这个程序中,我们想要调度的作业(函数task()打印出在每次调用时运行它的进程的标识符(使用os.getpid()方法),并设计为持续大约 4 秒。在主程序中,我们使用与上一个示例中相同的后台调度程序,但我们指定调度的作业应在进程池中执行:

scheduler.add_executor('processpool')

请记住,此池中进程数的默认值为 10,可以更改为其他值。接下来,当我们将作业添加到调度程序时,我们还必须指定该作业可以在多个流程实例(在本例中为三个实例)中执行;这使我们的流程池执行器得到充分有效的利用:

scheduler.add_job(task, 'interval', seconds=3, max_instances=3)

运行程序后,我的输出的前几行如下:

> python3 example3.py
From process 1213: The time is 2018-11-01 10:18:00.559319
Starting job inside 1213
From process 1214: The time is 2018-11-01 10:18:03.563195
Starting job inside 1214
Ending job inside 1213
From process 1215: The time is 2018-11-01 10:18:06.531825
Starting job inside 1215
Ending job inside 1214
From process 1216: The time is 2018-11-01 10:18:09.531439
Starting job inside 1216
Ending job inside 1215
From process 1217: The time is 2018-11-01 10:18:12.531940
Starting job inside 1217
Ending job inside 1216
From process 1218: The time is 2018-11-01 10:18:15.533720
Starting job inside 1218
Ending job inside 1217
From process 1219: The time is 2018-11-01 10:18:18.532843
Starting job inside 1219
Ending job inside 1218
From process 1220: The time is 2018-11-01 10:18:21.533668
Starting job inside 1220
Ending job inside 1219
From process 1221: The time is 2018-11-01 10:18:24.535861
Starting job inside 1221
Ending job inside 1220
From process 1222: The time is 2018-11-01 10:18:27.531543
Starting job inside 1222
Ending job inside 1221
From process 1213: The time is 2018-11-01 10:18:30.532626
Starting job inside 1213
Ending job inside 1222
From process 1214: The time is 2018-11-01 10:18:33.534703
Starting job inside 1214
Ending job inside 1213

从打印的进程标识符中可以看出,计划的作业是在不同的进程中执行的。您还将注意到第一个进程的 ID 是1213,并且,当我们的调度程序开始使用 ID 为1222的进程时,它立即切换回1213进程(注意前面输出的最后几行)。这是因为我们的流程池包含 10 个工人,1222流程是池中的最后一个元素。

在云端运行

前面,我们提到承载 Python 代码的云服务,例如 Heroku 和 Pythonywhere,是应用 APScheduler 功能的最常见的地方。在本小节中,我们将查看 Heroku 网站用户指南中的一个示例,该示例可在Chapter19/example4.py文件中找到:

# ch19/example4.py
# Copied from: http://devcenter.heroku.com/articles/clock-processes-python

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('interval', minutes=3)
def timed_job():
    print('This job is run every three minutes.')

@scheduler.scheduled_job('cron', day_of_week='mon-fri', hour=17)
def scheduled_job():
    print('This job is run every weekday at 5pm.')

scheduler.start()

您可以看到,此程序使用装饰器为调度程序注册计划作业。具体地说,当scheduler对象调用scheduled_job()方法时,整个指令可以用作函数的修饰符,将其转换为该调度器的调度器作业。您还可以在前面的代码中看到一个cron计划作业的示例,它可以在一天中的特定时间执行(在本例中,它是每个工作日下午 5:00)。

作为关于 ApsScheduler 的最后一点说明,我们已经看到使用库 API 的指令也是 Python 代码,而不是单独的服务本身。然而,考虑到该库在提供不同调度选项方面的灵活性,以及其程序在与外部服务(如基于云的服务)协作方面的可插拔性,APScheduler 是一个非常有价值的 Python 应用调度工具。

Python 中的测试和并发

如前所述,测试是软件开发和一般编程的一个重要(但常常被忽视)组件。测试的目标是唤起错误,这些错误表明我们的程序中存在 bug。这将与调试过程形成对比,调试过程用于识别 bug 本身;我们将在下一节讨论调试主题。

在最普遍的意义上,测试是关于确定特定的功能和方法是否能够执行并产生我们想要的结果;这通常是通过比较生成的结果来完成的。换句话说,测试就是收集关于我们程序正确性的证据。

然而,测试不能确保所考虑的程序中的所有潜在缺陷和 bug 都能被识别出来。此外,测试结果与测试本身一样好,如果测试没有覆盖某些特定的潜在错误,那么这些错误很可能在测试过程中检测不到。

测试并发程序

在本章中,我们将考虑两个不同的测试主题,即并发性:To.T0.测试并发程序 To1 T1 和 Ty2 T2。当涉及到并发程序的测试时,普遍的共识是,它要求极高,而且很难做到正确。正如您在前几章中所看到的,死锁或竞争条件等错误在并发程序中可能非常微妙,并且可以以多种方式表现出来。

此外,并发的一个显著特征是不确定性,这意味着有可能在一次测试中检测到并发错误,而在另一次测试中则不可见。这是因为并发编程的一个主要组成部分是任务的调度,并且,就像并发程序中不同任务的执行顺序一样,并发错误可以以不可预测的方式显示和隐藏自己。我们称这些测试为不可再现的,表明我们不能可靠地通过或以一致的方式通过这些测试。

尽管如此,有一些通用策略可以帮助我们在测试并发程序的过程中导航。在下一节中,我们将探讨各种工具,这些工具可以帮助我们测试并发程序的特定策略。

单元测试

我们将考虑的第一个策略是单元测试。该术语表示对所考虑程序的单个单元进行测试的方法,其中单元是程序的最小可测试部分。由于这个原因,单元测试并不意味着测试一个完整的并发系统。具体来说,建议您不要将并发程序作为一个整体进行测试,而是将程序分解为更小的组件,并分别进行测试。

像往常一样,Python 提供了一些库,这些库提供直观的 API 来解决编程中最常见的问题;在这种情况下,它是unittest模块。该模块最初的灵感来自 Java 编程语言 JUnit 的单元测试框架;它还提供其他语言中的通用单元测试功能。让我们考虑一个快速的例子,说明如何使用 Oracle T1 来测试 Pothon 函数中的 Python 函数:

# Chapter19/example5.py

import unittest

def fib(i):
    if i in [0, 1]:
        return i

    return fib(i - 1) + fib(i - 2)

class FibTest(unittest.TestCase):
    def test_start_values(self):
        self.assertEqual(fib(0), 0)
        self.assertEqual(fib(1), 1)

    def test_other_values(self):
        self.assertEqual(fib(10), 55)

if __name__ == '__main__':
    unittest.main()

在本例中,我们想测试生成斐波那契序列中特定元素的fib()函数(其中一个元素是其前两个元素的总和),其起始值分别为01

现在,让我们把注意力集中在FibTest类上,它从unittest模块扩展了TestCase类。此类包含不同的方法,用于测试fib()函数返回的结果的特定情况。具体地说,我们有一种方法可以查看此函数的边情况,即序列的前两个元素,还有一种方法可以测试序列中的任意值。

运行上述脚本后,您的输出应类似于以下内容:

> python3 unit_test.py
..
----------------------------------------------------------------------
Ran 2 tests in 0.000s

OK

输出表明我们的测试通过了,没有任何错误。另外,正如类名所建议的,这个类是一个单独的测试用例,它是一个测试单元。您可以将不同的测试用例扩展到一个测试套件,它被定义为测试用例、测试套件或两者的集合。测试套件通常用于组合要一起运行的测试。

静态代码分析

识别并发程序中潜在错误和 bug 的另一种可行方法是执行静态代码分析。此方法在代码本身中查找模式,而不是执行部分(或全部)代码。换句话说,静态代码分析通过直观地查看程序的结构、变量和指令的使用以及程序的不同部分如何相互作用来检查程序。

使用静态代码分析的主要优点是,我们不仅仅依靠程序的执行以及在该过程中产生的结果(换句话说,动态测试)来确定程序是否正确设计。此方法可以检测在已实现的测试中无法(轻松或根本无法)表现出来的错误和 bug。因此,静态代码分析应该与其他测试方法(如单元测试)相结合,以创建一个全面的测试过程。

静态代码分析通常用于识别细微的错误或 bug,例如未使用的变量、空的 catch 块,甚至是不必要的对象创建。在并发编程方面,该方法可用于分析程序中使用的同步技术。具体来说,静态代码分析可以查找程序中共享资源的原子性,然后揭示可能产生有害竞争条件的非原子资源的任何不协调使用。

可以使用各种工具来促进 Python 程序的静态代码分析,其中一种更常见的工具是 PMD(https://github.com/pmd/pmd )。话虽如此,这些工具的具体使用超出了本书的范围,我们将不再深入讨论。

同时测试程序

结合测试和并发编程的另一个方面是以并发方式执行测试。测试的这一方面比测试并发程序本身更直接和直观。在本小节中,我们将探索一个库,它可以帮助我们简化这个过程,concurrencytest,它可以与前面的unittest模块实现的测试用例无缝地工作。

concurrencytest设计为testtools扩展,在运行的测试套件中实现并发性。可使用pip从 PyPI 安装,如下所示:

pip install concurrencytest

此外,concurrencytest依赖于testtoolspypi.org/project/testtools/)和python-subunitpypi.org/project/python-subunit/)库,这两个库分别是测试扩展框架和测试结果的简化协议。这些库也可以通过pip安装,如下所示:

pip install testtools
pip install python-subunit

一如既往,要验证您的安装,请尝试在 Python 解释器中导入库:

>>> import concurrencytest

未收到打印错误表示库及其依赖项已成功安装。现在,让我们来看看这个库如何帮助我们获得更好的测试速度。导航到 AutoT0x 文件并考虑下面的代码:

# Chapter19/example6.py

import unittest

def fib(i):
    if i in [0, 1]:
        return i

    a, b = 0, 1
    n = 1
    while n < i:
        a, b = b, a + b
        n += 1

    return b

class FibTest(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        super(FibTest, self).__init__(*args, **kwargs)
        self.mod = 10 ** 10

    def test_start_values(self):
        self.assertEqual(fib(0), 0)
        self.assertEqual(fib(1), 1)

    def test_big_value_v1(self):
        self.assertEqual(fib(499990) % self.mod, 9998843695)

    def test_big_value_v2(self):
        self.assertEqual(fib(499995) % self.mod, 1798328130)

    def test_big_value_v3(self):
        self.assertEqual(fib(500000) % self.mod, 9780453125)

if __name__ == '__main__':
    unittest.main()

本节示例的主要目标是测试生成斐波那契序列中的数字的函数,特别是具有大索引的数字。我们拥有的fib()函数与上一个示例类似,尽管这个函数以迭代方式执行计算,而不使用递归。

在我们的测试用例中,除了两个起始值之外,我们现在正在测试索引 499990、499995 和 500000 处的数字。由于生成的数字非常大,我们只测试每个数字的最后十位数字(这是通过测试类的mod属性完成的,在初始化方法中指定)。该测试过程将在一个过程中按顺序执行。

运行该程序,您的输出应类似于以下内容:

> python3 example6.py
....
----------------------------------------------------------------------
Ran 4 tests in 8.809s

OK

同样,输出中指定的时间可能因系统而异。这样,记住程序所花费的时间,这样你就可以将它与我们稍后考虑的其他程序的速度进行比较。

现在,让我们看看如何使用concurrencytest将测试工作负载分配到多个流程。考虑 AUTT1 文件,如下:

# Chapter19/example7.py

import unittest
from concurrencytest import ConcurrentTestSuite, fork_for_tests

def fib(i):
    if i in [0, 1]:
        return i

    a, b = 0, 1
    n = 1
    while n < i:
        a, b = b, a + b
        n += 1

    return b

class FibTest(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        super(FibTest, self).__init__(*args, **kwargs)
        self.mod = 10 ** 10

    def test_start_values(self):
        self.assertEqual(fib(0), 0)
        self.assertEqual(fib(1), 1)

    def test_big_value_v1(self):
        self.assertEqual(fib(499990) % self.mod, 9998843695)

    def test_big_value_v2(self):
        self.assertEqual(fib(499995) % self.mod, 1798328130)

    def test_big_value_v3(self):
        self.assertEqual(fib(500000) % self.mod, 9780453125)

if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(FibTest)
    concurrent_suite = ConcurrentTestSuite(suite, fork_for_tests(4))
    runner.run(concurrent_suite)

此版本的程序正在使用相同的测试用例检查相同的fib()函数。但是,在主程序中,我们正在初始化来自concurrencytest库的ConcurrentTestSuite类的一个实例。此实例接收一个测试套件,该套件是使用来自unittest模块的TestLoader()API 创建的,fork_for_tests()函数和参数4指定我们希望使用四个单独的流程来分发测试过程。

现在,让我们运行此程序,并将其速度与之前的测试进行比较:

> python3 example7.py
....
----------------------------------------------------------------------
Ran 4 tests in 4.363s

OK

您可以看到,通过这种多处理方法,速度显著提高。然而,这种改进并不是围绕完美的可伸缩性(在第 16 章设计基于锁和无互斥的并发数据结构中讨论);这是因为在创建可跨多个进程执行的并发测试套件时会有很大的开销。

我们还应该提到的一点是,通过使用我们在前几章中讨论过的传统并发编程工具,例如concurrent.futuresmultiprocessing,可以实现我们在这里实现的相同多处理设置。如上所述,concurrencytest库,正如我们所看到的,能够消除重要的样板代码,从而提供一个简单快速的 API。

调试并发程序

在最后一节中,我们将讨论各种高级调试策略,这些策略可以单独使用,也可以相互结合使用,以检测和查明程序中的错误。一般来说,术语调试用于表示程序员试图识别和解决问题或缺陷的过程,否则这些问题或缺陷将导致其所在的计算机应用产生错误结果,甚至停止运行。

我们将讨论的策略包括一般调试策略,以及调试并发应用时使用的特定技术。系统地应用这些策略将提高调试过程的效率和速度。

调试工具和技术

首先,让我们简要介绍一些最常用的技术和工具,它们可以促进 Python 中的调试过程:

  • 打印调试:这可能是最基本、最直观的调试方法。该方法涉及在所考虑的程序执行的不同点插入变量值或函数状态的打印语句。这样做可以让我们跟踪这些值和状态在整个程序中是如何相互作用和变化的,从而让我们了解特定错误或异常是如何产生的。

  • 记录:在计算机科学领域,记录是记录特定程序执行过程中发生的各种事件的过程。本质上,日志记录与打印调试非常相似;但是,前者通常写入日志文件,以便以后查看。Python 提供了优秀的日志记录功能,包括在内置的logging模块中。用户可以指定日志记录过程的重要性级别;例如,通常情况下,只能记录重要事件和操作,但在调试期间会记录所有内容。

  • 跟踪:这是跟踪程序执行的另一种形式。跟踪遵循程序执行的实际低级细节,而不仅仅是变量和函数的更改。跟踪功能可以通过 Python 中的sys.settrace()方法实现。

  • 使用调试器有时候,最强大的调试选项可以通过自动调试器实现。Python 语言中最流行的调试器是 Python 调试器:pdb。此模块提供了一个交互式调试环境,该环境实现了一些有用的功能,例如断点、单步执行源代码或检查堆栈。

**同样,前面的策略既适用于传统程序,也适用于并发程序,并且它们中的多个组合可以帮助程序员在调试过程中获得有价值的信息。

调试和并发

与测试并发程序的问题类似,当应用于并发时,调试可能变得越来越复杂和困难。同样,这是因为共享资源可以同时与多个代理交互(并被多个代理更改)。尽管如此,仍然有一些策略可以使调试并发程序的过程更加简单。这些措施包括:

  • 最小化并发应用通常在复杂的互联系统中实现。在发生错误时调试整个系统可能非常危险,而且不太可行。该策略是将系统的不同部分隔离为单独的、较小的程序,并识别以与大型系统相同的方式出现故障的程序。在这里,我们想把一个大程序分成越来越小的部分,直到它们不能再分开。然后,可以很容易地识别并有效地修复原始错误。

** 单线程和处理此方法类似于最小化,但只关注并发编程的一个方面:不同线程/进程之间的交互。通过消除并发编程中并发性的最大方面,您可以将错误隔离到程序逻辑本身(即使在顺序运行时也可能导致错误)或线程/进程之间的交互(这可能是我们在前面章节中讨论的常见并发错误造成的)。 ** 操纵调度来放大潜在的 bug:我们在前面的章节中已经实际看到了这种方法的应用。如果我们的程序中实现的线程/进程没有计划以特定的方式执行,那么一些并发错误不会经常出现。例如,如果共享资源与其他代理之间的交互发生得太快,以至于它们不会经常重叠,那么现有的竞争条件可能不会影响共享资源。这导致了这样一个事实,即测试可能不会揭示竞争条件,即使它实际上存在于程序中。*

*可以在 Python 中实现各种方法,从而放大由并发错误导致的错误值和操作。其中最常见的两种是模糊化,通过在线程/进程指令中的命令之间插入休眠函数来实现,并通过使用sys.setcheckinterval()方法(在第 17 章内存模型和原子类型操作中讨论)来最小化系统线程切换间隔。这些方法以不同的方式破坏了 Python 中线程和进程执行的常规调度协议,并可以有效地揭示隐藏的并发错误。

总结

在本章中,我们通过调度、测试和调试对 Python 中的并发程序进行了高级分析。可以通过 APScheduler 模块在 Python 中完成调度,该模块提供了强大而灵活的功能,可以指定以后如何执行调度作业。此外,该模块允许在不同的线程和进程之间分配和执行计划作业,从而提高了测试速度。

并发还引入了测试和调试方面的复杂问题,这是由于程序中代理之间的同步和并行交互造成的。然而,通过有条理的解决方案和适当的工具,可以有效地解决这些问题。

本主题标志着我们在 Python中掌握并发性之旅的结束。在本书中,我们深入考虑并分析了 Python 语言并发编程的各种元素,如线程、多处理和异步编程。除了在 Python 中处理并发性的程序员所面临的常见问题外,还讨论了涉及并发性的强大应用,如上下文管理、简化操作、图像处理和网络编程。

从最普遍的意义上讲,本书是一些更高级的并发概念的指南;我希望,通过阅读这本书,您有机会对并发编程这一主题有更深入的了解。

问题

  • 什么是调度程序?为什么它不是一个调度服务?
  • APScheduler 的主要调度功能是什么?
  • ApsScheduler 和 Python 中的另一个调度工具芹菜有什么区别?
  • 在编程中测试的目的是什么?在并发编程中有什么不同?
  • 本章讨论了哪些测试方法?
  • 在编程中调试的目的是什么?在并发编程中有什么不同?
  • 本章讨论了哪些调试方法?

进一步阅读

有关更多信息,请参阅以下链接: