诊断(本地)

分析并行代码可能具有挑战性,但 dask.diagnostics 提供了功能来帮助分析和检查使用 本地任务调度器 的执行情况。

本页描述了以下几个内置选项:

  1. 进度条

  2. 分析器

  3. ResourceProfiler

  4. CacheProfiler

此外,本页面还提供了如何构建自己的自定义诊断的说明。

进度条

ProgressBar([minimum, width, dt, out])

一个用于 dask 的进度条。

ProgressBar 类基于上述调度器回调,在计算过程中在终端或笔记本中显示进度条。这可以在长时间运行的图执行过程中提供良好的反馈。它可以作为上下文管理器在调用 getcompute 时用于分析计算:

>>> import dask.array as da
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.default_rng().normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)

>>> with ProgressBar():
...     out = res.compute()
[########################################] | 100% Completed | 17.1 s

或者使用 register 方法全局注册:

>>> pbar = ProgressBar()
>>> pbar.register()
>>> out = res.compute()
[########################################] | 100% Completed | 17.1 s

要从全局回调中注销,请调用 unregister 方法:

>>> pbar.unregister()

分析器

Profiler()

一个用于在任务级别执行dask的分析器。

Dask 提供了一些用于分析执行的工具。与 ProgressBar 类似,它们每个都可以作为上下文管理器使用,或者全局注册。

Profiler 类用于在任务级别上分析 Dask 的执行。在执行过程中,它会记录每个任务的以下信息:

  1. 任务

  2. 自纪元以来的开始时间(以秒为单位)

  3. 自纪元以来的完成时间(秒)

  4. Worker id

ResourceProfiler

ResourceProfiler([dt])

一个用于资源使用的分析器。

ResourceProfiler 类用于在资源级别上分析 Dask 的执行情况。在执行过程中,它会记录每个时间步的以下信息:

  1. 自纪元以来的秒数

  2. 内存使用量(MB)

  3. % CPU 使用率

默认的时间步长是1秒,但可以使用 dt 关键字手动设置:

>>> from dask.diagnostics import ResourceProfiler
>>> rprof = ResourceProfiler(dt=0.5)

CacheProfiler

CacheProfiler([metric, metric_name])

一个用于调度器缓存级别dask执行的分析器。

CacheProfiler 类用于在调度器缓存级别分析 Dask 的执行。在执行过程中,它记录每个任务的以下信息:

  1. 任务

  2. 尺寸度量

  3. 自纪元以来的缓存条目时间(以秒为单位)

  4. 缓存退出时间,自纪元以来的秒数

这里的尺寸度量是每个任务结果上调用的函数的输出。默认度量是计算每个任务(所有任务的 metric 为 1)。其他函数可以通过 metric 关键字作为度量使用。例如,cachey 中的 nbytes 函数可以用来测量调度器缓存中的字节数:

>>> from dask.diagnostics import CacheProfiler
>>> from cachey import nbytes
>>> cprof = CacheProfiler(metric=nbytes)

示例

作为一个演示使用诊断功能的例子,我们将对使用 Dask Array 完成的线性代数进行性能分析。我们将创建一个随机数组,取其 QR 分解,然后通过将 Q 和 R 分量相乘来重建初始数组。请注意,由于分析器(以及所有诊断工具)只是上下文管理器,因此可以在一个 with 块中使用多个分析器:

>>> import dask.array as da
>>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
>>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)

>>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof,
...         CacheProfiler() as cprof:
...     out = a2.compute()

每个分析器的结果都存储在其 results 属性中,作为 namedtuple 对象的列表:

>>> prof.results[0]
TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
         task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
         start_time=1454368444.493292,
         end_time=1454368444.902987,
         worker_id=4466937856)

>>> rprof.results[0]
ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)

>>> cprof.results[0]
CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
          task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
          metric=1,
          cache_time=1454368444.49662,
          free_time=1454368446.769452)

这些可以单独分析,或者使用每个分析器上的 visualize 方法在 bokeh 图中查看:

>>> prof.visualize()

要同时查看多个分析器,可以使用 dask.diagnostics.visualize() 函数。该函数接受一个分析器列表,并创建一个沿 x 轴对齐的垂直堆叠图:

>>> from dask.diagnostics import visualize
>>> visualize([prof, rprof, cprof])

