诊断性能

理解分布式计算的性能可能很困难。这在一定程度上是由于影响性能的分布式计算机的许多组件:

  1. 计算时间

  2. 内存带宽

  3. 网络带宽

  4. 磁盘带宽

  5. 调度器开销

  6. 序列化成本

这种困难因这些成本信息分散在多台机器中而加剧,因此没有中心位置来收集数据以识别性能问题。

幸运的是,Dask 在执行过程中收集了各种诊断信息。它这样做既是为了向用户提供性能反馈,也是为了其自身的内部调度决策。观察这些反馈的主要地方是诊断仪表板。本文档描述了可用的各种性能信息以及如何访问它们。

任务开始和结束时间

工作者捕获与任务相关的时间。对于每个通过工作者的任务,我们记录以下各项的开始和停止时间:

  1. 序列化 (灰色)

  2. 从对等节点(红色)收集依赖项

  3. 收集本地数据的磁盘I/O(橙色)

  4. 执行时间(按任务着色)

观察这些时间的主要方式是通过调度器的 /status 页面上的任务流图,其中条形的颜色对应于上面列出的颜色。

Dask 任务流

或者,如果你想对每个任务事件进行自己的诊断,你可能需要创建一个 调度器插件 。当任务从处理状态转换到内存或出错时,所有这些信息都将可用。

统计分析

对于单线程性能分析,Python 用户通常依赖标准库中的 CProfile 模块(Dask 开发者推荐使用 snakeviz 工具进行单线程性能分析)。不幸的是,标准的 CProfile 模块不适用于多线程或分布式计算。

为了解决这个问题,Dask 实现了自己的分布式 统计分析器。每 10 毫秒,每个工作进程检查其每个工作线程正在执行的操作。它捕获调用栈并将此栈添加到计数数据结构中。此计数数据结构每秒记录并清除一次,以便建立一段时间内的性能记录。

用户通常通过工作节点或调度器诊断仪表板上的 /profile 图表来观察这些数据。在调度器页面上,他们观察所有工作节点上所有线程的总聚合配置文件。点击配置文件中的任何条形图,用户可以放大该部分,这与大多数分析工具的典型操作一致。页面底部有一个时间线,允许用户选择不同的时间段。

Dask 分析器

配置文件也按当时正在运行的任务进行分组。您可以从页面顶部的选择菜单中选择一个任务名称。您还可以点击 /status 页面上主任务流图中对应于任务的矩形。

用户也可以使用 Client.profile 函数直接查询这些数据。这将提供用于生成这些图表的原始数据结构。他们还可以传递一个文件名,直接将图表保存为HTML文件。请注意,这个文件需要从类似 python -m http.server 的网络服务器上提供才能可见。

10ms 和 1s 参数可以通过 config.yaml 文件中的 profile-intervalprofile-cycle-interval 条目进行控制。

带宽

Dask workers 在 Worker.transfer_outgoing_logWorker.transfer_incoming_log 属性中记录每一次传入和传出的传输,包括

  1. 传输的总字节数

  2. 压缩字节传输

  3. 开始/停止时间

  4. 键已移动

  5. 同行

这些信息通过 Worker 诊断仪表盘的 /status 页面提供给用户。你可以通过在 workers 上运行命令来明确捕获它们的状态:

client.run(lambda dask_worker: dask_worker.transfer_outgoing_log)
client.run(lambda dask_worker: dask_worker.transfer_incoming_log)

性能报告

在基准测试和/或性能分析时,用户可能希望记录特定的计算甚至整个工作流程。Dask可以将bokeh仪表板保存为包含任务流、工作线程配置文件、带宽等的静态HTML图。这是通过使用 distributed.performance_report 上下文管理器包装计算来完成的:

from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    ## some dask computation

以下视频更详细地演示了 performance_report 上下文管理器:

关于时间的一点说明

不同的计算机维护着不同的时钟,这些时钟可能不完全匹配。为了解决这个问题,Dask调度器在每次响应工作节点的心跳时发送其当前时间。工作节点将其本地时间与此时间进行比较,以获得差异的估计。工作节点记录的所有时间都考虑了这种估计的延迟。这有所帮助,但仍然可能存在不精确的测量。

所有时间都是从调度器的角度出发的。

分析内存使用随时间的变化

您可能想知道随着计算的进行,集群范围内的内存使用情况如何随时间变化,或者同一算法的两种不同实现方式在内存使用方面有何比较。

这是通过使用 distributed.diagnostics.MemorySampler 上下文管理器包装一个计算来完成的:

from distributed import Client
from distributed.diagnostics import MemorySampler

client = Client(...)
ms = MemorySampler()
with ms.sample("collection 1"):
    collection1.compute()
with ms.sample("collection 2"):
    collection2.compute()
...
ms.plot(align=True)

示例输出:

MemorySampler 的示例输出
class distributed.diagnostics.MemorySampler[源代码]

每 <interval> 秒采样一次集群范围的内存使用情况。

用法

client = Client()
ms = MemorySampler()

with ms.sample("run 1"):
    <run first workflow>
with ms.sample("run 2"):
    <run second workflow>
...
ms.plot()

或使用异步客户端:

client = await Client(asynchronous=True)
ms = MemorySampler()

async with ms.sample("run 1"):
    <run first workflow>
async with ms.sample("run 2"):
    <run second workflow>
...
ms.plot()
plot(*, align: bool = False, **kwargs: Any) Any[源代码]

绘制迄今为止收集的数据系列

参数
对齐bool (可选)

参见 to_pandas()

kwargs

直接传递给 pandas.DataFrame.plot()

返回
输出 pandas.DataFrame.plot()
sample(label: str | None = None, *, client: Client | None = None, measure: str = 'process', interval: float = 0.5) Any[源代码]

记录集群内存使用情况的上下文管理器。如果客户端是同步的,则它是同步的;如果客户端是异步的,则它是异步的。

样本记录在 self.samples[<标签>] 中。

参数
标签: str, 可选

标签用于记录在 self.samples 字典下的样本。默认:自动生成一个随机标签

client: Client, 可选

用于连接调度器的客户端。默认:使用全局客户端

measure: str, 可选

来自 distributed.scheduler.MemoryState 的措施之一。默认值:采样进程内存

interval: float, 可选

采样间隔,以秒为单位。默认值:0.5

to_pandas(*, align: bool = False) pd.DataFrame[源代码]

将数据系列作为 pandas.Dataframe 返回。

参数
对齐bool, 可选

如果为 True,将每个系列的绝对时间戳转换为从每个系列第一个样本开始的时间差,以便不同的系列可以并排可视化。如果为 False(默认),则使用绝对时间戳。