调度

所有大规模的 Dask 集合,如 Dask 数组Dask DataFrameDask 包,以及细粒度的 API,如 延迟未来,都会生成任务图,其中图中的每个节点都是一个普通的 Python 函数,节点之间的边是普通 Python 对象,这些对象由一个任务作为输出创建,并在另一个任务中作为输入使用。在 Dask 生成这些任务图之后,它需要在并行硬件上执行它们。这是 任务调度器 的工作。存在不同的任务调度器,每个调度器都会消耗任务图并计算相同的结果,但具有不同的性能特征。

Dask 有两个任务调度器系列:

  1. 单机调度器: 该调度器在本地进程或线程池上提供基本功能。这是最早开发的调度器,也是默认的调度器。它使用简单且成本低廉,尽管它只能用于单台机器,并且不具备扩展性。

  2. 分布式调度器: 这个调度器更复杂,提供更多功能,但也需要更多努力来设置。它可以在本地运行或分布在集群中。


Dask 由三个部分组成。“集合”创建“任务图”,然后将其发送给“调度器”执行。下面将详细描述两种类型的调度器。

对于不同的计算任务,您可能会发现特定的调度器设置能带来更好的性能。本文档帮助您理解如何在不同的调度器之间进行选择和配置,并提供何时选择某种调度器更为合适的指导原则。

本地线程

import dask
dask.config.set(scheduler='threads')  # overwrite default with threaded scheduler

线程调度器使用本地的 concurrent.futures.ThreadPoolExecutor 执行计算。它轻量且无需设置。它引入的任务开销非常小(每个任务约50微秒),并且由于所有操作都在同一进程中进行,因此在任务之间传输数据的成本为零。然而,由于Python的全局解释器锁(GIL),这个调度器仅在计算主要由非Python代码主导时提供并行性,这种情况主要发生在操作NumPy数组、Pandas DataFrame中的数值数据,或使用生态系统中的其他C/C++/Cython项目时。

线程调度器是 Dask 数组Dask DataFrameDask Delayed 的默认选择。然而,如果你的计算主要处理纯 Python 对象,如字符串、字典或列表,那么你可能想尝试下面基于进程的调度器之一(我们目前推荐在本地机器上使用分布式调度器)。

本地进程

备注

下面描述的 分布式调度器 通常是更好的选择。我们鼓励读者在本节之后继续阅读。

小技巧

在使用多进程调度器的独立Python脚本中,务必包含 if __name__ == "__main__": 块。更多详情请参见 独立Python脚本

import dask
dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler

多进程调度器使用本地的 concurrent.futures.ProcessPoolExecutor 执行计算。它使用轻量且无需设置。每个任务及其所有依赖项都被发送到本地进程,执行后,结果再被发送回主进程。这意味着它能够绕过GIL的问题,并提供并行性,即使在纯Python代码占主导地位的计算中,例如处理字符串、字典和列表的计算。

然而,将数据移动到远程进程并返回可能会引入性能损失,特别是在进程之间传输的数据量较大时。当工作流程相对线性且不涉及大量的任务间数据传输,以及输入和输出都较小时(如文件名和计数),多进程调度器是一个极佳的选择。

这在基本数据摄取工作负载中很常见,例如在 Dask Bag 中常见的工作负载,其中多进程调度器是默认的:

>>> import dask.bag as db
>>> db.read_text('*.json').map(json.loads).pluck('name').frequencies().compute()
{'alice': 100, 'bob': 200, 'charlie': 300}

对于更复杂的负载,其中可能会有多个下游任务依赖于大的中间结果,我们通常建议在本地机器上使用分布式调度器。分布式调度器在处理大的中间结果时更加智能。

单线程

import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

单线程同步调度器在本地线程中执行所有计算,完全没有并行性。这对于调试和分析特别有价值,因为使用线程或进程时这些操作会更加困难。

例如,在使用 IPython 或 Jupyter 笔记本时,%debug%pdb%prun 魔法在并行 Dask 调度器下可能无法正常工作(它们并非设计用于并行计算环境)。然而,如果你遇到异常并希望进入调试器,你可能希望在单线程调度器下重新运行你的计算,这样这些工具就能正常工作。

Dask 分布式(本地)

小技巧

在使用独立Python脚本中的本地分布式调度器时,务必包含一个 if __name__ == "__main__": 块。更多详情请参见 独立Python脚本

from dask.distributed import Client
client = Client()
# or
client = Client(processes=False)

Dask 分布式调度器既可以 在集群上设置 ,也可以在个人计算机上本地运行。尽管名为“分布式”,但由于一些原因,它在本地机器上通常也很实用:

  1. 它提供了对异步API的访问,特别是 Futures

  2. 它提供了一个诊断仪表板,可以提供有关性能和进展的宝贵见解。

  3. 它以更复杂的方式处理数据局部性,因此在需要多个进程的工作负载上,其效率可能高于多进程调度器。

你可以在 这些文档 中阅读更多关于在单台机器上使用 Dask 分布式调度器的信息。

Dask 分布式 (集群)

你也可以在分布式集群上运行 Dask。根据你的集群,有多种设置方式。我们建议参考 如何部署 Dask 集群 以获取更多信息。

配置

你可以通过使用 dask.config.set(scheduler...) 命令来配置全局默认调度器。这可以在全局范围内完成:

dask.config.set(scheduler='threads')

x.compute()

或作为上下文管理器:

with dask.config.set(scheduler='threads'):
    x.compute()

或在单个计算调用中:

x.compute(scheduler='threads')

每个调度器可能支持特定于该调度器的额外关键字。例如,基于池的单机调度器允许您提供自定义池或指定所需的工作者数量:

from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
    x.compute()

with dask.config.set(num_workers=4):
    x.compute()

请注意,Dask 也支持自定义的 concurrent.futures.Executor 子类,例如来自 lokyReusablePoolExecutor

from loky import get_reusable_executor
with dask.config.set(scheduler=get_reusable_executor()):
    x.compute()

其他库如 ipyparallelmpi4py 也提供了 concurrent.futures.Executor 子类,这些也可以使用。

独立的 Python 脚本

在独立Python脚本中运行Dask调度器时需要特别注意。具体来说,当使用单机多进程调度器或本地分布式调度器时,Dask会创建额外的Python进程。作为Python正常子进程初始化的一部分,Python会在创建的每个子进程中导入脚本内容(这对于任何创建子进程的Python代码都是如此——不仅限于Dask)。这种导入初始化可能导致子进程递归地创建其他子进程,最终引发错误。

常见的错误
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

   if __name__ == '__main__':
         freeze_support()
         ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.

为了避免这种类型的错误,你应该将任何创建子进程的 Dask 代码(例如,使用多进程调度器的所有 compute() 调用,或在创建本地分布式集群时)放在 if __name__ == "__main__": 块内。这确保了子进程仅在脚本作为主程序运行时被创建。

例如,使用下面的脚本运行 python myscript.py 将会引发错误:

# myscript.py

from dask.distributed import Client
client = Client()  # Will raise an error when creating local subprocesses

相反,应该将脚本的内容放在 if __name__ == "__main__": 块中:

# myscript.py

if __name__ == "__main__":  # This avoids infinite subprocess creation

   from dask.distributed import Client
   client = Client()

关于此主题的更多详情,请参见 Python 的多进程指南