Dask 最佳实践

使用 Dask 的 API 很容易上手,但要 熟练 使用它们需要一些经验。本页包含 Dask 最佳实践的建议,并提供了解决常见 Dask 问题的方案。

本文档特别关注所有 Dask API 之间共享的最佳实践。读者可能首先希望先查阅某个特定 API 的最佳实践文档。

从小处着手

并行性带来了额外的复杂性和开销。有时对于较大的问题来说是必要的,但通常并非如此。在将像Dask这样的并行计算系统添加到你的工作负载之前,你可能想先尝试一些替代方案:

  • 使用更好的算法或数据结构:NumPy、pandas、Scikit-learn 可能有一些更快的函数来实现你正在尝试做的事情。咨询专家或再次阅读他们的文档以找到更好的预构建算法可能是值得的。

  • 更好的文件格式: 支持随机访问的高效二进制格式通常可以帮助你高效且简单地管理大于内存的数据集。请参见下面的 高效存储数据 部分。

  • 编译代码: 使用 Numba 或 Cython 编译你的 Python 代码可能会使并行化变得不必要。或者你可能会使用这些库中可用的多核并行化。

  • 采样: 即使你有很多数据,使用全部数据可能并没有太大优势。通过智能采样,你可能能够从更易管理的数据子集中得出相同的见解。

  • 概况: 如果你想加速慢代码,首先理解它为什么慢是很重要的。在分析代码上适度的时间投资可以帮助你识别什么在拖慢你。这些信息可以帮助你做出更好的决定,比如并行化是否可能有所帮助,或者是否其他方法可能更有效。

使用仪表盘

Dask 的仪表盘帮助你了解工作者的状态。这些信息可以帮助你找到高效的解决方案。在并行和分布式计算中,存在新的成本需要关注,因此你过去的直觉可能不再适用。通过使用仪表盘,可以帮助你重新学习什么是快和慢,以及如何应对。

更多信息请参阅 Dask 仪表板的文档

避免非常大的分区

你的数据块应该足够小,以便许多数据块可以一次性适应工作者的可用内存。当你在Dask DataFrame中选择分区大小(参见 DataFrame分区)或在Dask Array中选择块大小(参见 Array块)时,你通常会控制这一点。

Dask可能会在一台机器上并行处理尽可能多的块,数量与你机器上的核心数相当。因此,如果你有1 GB的块和十个核心,那么Dask可能会使用*至少*10 GB的内存。此外,Dask通常会有2-3倍的块可供处理,以便它总是有工作可做。

如果你有一台拥有100 GB内存和10个核心的机器,那么你可能会选择在1GB范围内进行分块。每个核心可以处理十个分块,这为Dask提供了健康的余量,而不会使任务过于细小。

请注意,您还需要避免分块大小过小。详情请参见下一节。有关为 Dask 数组选择分块大小的更详细指南,请参阅这篇关于 选择合适的分块大小 的博客文章。

避免非常大的图表

Dask 工作负载由 任务 组成。任务是一个 Python 函数,例如应用于 pandas DataFrame 或 NumPy 数组的 np.sum。如果你在使用具有许多分区的 Dask 集合,那么你执行的每个操作,例如 x + 1,可能会生成许多任务,至少与你的集合中的分区数量一样多。

每个任务都会带来一些开销。这个开销大约在200微秒到1毫秒之间。如果你有一个包含数千个任务的计算,这是可以接受的,大约会有1秒的开销,这可能不会困扰你。

然而,当你有包含数百万任务的非常大的图时,这可能会变得麻烦,这不仅因为开销现在在10分钟到几小时范围内,而且因为处理如此大的图的开销可能会开始压倒调度器。

