高级图形操作

高级图形操作

在某些情况下,使用 Dask 集合进行计算会导致内存使用效率低下(例如,整个 Dask DataFrame 被加载到内存中)。这可能发生在 Dask 的调度器没有自动延迟任务图中节点的计算以避免其输出长时间占用内存,或者在重新计算节点比将其输出保留在内存中更便宜的场景中。

本页介绍了一组图操作工具,这些工具可以帮助避免这些情况。特别是,下面描述的工具重写了Dask集合的基础Dask图,生成具有不同键集的等效集合。

考虑以下示例:

>>> import dask.array as da
>>> x = da.random.default_rng().normal(size=500_000_000, chunks=100_000)
>>> x_mean = x.mean()
>>> y = (x - x_mean).max().compute()

上述示例在去除分布的偏差后计算其最大值。这涉及将 x 的块加载到内存中以计算 x_mean。然而,由于在计算 y 时稍后需要 x 数组,因此整个 x 数组保留在内存中。对于大型 Dask 数组来说,这可能会非常成问题。

为了减轻整个 x 数组需要保持在内存中的需求,可以将最后一行重写如下:

>>> from dask.graph_manipulation import bind
>>> xb = bind(x, x_mean)
>>> y = (xb - x_mean).max().compute()

在这里,我们使用 bind() 来创建一个新的 Dask 数组 xb,它产生与 x 完全相同的输出,但其底层 Dask 图具有与 x 不同的键,并且只有在 x_mean 计算完成后才会被计算。

这导致 x 的块被计算并立即通过 mean 进行单独缩减;然后重新计算并再次立即管道化到减法中,随后通过 max 进行缩减。这导致峰值内存使用量大大减少,因为不再将完整的 x 数组加载到内存中。然而,权衡的是计算时间增加,因为 x 被计算了两次。

API

checkpoint(*collections[, split_every])

构建一个 延迟 ,它在输入集合的所有块都被计算完毕后返回 None。

wait_on(*collections[, split_every])

在计算任何块的依赖项之前,确保所有输入集合的所有块都已计算完毕。

bind(children, parents, *[, omit, seed, ...])

使 children 集合(可选地省略子集合)依赖于 parents 集合。

clone(*collections[, omit, seed, assume_layers])

克隆 dask 集合,返回从独立计算生成的等效集合。

定义

dask.graph_manipulation.checkpoint(*collections, split_every: Optional[Union[float, Literal[False]]] = None) dask.delayed.Delayed[源代码]

构建一个 延迟 ,它在输入集合的所有块都被计算完毕后返回 None。

参数
集合

零个或多个 Dask 集合或包含零个或多个集合的嵌套数据结构

split_every: int >= 2 或 False, 可选

确定递归聚合的深度。如果大于输入键的数量,聚合将分多个步骤进行;聚合图的深度将为 \(log_{split_every}(输入键)\)。设置为较低的值可以减少缓存大小和网络传输,但代价是更多的CPU和更大的dask图。

设置为 False 以禁用。默认值为 8。

返回
延迟 产生 None
dask.graph_manipulation.wait_on(*collections, split_every: Optional[Union[float, Literal[False]]] = None)[源代码]

在计算任何块的依赖项之前,确保所有输入集合的所有块都已计算完毕。

以下示例创建了一个 dask 数组 u,当在计算中使用时,只有在数组 x 的所有块都已计算完毕后才会继续进行,但在其他方面与 x 匹配:

>>> import dask.array as da
>>> x = da.ones(10, chunks=5)
>>> u = wait_on(x)

以下示例将创建两个数组 uv ,当在计算中使用时,只有在数组 xy 的所有块都已计算完毕后才会继续,否则它们将与 xy 匹配:

>>> x = da.ones(10, chunks=5)
>>> y = da.zeros(10, chunks=5)
>>> u, v = wait_on(x, y)
参数
集合

零个或多个 Dask 集合或 Dask 集合的嵌套结构

split_every

参见 checkpoint()

返回
collections 相同

与输入类型相同的Dask集合,其计算结果相同,或者与输入等效的嵌套结构,其中原始集合已被替换。新生成的集合节点的键将与原始键不同,以便它们可以在同一个图中使用。

dask.graph_manipulation.bind(children: dask.graph_manipulation.T, parents, *, omit=None, seed: collections.abc.Hashable | None = None, assume_layers: bool = True, split_every: Optional[Union[float, Literal[False]]] = None) dask.graph_manipulation.T[源代码]

