Dask 数组由许多 NumPy(或类似 NumPy)数组组成。这些数组的排列方式会显著影响性能。例如,对于一个方形数组,你可以沿着行、列或更方形的样式排列你的块。不同排列的 NumPy 数组对于不同的算法会有不同的速度表现。

思考和控制分块对于优化高级算法非常重要。

指定块形状

我们总是指定一个 chunks 参数来告诉 dask.array 如何将底层数组分解为块。我们可以以多种方式指定 chunks

  1. 一个统一的维度大小,如 1000,意味着每个维度中的块大小为 1000

  2. 一个统一的块形状,如 (1000, 2000, 3000),表示在第一个轴上大小为 1000 的块,在第二个轴上大小为 2000 的块,在第三个轴上大小为 3000 的块。

  3. 所有维度上所有块的完全显式大小,例如 ((1000, 1000, 500), (400, 400), (5, 5, 5, 5, 5))

  4. 一个指定每个维度块大小的字典,例如 {0: 1000, 1: 2000, 2: 3000}。这只是上面形式2和3的另一种写法。

你的数据块输入将被标准化并存储在第三种也是最明确的形式中。注意,chunks 代表的是“数据块形状”而不是“数据块数量”,因此指定 chunks=1 意味着你将有许多数据块,每个数据块中只有一个元素。

为了性能,选择 chunks 的一个好方法是遵循以下规则:

  1. 一个块应该足够小,以舒适地适应内存。我们将在内存中同时拥有许多块。

  2. 一个块必须足够大,以便在该块上的计算时间明显长于Dask调度产生的每个任务1ms的开销。一个任务应花费超过100ms的时间。

  3. 块大小在 10MB-1GB 之间很常见,具体取决于 RAM 的可用性和计算的持续时间。

  4. 块应与您想要执行的计算对齐。

    例如,如果你计划频繁地沿某一维度切片,那么如果你的块是对齐的,使得你只需要接触较少的块,这样会更高效。如果你想将两个数组相加,那么如果这些数组具有匹配的块模式,就会很方便。

  5. 块应与您的存储对齐(如果适用)。

    数组数据格式通常也会被分块。在加载或保存数据时,如果Dask数组的分块与存储的分块对齐,通常在每个方向上都是更大的偶数倍,这将非常有用。

在 Genevieve Buckley 的 Choosing good chunk sizes in Dask 中了解更多。

未知块

一些数组具有未知的块大小。这种情况发生在数组的大小依赖于我们尚未执行的惰性计算时,例如以下情况:

>>> rng = np.random.default_rng()
>>> x = da.from_array(rng.standard_normal(100), chunks=20)
>>> x += 0.1
>>> y = x[x > 0]  # don't know how many values are greater than 0 ahead of time

上述操作会导致形状和块大小未知的数组。形状或块中的未知值使用 np.nan 而不是整数来表示。这些数组支持许多(但不是所有)操作。特别是,切片等操作是不可能的,并且会导致错误。

>>> y.shape
(np.nan,)
>>> y[4]
...
ValueError: Array chunk sizes unknown

A possible solution: http://www.aidoczh.com/dask/array-chunks.html#unknown-chunks.
Summary: to compute chunks sizes, use

    x.compute_chunk_sizes()  # for Dask Array
    ddf.to_dask_array(lengths=True)  # for Dask DataFrame ddf

使用 compute_chunk_sizes() 允许此示例运行:

>>> y.compute_chunk_sizes()
dask.array<..., chunksize=(19,), ...>
>>> y.shape
(44,)
>>> y[4].compute()
0.78621774046566

注意 compute_chunk_sizes() 会立即执行计算并就地修改数组。

在使用 Dask DataFrame 创建 Dask 数组时,也会出现未知的 chunksizes:

>>> ddf = dask.dataframe.from_pandas(...)
>>> ddf.to_dask_array()
dask.array<..., shape=(nan, 2), ..., chunksize=(nan, 2)>

使用 to_dask_array() 可以解决这个问题:

>>> ddf.to_dask_array(lengths=True)
dask.array<..., shape=(100, 2), ..., chunksize=(20, 2)>

