共享内存

异步调度器接受任何 concurrent.futures.Executor 实例。这包括 Python 标准库中定义的 ThreadPoolExecutorProcessPoolExecutor 实例,以及任何第三方库中的子类。Dask 还定义了自己的 SynchronousExecutor,它只是简单地在主线程上运行函数(对调试有用)。

完整的 dask get 函数分别存在于 dask.threaded.getdask.multiprocessing.getdask.get 中。

政策

异步调度器维护着索引数据结构,这些结构显示了哪些任务依赖于哪些数据,哪些数据是可用的,哪些数据在等待哪些任务完成之前无法释放,以及哪些任务当前正在运行。它可以在常数时间内相对于总任务数和可用任务数更新这些索引结构。这些索引结构使得dask异步调度器能够在单台机器上扩展到非常多的任务。

令人尴尬的并行 dask 流程

为了保持内存占用小,我们选择将准备运行的任务保存在一个后进先出的栈中,这样最新可用的任务会优先执行。这鼓励在开始新的任务链之前完成相关的任务链。这也可以在常数时间内查询。

性能

简而言之 线程调度器的开销大致如下:

  • 每个任务有200微秒的开销

  • 10微秒启动时间(如果你希望每次都创建一个新的 ThreadPoolExecutor)

  • 任务数量的常数缩放

  • 每个任务的依赖项数量的线性缩放

调度器引入了开销。这种开销实际上限制了我们并行性的粒度。下面我们测量了在不同应用函数(线程化、同步、多进程)和不同负载类型(极度并行、密集通信)下异步调度器的开销。

我们可以做的最快/最简单的测试是使用 IPython 的 timeit 魔法:

In [1]: import dask.array as da

In [2]: x = da.ones(1000, chunks=(2,)).sum()

In [3]: len(x.dask)
Out[3]: 1168

In [4]: %timeit x.compute()
33 ms +- 337 us per loop (mean +- std. dev. of 7 runs, 10 loops each)

因此,每个任务大约需要 90 微秒。其中大约 100 毫秒来自开销:

In [5]: x = da.ones(1000, chunks=(1000,)).sum()

In [6]: %timeit x.compute()
1.23 ms +- 36.5 us per loop (mean +- std. dev. of 7 runs, 1,000 loops each)

每次启动 ThreadPoolExecutor 都会有一些开销。这可以通过使用全局或上下文池来缓解:

>>> from concurrent.futures import ThreadPoolExecutor
>>> pool = ThreadPoolExecutor()
>>> dask.config.set(pool=pool)  # set global ThreadPoolExecutor

or

>>> with dask.config.set(pool=pool)  # use ThreadPoolExecutor throughout with block
...     ...

我们现在测量任务数量的扩展和图密度的扩展:

添加节点

任务数量的线性缩放

随着图中的任务数量增加,我们看到调度开销呈线性增长。每个任务的渐近成本取决于调度器。依赖某种异步池的调度器成本为几毫秒,而单线程调度器的成本为几微秒。

图表展示了Dask在任务图中的节点数量增加时的扩展性能。图表显示了y轴上的持续时间(以秒为单位)与x轴上的每个任务的边数。调度整个图的时间最初是恒定的,随后在多进程和线程调度器中大约500个任务后,以及在异步和核心调度器中大约10个任务后,呈线性增加。每个任务的成本则相反,呈线性成本下降,随后成本大致保持不变。

整个图的调度开销(左)与每个任务的开销(右)

添加边

线性缩放与边数

随着每个任务的边数增加,调度开销再次线性增加。

注意:无论是简单的核心调度器还是多进程调度器,都不擅长处理具有非平凡任务间通信的工作流;它们已从图中移除。

图表展示了Dask在任务图中的边数增加时的扩展性能。图表显示了y轴上的持续时间(秒)与x轴上的每任务边数。随着边数从0增加到100,使用线程调度器调度整个图的时间从2秒增加到8秒,而使用异步调度器则从0秒增加到3秒。每边的成本在约10个边之前逐渐降低,之后对于线程和异步调度器来说成本都趋于平稳,异步调度器始终更快。

整个图的调度开销(左)与每条边的开销(右)

下载调度脚本

已知限制

共享内存调度器有一些显著的限制:

  1. 它在单台机器上运行

  2. 线程调度器受限于 Python 代码的 GIL,因此如果你的操作是纯 Python 函数,你不应该期望多核加速。

  3. 多进程调度器必须在工作者之间序列化函数,这可能会失败

  4. 多进程调度器必须在工作者和中央进程之间序列化数据,这可能会很耗费资源。

  5. 多进程调度器无法直接在工作者进程之间传输数据;所有数据都通过主进程路由。