您可以通过以下方式构建较小的图表:

  • 增加你的块大小: 如果你有1,000 GB的数据并且使用10 MB的块,那么你将有100,000个分区。对这样一个集合的每个操作将至少生成100,000个任务。

    然而,如果你将你的chunksize增加到1 GB甚至几GB,那么你将减少数量级的开销。这要求你的工作节点拥有远超过1 GB的内存,但对于较大的工作负载来说这是典型的。

  • 将操作融合在一起: Dask 会自己做一些这样的工作,但你可以帮助它。如果你有一个非常复杂的操作,包含数十个子操作,也许你可以将这些操作打包成一个 Python 函数,并使用类似 da.map_blocksdd.map_partitions 的函数。

    一般来说,你能将更多的管理工作移入你的函数中,效果会更好。这样,Dask调度器就不需要考虑所有的细粒度操作。

  • 分解你的计算: 对于非常大的工作负载,你可能还想尝试一次将较小的块发送到 Dask。例如,如果你正在处理一个 PB 级的数据,但发现 Dask 只能处理 100 TB,也许你可以将你的计算分解成十个部分,然后一个接一个地提交它们。

学习定制技巧

Dask 的高级集合(数组、DataFrame、包)包含了遵循 NumPy 和 pandas 标准 Python API 的常见操作。然而,许多 Python 工作负载是复杂的,可能需要这些高级 API 中未包含的操作。

幸运的是,有许多选项可以支持自定义工作负载:

  • 所有集合都有一个 map_partitionsmap_blocks 函数,该函数将用户提供的函数应用于集合中的每个 pandas DataFrame 或 NumPy 数组。因为 Dask 集合由普通的 Python 对象组成,所以通常很容易在不进行太多修改的情况下将自定义函数映射到数据集的分区上。

    df.map_partitions(my_custom_func)
    
  • 更复杂的 map_* 函数。有时您的自定义行为并不是非常容易并行化的,而是需要更高级的通信。例如,您可能需要从一个分区向另一个分区传递少量信息,或者您可能想要构建一个自定义聚合。

    Dask 集合也包含这些方法。

  • 对于更复杂的任务,您可以将集合转换为单独的块,并使用 Dask Delayed 按照您的喜好排列这些块。通常每个集合都有一个 to_delayed 方法。

map_partitions(func, *args[, meta, ...])

在每个 DataFrame 分区上应用 Python 函数。

map_overlap(func, df, before, after, *args)

对每个分区应用一个函数,与相邻分区共享行。

groupby.Aggregation(name, chunk, agg[, finalize])

用户定义的分组聚合。

blockwise(func, out_ind, *args[, name, ...])

张量操作:广义内积和外积

map_blocks(func, *args[, name, token, ...])

将一个函数映射到 dask 数组的所有块上。

map_overlap(func, *args[, depth, boundary, ...])

在具有一定重叠的数组块上应用函数

reduction(x, chunk, aggregate[, axis, ...])

简化的一般版本

高效存储数据

随着计算能力的提升,您可能会发现数据访问和I/O占用了更多的时间。此外,并行计算通常会为数据存储方式增加新的约束,特别是在如何提供与计算计划相符的数据块随机访问方面。

例如:

  • 对于压缩,您可能会发现您放弃了 gzip 和 bz2,转而采用像 lz4、snappy 和 Z-Standard 这样的新系统,它们提供了更好的性能和随机访问。

  • 对于存储格式,您可能会发现您需要自描述格式,这些格式针对随机访问、元数据存储和二进制编码进行了优化,例如 ParquetORCZarrHDF5GeoTIFF

  • 在云端工作时,你可能会发现一些较旧的格式,如 HDF5,可能无法很好地工作。

  • 您可能希望以与常见查询相匹配的方式对数据进行分区或分块。在 Dask DataFrame 中,这可能意味着选择一个列进行排序,以便快速选择和连接。对于 Dask Array,这可能意味着选择与您的访问模式和算法对齐的块大小。

进程和线程