关于 to_dask_array() 的更多细节,在 Dask 数组创建文档 中如何从 Dask DataFrame 创建 Dask 数组的章节中有提及。

块示例

在这个示例中,我们展示了如何使用不同的 chunks= 输入来分割以下数组:

1 2 3 4 5 6
7 8 9 0 1 2
3 4 5 6 7 8
9 0 1 2 3 4
5 6 7 8 9 0
1 2 3 4 5 6

在这里,我们展示了不同的 chunks= 参数如何将数组分割成不同的块

chunks=3:大小为3的对称块:

1 2 3  4 5 6
7 8 9  0 1 2
3 4 5  6 7 8

9 0 1  2 3 4
5 6 7  8 9 0
1 2 3  4 5 6

chunks=2: 大小为2的对称块:

1 2  3 4  5 6
7 8  9 0  1 2

3 4  5 6  7 8
9 0  1 2  3 4

5 6  7 8  9 0
1 2  3 4  5 6

chunks=(3, 2): 大小为 (3, 2) 的非对称但重复的块:

1 2  3 4  5 6
7 8  9 0  1 2
3 4  5 6  7 8

9 0  1 2  3 4
5 6  7 8  9 0
1 2  3 4  5 6

chunks=(1, 6): 大小为 (1, 6) 的非对称但重复的块:

1 2 3 4 5 6

7 8 9 0 1 2

3 4 5 6 7 8

9 0 1 2 3 4

5 6 7 8 9 0

1 2 3 4 5 6

chunks=((2, 4), (3, 3)): 非对称且不重复的块:

1 2 3  4 5 6
7 8 9  0 1 2

3 4 5  6 7 8
9 0 1  2 3 4
5 6 7  8 9 0
1 2 3  4 5 6

chunks=((2, 2, 1, 1), (3, 2, 1)): 非对称且不重复的块:

1 2 3  4 5  6
7 8 9  0 1  2

3 4 5  6 7  8
9 0 1  2 3  4

5 6 7  8 9  0

1 2 3  4 5  6

讨论

后者的例子在原始数据中很少由用户提供,但它们来自于复杂的切片和广播操作。通常人们使用最简单的形式,直到他们需要更复杂的形式。块的选择应与您想要进行的计算相一致。

例如,如果你计划沿第一个维度切出薄片,那么你可能希望使该维度比其他维度更窄。如果你计划进行线性代数运算,那么你可能需要更多对称的块。

加载分块数据

像 HDF5、NetCDF、TIFF 和 Zarr 这样的现代 NDArray 存储格式,允许数组以块或瓦片的形式存储,以便可以高效地提取数据块,而不必在连续的数据流中进行查找。最好将 Dask 数组的块与底层数据存储的块对齐。

然而,数据存储通常比Dask数组理想的块大小更细粒度,因此通常会选择一个是存储块大小倍数的块大小,否则可能会产生高开销。

例如,如果你正在加载一个以 (100, 100) 块为单位分块的数据存储,那么你可能会选择一个更大的分块,如 (1000, 2000),它仍然可以被 (100, 100) 整除。数据存储技术将能够告诉你它们的数据是如何分块的。

重新分块

rechunk(x[, chunks, threshold, ...])

将 dask 数组 x 中的块转换为新的块。

有时你需要改变数据的块布局。例如,可能数据是以行块的形式提供给你的,但你需要进行一个如果按列操作会快得多的操作。你可以使用 rechunk 方法来改变块布局。

x = x.rechunk((50, 1000))

跨轴重新分块可能会很昂贵并产生大量通信,但 Dask 数组有相当高效的算法来完成这项任务。

注意:rechunk 方法期望输出数组的形状与输入数组的形状相同,并且不支持重塑。在使用 rechunk 之前,确保所需的输出形状与输入形状匹配非常重要。

你可以传递任何有效的分块形式来进行重新分块:

x = x.rechunk(1000)
x = x.rechunk((50, 1000))
x = x.rechunk({0: 50, 1: 1000})

重塑