创建 children 集合,可选地省略子集合,依赖于 parents 集合。以下是两个示例。

第一个示例创建了一个数组 b2,其计算首先完全计算数组 a,然后完全计算 b,在此过程中重新计算 a

>>> import dask
>>> import dask.array as da
>>> a = da.ones(4, chunks=2)
>>> b = a + 1
>>> b2 = bind(b, a)
>>> len(b2.dask)
9
>>> b2.compute()
array([2., 2., 2., 2.])

第二个示例创建了数组 b3c3,其计算首先计算数组 a,然后计算加法,这次在过程中不再重新计算 a

>>> c = a + 2
>>> b3, c3 = bind((b, c), a, omit=a)
>>> len(b3.dask), len(c3.dask)
(7, 7)
>>> dask.compute(b3, c3)
(array([2., 2., 2., 2.]), array([3., 3., 3., 3.]))
参数
子节点

Dask 集合或 Dask 集合的嵌套结构

父级

Dask 集合或 Dask 集合的嵌套结构

省略

Dask 集合或 Dask 集合的嵌套结构

种子

用于种子密钥再生的哈希值。省略则默认为每次调用时生成不同密钥的随机数。

假设层

使用一个在层级上工作的快速算法,该算法假设 childrenomit 中的所有集合

  1. 使用 HighLevelGraph,

  2. 定义 __dask_layers__() 方法,并且

  3. 从未在创建 omit 集合和 children 集合之间压缩和重建它们的图表;换句话说,如果 omit 集合的键可以在 children 集合的键中找到,那么对于图层来说也必须同样成立。

使用一个在键级别工作的较慢算法,这使得上述假设都不成立。

split_every

参见 checkpoint()

返回
children 相同

Dask 集合或等效于 children 的 Dask 集合结构,它们计算得到相同的值。所有 children 的节点将被重新生成,直到并排除 omit 的节点。直接位于 omit 之上的节点,或者如果 omit 中的集合未找到,则阻止其计算,直到 parents 中的所有集合都已完全计算。重新生成的节点的键将与原始键不同,以便它们可以在同一个图中使用。

dask.graph_manipulation.clone(*collections, omit=None, seed: collections.abc.Hashable = None, assume_layers: bool = True)[源代码]

克隆 dask 集合,返回从独立计算生成的等效集合。

参数
集合

零个或多个 Dask 集合或 Dask 集合的嵌套结构

省略

Dask 集合或 Dask 集合的嵌套结构,这些将不会被克隆

种子

参见 绑定()

假设层

参见 绑定()

返回
collections 相同

Dask 集合与输入类型相同,计算结果相同,或与输入等效的嵌套结构,其中原始集合已被替换。新集合中再生节点的键将与原始键不同,以便它们可以在同一个图中使用。

示例

(为了简洁起见,令牌已被简化)

>>> import dask.array as da
>>> x_i = da.asarray([1, 1, 1, 1], chunks=2)
>>> y_i = x_i + 1
>>> z_i = y_i + 2
>>> dict(z_i.dask)  
{('array-1', 0): array([1, 1]),
 ('array-1', 1): array([1, 1]),
 ('add-2', 0): (<function operator.add>, ('array-1', 0), 1),
 ('add-2', 1): (<function operator.add>, ('array-1', 1), 1),
 ('add-3', 0): (<function operator.add>, ('add-2', 0), 1),
 ('add-3', 1): (<function operator.add>, ('add-2', 1), 1)}
>>> w_i = clone(z_i, omit=x_i)
>>> w_i.compute()
array([4, 4, 4, 4])
>>> dict(w_i.dask)  
{('array-1', 0): array([1, 1]),
 ('array-1', 1): array([1, 1]),
 ('add-4', 0): (<function operator.add>, ('array-1', 0), 1),
 ('add-4', 1): (<function operator.add>, ('array-1', 1), 1),
 ('add-5', 0): (<function operator.add>, ('add-4', 0), 1),
 ('add-5', 1): (<function operator.add>, ('add-4', 1), 1)}

clone() 的典型使用模式如下:

>>> x = cheap_computation_with_large_output()  
>>> y = expensive_and_long_computation(x)  
>>> z = wrap_up(clone(x), y)  

在上面的代码中,x 的块一旦被 y 的块消耗,就会被遗忘,然后在计算的最后从头开始重新生成。如果没有 clone(),x 只会计算一次,然后在整个 y 的计算过程中一直保存在内存中,不必要地消耗内存。