效率

良好的并行计算是响应迅速且有益的。然而,有几个障碍可能会阻碍这一过程。本节描述了确保性能的常见方法。

将数据保留在集群上

尽可能长时间地在本机收集数据。如果你想对集群上的大量数据提出问题,通常将函数提交到该数据上比将数据下载到本地计算机更快。

例如,如果我们有一个集群上的 numpy 数组,并且我们想知道它的形状,我们可能会选择以下选项之一:

  1. 慢: 将 numpy 数组收集到本地进程,访问 .shape 属性

  2. 快速: 将一个 lambda 函数发送到集群以计算形状

>>> x = client.submit(np.random.random, (1000, 1000))
>>> type(x)
Future

>>> x.result().shape  # Slow from lots of data transfer
(1000, 1000)

快速

>>> client.submit(lambda a: a.shape, x).result()  # fast
(1000, 1000)

使用更大的任务

调度器每处理一个任务或 Future 对象大约会增加 一毫秒 的开销。虽然这听起来很快,但如果运行十亿个任务,这就会变得相当慢。如果你的函数运行速度快于 100 毫秒左右,那么你可能不会看到使用分布式计算带来的任何加速效果。

一个常见的解决方案是将输入批量处理为更大的块。

>>> futures = client.map(f, seq)
>>> len(futures)  # avoid large numbers of futures
1000000000

快速

>>> def f_many(chunk):
...     return [f(x) for x in chunk]

>>> from tlz import partition_all
>>> chunks = partition_all(1000000, seq)  # Collect into groups of size 1000

>>> futures = client.map(f_many, chunks)
>>> len(futures)  # Compute on larger pieces of your data at once
1000

在线程和进程之间调整

默认情况下,一个 Worker 使用与计算节点核心数相同的线程并行运行多个计算。当使用纯Python函数时,这可能不是最佳选择,你可能希望在每个节点上运行几个独立的worker进程,每个进程使用一个线程。在配置集群时,你可能希望使用 dask worker 可执行文件的选项,如下所示:

$ dask worker ip:port --nworkers 8 --nthreads 1

请注意,如果你主要使用 NumPy、Pandas、SciPy、Scikit Learn、Numba 或其他 C/Fortran/LLVM/Cython 加速库,那么这对你来说不是问题。你的代码很可能适合多线程使用。

不要分布式

考虑 daskconcurrent.futures 模块,它们与 distributed 有相似的 API,但在单台机器上运行。可能你的问题在笔记本电脑或大型工作站上表现良好。

考虑通过并行以外的其他方式加速你的代码。更好的算法、数据结构、存储格式,或者只是一点C/Fortran/Numba代码,可能就足以给你带来你正在寻找的10倍速度提升。并行和分布式计算是加速应用程序的昂贵方式。