观察上图,从上到下:

  1. Profiler 对象的结果:这显示了每个任务的执行时间作为一个矩形,沿着 y 轴按工作者(在这种情况下是线程)组织。相似的任务按颜色分组,通过悬停在每个任务上,可以看到每个块所代表的键和任务。

  2. ResourceProfiler 对象的结果:这显示了两条线,一条表示所有工作线程使用的总CPU百分比,另一条表示总内存使用情况。

  3. CacheProfiler 对象的结果:这显示了每个任务组的行,绘制了当前 metric 在缓存中随时间变化的和。在这种情况下,它是默认的度量(计数),线条表示在时间点上每个对象在缓存中的数量。请注意,分组和着色与 Profiler 图相同,并且可以通过将鼠标悬停在线上来找到每条线所代表的任务。

从这些图中我们可以看到,初始任务(对每个块调用 numpy.random.randomnumpy.linalg.qr)是并发运行的,但仅使用了略高于 100% 的 CPU。这是因为对 numpy.linalg.qr 的调用目前不会释放全局解释器锁(GIL),因此这些调用无法真正并行执行。接下来,有一个归约步骤,其中所有块被合并。这要求第一步的所有结果都保存在内存中,如缓存中结果数量的增加和内存使用量的增加所示。在此任务结束后,缓存中的元素数量立即减少,表明它们仅在此步骤中需要。最后,有一系列交替调用 dotsum。从 CPU 图来看,这表明它们既并发又并行运行,因为 CPU 百分比峰值达到约 350%。

自定义回调

Callback([start, start_state, pretask, ...])

使用回调机制的基类

基于 dask.local.get_async 的调度器(目前包括 dask.getdask.threaded.getdask.multiprocessing.get)接受五个回调,允许对调度器执行进行检查。

回调函数包括:

1. start(dsk): Run at the beginning of execution, right before the state is initialized. Receives the Dask graph

2. start_state(dsk, state): Run at the beginning of execution, right after the state is initialized. Receives the Dask graph and scheduler state

3. pretask(key, dsk, state): Run every time a new task is started. Receives the key of the task to be run, the Dask graph, and the scheduler state

4. posttask(key, result, dsk, state, id): Run every time a task is finished. Receives the key of the task that just completed, the result, the Dask graph, the scheduler state, and the id of the worker that ran the task

5. finish(dsk, state, errored): Run at the end of execution, right before the result is returned. Receives the Dask graph, the scheduler state, and a boolean indicating whether or not the exit was due to an error

可以通过使用上述方法中的某些方法作为关键字实例化 Callback 类,或者通过子类化 Callback 类来创建自定义诊断。这里我们创建一个类,在计算每个键时打印其名称:

from dask.callbacks import Callback
class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started"""
        print("Computing: {0}!".format(repr(key)))

这现在可以在计算过程中用作上下文管理器:

>>> from operator import add, mul
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

>>> with PrintKeys():
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

或者,可以将函数作为关键字参数传递给 Callback

>>> def printkeys(key, dask, state):
...    print("Computing: {0}!".format(repr(key)))

>>> with Callback(pretask=printkeys):
...     get(dsk, 'c')
Computing 'a'!
Computing 'b'!
Computing 'c'!

API

CacheProfiler([metric, metric_name])

一个用于调度器缓存级别dask执行的分析器。

Callback([start, start_state, pretask, ...])

使用回调机制的基类

Profiler()

一个用于在任务级别执行dask的分析器。

ProgressBar([minimum, width, dt, out])

一个用于 dask 的进度条。

ResourceProfiler([dt])

一个用于资源使用的分析器。

visualize(profilers[, filename, show, save, ...])

在 bokeh 图中可视化分析结果。

dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)[源代码]

一个用于 dask 的进度条。

参数
最小int, 可选

显示进度条前的最短时间阈值(以秒为单位)。默认值为 0(总是显示)

宽度int, 可选

条形的宽度

dtfloat, 可选

更新分辨率以秒为单位,默认是0.1秒

文件对象,可选

进度条将被写入的文件对象。可以是 sys.stdoutsys.stderr 或任何能够写入 str 对象的其他文件对象。默认是 sys.stdout

示例

下面我们创建一个进度条,显示前有一个最小阈值为1秒。对于廉价的计算,什么都不会显示:

>>> with ProgressBar(minimum=1.0):      
...     out = some_fast_computation.compute()

但对于昂贵的计算,会显示一个完整的进度条:

>>> with ProgressBar(minimum=1.0):      
...     out = some_slow_computation.compute()
[########################################] | 100% Completed | 10.4 s

最后一次计算的持续时间可作为属性使用

>>> pbar = ProgressBar()                
>>> with pbar:                          
...     out = some_computation.compute()
[########################################] | 100% Completed | 10.4 s
>>> pbar.last_duration                  
10.4

你也可以注册一个进度条,以便它在所有计算中显示:

>>> pbar = ProgressBar()                
>>> pbar.register()                     
>>> some_slow_computation.compute()     
[########################################] | 100% Completed | 10.4 s
dask.diagnostics.Profiler()[源代码]

一个用于在任务级别执行dask的分析器。

记录每个任务的以下信息:
  1. 任务

  2. 自纪元以来的开始时间(以秒为单位)

  3. 自纪元以来的完成时间(秒)

  4. Worker id

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import Profiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with Profiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results        
[TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...),
 TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]

这些结果可以使用 visualize 方法在 bokeh 图中可视化。请注意,这需要安装 bokeh。

>>> prof.visualize()    

您可以全局激活分析器

>>> prof.register()

如果你全局使用分析器,你需要手动清除旧的结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.ResourceProfiler(dt=1)[源代码]

一个用于资源使用的分析器。

记录以下每个时间步
  1. 自纪元以来的秒数

  2. 内存使用量(MB)

  3. % CPU 使用率

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with ResourceProfiler() as prof:
...     get(dsk, 'z')
22

这些结果可以使用 visualize 方法在 bokeh 图中可视化。请注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活分析器

>>> prof.register()

如果你全局使用分析器,你需要手动清除旧的结果。

>>> prof.clear()

请注意,当作为上下文管理器使用时,数据将在封闭块的持续时间内被收集。相比之下,当全局注册时,数据仅在dask调度器处于活动状态时被收集。

>>> prof.unregister()
dask.diagnostics.CacheProfiler(metric=None, metric_name=None)[源代码]

一个用于调度器缓存级别dask执行的分析器。

记录每个任务的以下信息:
  1. 任务

  2. 尺寸度量

  3. 自纪元以来的缓存条目时间(以秒为单位)

  4. 缓存退出时间,自纪元以来的秒数

示例

>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import CacheProfiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with CacheProfiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results    
[CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...),
 CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]

默认情况下,每个任务都会被计数(所有任务的 metric 为 1)。通过 metric 关键字,可以使用其他函数作为度量标准。例如,可以在 cachey 中找到的 nbytes 函数可以用来测量缓存中的字节数。

>>> from cachey import nbytes                   
>>> with CacheProfiler(metric=nbytes) as prof:  
...     get(dsk, 'z')
22

分析结果可以使用 visualize 方法在 bokeh 图中可视化。请注意,这需要安装 bokeh。

>>> prof.visualize() 

您可以全局激活分析器

>>> prof.register()

如果你全局使用分析器,你需要手动清除旧的结果。

>>> prof.clear()
>>> prof.unregister()
dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)[源代码]

使用回调机制的基类

创建一个具有以下签名的函数回调:

>>> def start(dsk):
...     pass
>>> def start_state(dsk, state):
...     pass
>>> def pretask(key, dsk, state):
...     pass
>>> def posttask(key, result, dsk, state, worker_id):
...     pass
>>> def finish(dsk, state, failed):
...     pass

然后,您可以使用任意数量的回调对象来构建回调对象。

>>> cb = Callback(pretask=pretask, finish=finish)

并将其作为计算/获取调用的上下文管理器使用

>>> with cb:            
...     x.compute()

或者使用 register 方法全局注册

>>> cb.register()
>>> cb.unregister()

或者使用你自己的方法子类化 Callback 类。

>>> class PrintKeys(Callback):
...     def _pretask(self, key, dask, state):
...         print("Computing: {0}!".format(repr(key)))
>>> with PrintKeys():   
...     x.compute()
dask.diagnostics.visualize(profilers, filename='profile.html', show=True, save=None, mode=None, **kwargs)[源代码]

在 bokeh 图中可视化分析结果。

如果传递了多个分析器,图表将垂直堆叠。

参数
分析器分析器或列表

分析器或分析器列表。

文件名字符串,可选

绘图输出文件的名称。

显示布尔值,可选

如果为 True(默认),图表将在浏览器中打开。

保存布尔值,可选

如果为True(在非笔记本环境中默认为True),图表将被保存到磁盘。

模式str, 可选

传递给 bokeh.output_file() 的模式

**kwargs

其他关键字参数,传递给 bokeh.figure。这些将覆盖 visualize 设置的所有默认值。

返回
完成的 bokeh 绘图对象。