dask.array.reshape 的效率可能强烈依赖于输入数组的块划分。在重塑操作中,存在“快速移动”或“高”轴的概念。对于一个二维数组,第二轴(axis=1)是移动最快的,其次是第一轴。这意味着如果我们画一条线来指示值的填充方式,我们会先沿着“列”(沿 axis=1)移动,然后向下移动到下一行。考虑 np.ones((3, 4)).reshape(12)

二维(3行4列)NumPy数组的视觉表示,被重塑为一维(12列1行)。箭头表示原始数组中的值按顺序复制到新数组的方式,首先沿轴1移动列,然后沿轴0移动到下一行。

现在考虑 Dask 的分块对这个操作的影响。如果慢速移动的轴(在这种情况下只是 axis=0)的分块大小大于 1,我们就会遇到问题。

_images/reshape_problem.png

第一个块的形状是 (2, 2)。根据 reshape 的规则,我们从第一个块的第一行取两个值。但随后我们在第一个块中仍有两个“未使用”的值时跨越了一个块边界(从1到2)。无法将输入块与输出形状对齐。我们需要以某种方式重新分块输入,使其与输出形状兼容。我们有两个选项。

  1. 使用 dask.array.rechunk() 中的逻辑合并块。这避免了创建过多的任务/块,代价是一些通信和更大的中间结果。这是默认行为。

  2. 使用 da.reshape(x, shape, merge_chunks=False) 来避免通过 分割输入 来合并块。特别是,我们可以将所有慢速移动的轴重新分块,使其块大小为1。这避免了通信和移动大量数据,代价是任务图更大(可能大得多,因为慢速移动轴上的块数将等于这些轴的长度。)。

视觉上,这是第二个选项:

_images/reshape_rechunked.png

哪一个更好取决于你的问题。如果通信非常昂贵且你的数据在慢速移动轴上相对较小,那么 merge_chunks=False 可能更好。让我们比较一下这两种方法在将3维数组重塑为2维数组的问题上的任务图,其中输入数组在慢速移动轴上没有 chunksize=1

>>> a = da.from_array(np.arange(24).reshape(2, 3, 4), chunks=((2,), (2, 1), (2, 2)))
>>> a
dask.array<array, shape=(2, 3, 4), dtype=int64, chunksize=(2, 2, 2), chunktype=numpy.ndarray>
>>> a.reshape(6, 4).visualize()
_images/merge_chunks.png
>>> a.reshape(6, 4, merge_chunks=False).visualize()
_images/merge_chunks_false.png

默认情况下,一些中间块会被合并,导致任务图更加复杂。通过设置 merge_chunks=False,我们可以分割输入块(这会导致更多的总体任务,取决于数组的大小),但避免了后续的通信。

自动分块

Chunks 还包括三个特殊值:

  1. -1: 在此维度上不进行分块

  2. None: 不改变该维度上的分块(对 rechunk 有用)

  3. "auto": 允许在此维度中进行分块以适应理想的分块大小

因此,例如,可以将一个3D数组重新分块,使其在第零维度上没有分块,但仍然具有合理的分块大小,如下所示:

x = x.rechunk({0: -1, 1: 'auto', 2: 'auto'})

或者可以允许 所有 维度自动缩放以达到一个良好的块大小:

x = x.rechunk('auto')

自动分块会扩展或收缩所有标记为 "auto" 的维度,以尝试达到与配置值 array.chunk-size 相等的分块大小(以字节为单位),该值默认为 128MiB,但您可以在 配置 中更改。

>>> dask.config.get('array.chunk-size')
'128MiB'

自动重新分块尝试尊重自动重新缩放维度的中位数块形状,但会修改这一点以适应完整数组的形状(不能有比数组本身更大的块),并找到能够很好地分割形状的块形状。

这些值也可以在通过 dask.array.onesdask.array.from_array 等操作创建数组时使用。

>>> dask.array.ones((10000, 10000), chunks=(-1, 'auto'))
dask.array<wrapped, shape=(10000, 10000), dtype=float64, chunksize=(10000, 1250), chunktype=numpy.ndarray>