如果你主要使用 Numpy、pandas、Scikit-learn、Numba 和其他释放 GIL 的库进行数值工作,那么主要使用线程。如果你主要处理文本数据或 Python 集合(如列表和字典),那么主要使用进程。

如果你使用的是线程数较多(大于10)的大型机器,那么你无论如何都应该将任务分成至少几个进程。Python在每个进程中使用10个线程进行数值工作时可以非常高效,但50个线程则不然。

有关线程、进程以及如何在Dask中配置它们的更多信息,请参阅 调度器文档

使用 Dask 加载数据

我们常见的一种反模式是,人们在客户端(即他们的本地机器)上创建大型Python对象,如DataFrame或数组,这些对象不在Dask之外,然后将它们嵌入到计算中。这意味着Dask不得不多次通过网络发送这些对象,而不是仅仅传递数据的指针。

这会带来大量的开销,并显著减慢计算速度,特别是在客户端与调度器之间的网络连接较慢的情况下。它还可能导致调度器过载,从而导致内存不足错误。相反,您应该使用 Dask 方法来加载数据,并使用 Dask 来控制结果。

以下是一些应避免的常见模式及其更好的替代方案:

我们正在使用 Dask 读取一个 parquet 数据集,然后向其中追加一组 pandas DataFrame。在将数据发送到 Dask 之前,我们先将 csv 文件加载到内存中。

ddf = dd.read_parquet(...)

pandas_dfs = []
for fn in filenames:
    pandas_dfs(pandas.read_csv(fn))     # Read locally with pandas
ddf = dd.concat([ddf] + pandas_dfs)     # Give to Dask

相反,我们可以使用 Dask 直接读取 csv 文件,将所有数据保留在集群上。

ddf = dd.read_parquet(...)
ddf2 = dd.read_csv(filenames)
ddf = dd.concat([ddf, ddf2])

我们使用 NumPy 在将数据传递给 Dask 之前创建一个内存中的数组,迫使 Dask 将数组嵌入任务图中,而不是处理指向数据的指针。

f = h5py.File(...)

x = np.asarray(f["x"])  # Get data as a NumPy array locally
x = da.from_array(x)   # Hand NumPy array to Dask

相反,我们可以使用 Dask 直接读取文件,将所有数据保留在集群上。

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading

我们使用 pandas 读取一个大型 CSV 文件,然后在构建图时使用 delayed 来并行化数据的计算。

@dask.delayed
def process(a, b):
    ...

df = pandas.read_csv("some-large-file.csv")  # Create large object locally
results = []
for item in L:
    result = process(item, df)  # include df in every delayed call
    results.append(result)

相反,我们也可以使用延迟来读取数据。这避免了将大文件嵌入到图中,Dask 只需传递对延迟对象的引用。

@dask.delayed
def process(a, b):
   ...

df = dask.delayed(pandas.read_csv)("some-large-file.csv")  # Let Dask build object
results = []
for item in L:
   result = process(item, df)  # include pointer to df in every delayed call
   results.append(result)

将大型对象(如 pandas DataFrame 或数组)嵌入到计算中是 Dask 用户经常遇到的一个痛点。这会在调度器接收到数据并能够开始计算之前增加显著的延迟,并在计算过程中给调度器带来压力。

使用 Dask 加载这些对象可以避免这些问题,并显著提高计算性能。

避免重复调用计算

调用 compute 将会阻塞客户端的执行,直到 Dask 计算完成。我们经常看到的一种模式是用户在循环中或按顺序对略有不同的查询调用 compute

这禁止了Dask在集群上并行化不同的计算,并且禁止在不同的查询之间共享中间结果。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...).compute())

每次迭代到达计算调用时,这都会执行,一次计算一个查询。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...))  # no compute here
results = dask.compute(*results)

这使得 Dask 只需计算一次计算的共享部分(如上面的 foo 对象),而不是每次 compute 调用都计算一次,并且允许 Dask 并行处理不同的选择,而不是按顺序运行它们。