内容

排序

内容

排序

备注

这是一个大多数用户不需要担心的进阶主题。

当Dask接收到一个任务图进行计算时,它需要选择一个执行任务的顺序。我们有一些约束条件:依赖项必须在它们的依赖项之前执行。但除此之外,还有大量的选择空间。我们希望Dask选择一个既能最大化并行性又能最小化运行计算所需内存占用的顺序。

从高层次来看,Dask 有一个策略,即通过 大步迈进 来实现 小目标

  1. 小目标:倾向于选择总依赖项少且最终依赖项总依赖项少的任务。

    我们倾向于优先处理那些有助于快速终止计算分支的任务。

    更详细地说,我们计算每个任务所依赖的总依赖数(包括其自身的依赖、其依赖的依赖等),然后我们选择那些总依赖数较少的任务来驱动结果。我们选择优先处理那些首先完成较短计算的任务。

  2. 大步: 优先选择依赖项多的任务

    然而,许多任务都朝着相同的最终依赖项工作。在这些任务中,我们选择那些剩余工作最多的任务。我们希望在开始处理较小的部分之前,先完成子计算中的较大部分。

这是通过 dask.order.order() 完成的。更技术性的讨论可以在 深入调度 中找到。http://www.aidoczh.com/dask-distributed/scheduling-policies.html 也讨论了调度,重点是分布式调度器,其中包括此处文档的静态排序之外的其他选择。

调试

大多数情况下,Dask 的排序做得很好。但这确实是一个非常困难的问题,可能会有一些情况下你会观察到意外的高内存使用或通信,这可能是由于排序不佳造成的。本节描述了如何识别排序问题,以及你可以采取的一些缓解问题的步骤。

考虑一个计算,它从磁盘独立加载多个数据链,将它们的部分堆叠在一起,并进行一些归约操作:

>>> # create data on disk
>>> import dask.array as da
>>> x = da.zeros((12500, 10000), chunks=('10MB', -1))
>>> da.to_zarr(x, 'saved_x1.zarr', overwrite=True)
>>> da.to_zarr(x, 'saved_y1.zarr', overwrite=True)
>>> da.to_zarr(x, 'saved_x2.zarr', overwrite=True)
>>> da.to_zarr(x, 'saved_y2.zarr', overwrite=True)

我们可以加载数据

>>> # load the data.
>>> x1 = da.from_zarr('saved_x1.zarr')
>>> y1 = da.from_zarr('saved_x2.zarr')
>>> x2 = da.from_zarr('saved_y1.zarr')
>>> y2 = da.from_zarr('saved_y2.zarr')

并对它进行一些计算

>>> def evaluate(x1, y1, x2, y2):
...     u = da.stack([x1, y1])
...     v = da.stack([x2, y2])
...     components = [u, v, u ** 2 + v ** 2]
...     return [
...         abs(c[0] - c[1]).mean(axis=-1)
...         for c in components
...     ]
>>> results = evaluate(x1, y1, x2, y2)

你可以使用 dask.visualize() 并设置 color="order" 来可视化包含静态排序的任务图,节点标签中会包含排序信息。与通常使用 dask.visualize 一样,你可能需要将问题缩小到较小的规模,因此我们会截取数据的一个子集。确保包含 optimize_graph=True 以获得任务执行顺序的真实表示。

>>> import dask
>>> n = 125 * 4
>>> dask.visualize(evaluate(x1[:n], y1[:n], x2[:n], y2[:n]),
...                optimize_graph=True, color="order",
...                cmap="autumn", node_attr={"penwidth": "4"})
输出端有几条垂直节点链的复杂任务图,以及几个输入子树。在这些部分之间,存在一个多对多的交叉依赖箭头区域。输出树的颜色编码是交错的,没有清晰的进展。

在这个可视化中,节点按执行顺序(从深红到浅黄)着色,节点标签是Dask分配给每个任务的顺序。

虽然有点难以察觉,但实际上这里有四个基本上独立的“执行塔”。我们从中间右侧的数组(标签1,底部)开始,向上移动到右侧(标签8,右上角),然后跳转到一个完全不同的数组(标签11,左下角)。然而,计算第一个塔(标签8,右上角的下游)需要从我们的第二个输入数组(标签5,右下角)加载一些数据。我们更倾向于完成其下游的任务。

当 Dask 执行该任务图时,您可能会观察到高内存使用率。糟糕的静态排序意味着我们未能完成那些本可以让我们释放数据片段的任务。我们一次性将更多数据片段加载到内存中,导致内存使用率更高。

这种特定的排序失败(可能已修复)来自于每个计算链底部的共享依赖项(每个任务底部的方框,代表输入的Zarr数组)。我们可以将这些内联并查看排序的效果:

>>> # load and profile data
>>> x1 = da.from_zarr('saved_x1.zarr', inline_array=True)
>>> y1 = da.from_zarr('saved_x2.zarr', inline_array=True)
>>> x2 = da.from_zarr('saved_y1.zarr', inline_array=True)
>>> y2 = da.from_zarr('saved_y2.zarr', inline_array=True)

>>> import dask
>>> n = 125 * 4
>>> dask.visualize(evaluate(x1[:n], y1[:n], x2[:n], y2[:n]),
...                optimize_graph=True, color="order",
...                cmap="autumn", node_attr={"penwidth": "4"})
输出端有几条垂直节点链的复杂任务图,以及数量相似的输入块。输出和输入通过每个节点有少量输入的简单节点连接,布局时各部分树之间没有显著交叉。输出链的颜色编码显示了执行顺序的清晰进展,每种输出颜色都有一个相同颜色的对应输入。

乍一看,我们可以看到这种排序看起来更加规则和统一。交叉的线条更少,排序的颜色从下到上、从左到右平滑移动。这表明Dask在完成一个计算链后才进入下一个。

这里的教训 不是 “总是使用 inline_array=True”。虽然静态排序看起来更好,但还有其他需要考虑的 计算阶段。实际性能是否更好将取决于比我们在这里能考虑的更多的因素。更多信息请参见 dask.array.from_array()

相反,这里要吸取的教训是:

  1. 哪些症状可能让你诊断出 Dask 的排序是一个问题(例如,高内存使用)

  2. 如何生成并读取包含Dask排序信息的任务图。