效率
内容
效率¶
良好的并行计算是响应迅速且有益的。然而,有几个障碍可能会阻碍这一过程。本节描述了确保性能的常见方法。
将数据保留在集群上¶
尽可能长时间地在本机收集数据。如果你想对集群上的大量数据提出问题,通常将函数提交到该数据上比将数据下载到本地计算机更快。
例如,如果我们有一个集群上的 numpy 数组,并且我们想知道它的形状,我们可能会选择以下选项之一:
慢: 将 numpy 数组收集到本地进程,访问
.shape
属性快速: 将一个 lambda 函数发送到集群以计算形状
>>> x = client.submit(np.random.random, (1000, 1000))
>>> type(x)
Future
慢
>>> x.result().shape # Slow from lots of data transfer
(1000, 1000)
快速
>>> client.submit(lambda a: a.shape, x).result() # fast
(1000, 1000)
使用更大的任务¶
调度器每处理一个任务或 Future 对象大约会增加 一毫秒 的开销。虽然这听起来很快,但如果运行十亿个任务,这就会变得相当慢。如果你的函数运行速度快于 100 毫秒左右,那么你可能不会看到使用分布式计算带来的任何加速效果。
一个常见的解决方案是将输入批量处理为更大的块。
慢
>>> futures = client.map(f, seq)
>>> len(futures) # avoid large numbers of futures
1000000000
快速
>>> def f_many(chunk):
... return [f(x) for x in chunk]
>>> from tlz import partition_all
>>> chunks = partition_all(1000000, seq) # Collect into groups of size 1000
>>> futures = client.map(f_many, chunks)
>>> len(futures) # Compute on larger pieces of your data at once
1000
在线程和进程之间调整¶
默认情况下,一个 Worker
使用与计算节点核心数相同的线程并行运行多个计算。当使用纯Python函数时,这可能不是最佳选择,你可能希望在每个节点上运行几个独立的worker进程,每个进程使用一个线程。在配置集群时,你可能希望使用 dask worker
可执行文件的选项,如下所示:
$ dask worker ip:port --nworkers 8 --nthreads 1
请注意,如果你主要使用 NumPy、Pandas、SciPy、Scikit Learn、Numba 或其他 C/Fortran/LLVM/Cython 加速库,那么这对你来说不是问题。你的代码很可能适合多线程使用。
不要分布式¶
考虑 dask 和 concurrent.futures 模块,它们与 distributed 有相似的 API,但在单台机器上运行。可能你的问题在笔记本电脑或大型工作站上表现良好。
考虑通过并行以外的其他方式加速你的代码。更好的算法、数据结构、存储格式,或者只是一点C/Fortran/Numba代码,可能就足以给你带来你正在寻找的10倍速度提升。并行和分布式计算是加速应用程序的昂贵方式。