工作资源

访问稀缺资源,如内存、GPU或特殊硬件,可能会限制某些任务在特定机器上运行的数量。

例如,我们可能有一个由十台计算机组成的集群,其中四台计算机各有双GPU。我们可能有一千个任务,其中一百个需要GPU,十个需要同时使用两个GPU。在这种情况下,我们希望在考虑这些资源限制的情况下,在集群中平衡任务,将受GPU限制的任务分配给启用了GPU的工作节点。此外,我们需要确保在任何给定的工作节点上同时运行的GPU任务数量受到限制,以确保我们遵守提供的限制。

这种情况不仅出现在GPU上,还出现在许多资源上,例如需要在运行时占用大量内存的任务、特殊磁盘访问或访问特殊硬件。Dask允许您指定抽象的任意资源来约束任务在工作者上的运行方式。Dask并不以任何特定方式对这些资源进行建模(Dask不知道什么是GPU),并且由用户来指定工作者上的资源可用性和任务上的资源需求。

示例

我们考虑一个计算任务,其中我们从多个文件加载数据,使用需要GPU的函数处理每个文件,然后使用占用70GB内存的任务聚合所有中间结果。

我们在一个三节点集群上运行,该集群有两台机器,每台机器有两块GPU,还有一台机器有100GB的RAM。

当我们设置集群时,我们为每个工作节点定义资源:

dask worker scheduler:8786 --resources "GPU=2"
dask worker scheduler:8786 --resources "GPU=2"
dask worker scheduler:8786 --resources "MEMORY=100e9"

当我们向集群提交任务时,我们为每个任务指定约束条件

from distributed import Client
client = Client('scheduler:8786')

data = [client.submit(load, fn) for fn in filenames]
processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})

同样地,我们可以使用 dask 注释机制来指定资源约束:

with dask.annotate(resources={'GPU': 1}):
    processed = [client.submit(process, d) for d in data]
with dask.annotate(resources={'MEMORY': 70e9}):
    final = client.submit(aggregate, processed)

指定资源

资源可以通过多种方式指定。最简单的选项将取决于您的集群是如何创建的。

从命令行

如上所示,可以在启动工作进程时提供资源:

dask worker scheduler:8786 --resources "GPU=2"

键被用作资源名称,值被解析为数值。

来自Dask的配置系统

或者,可以使用 Dask 的 配置系统 来指定资源。

from distributed import LocalCluster

with dask.config.set({"distributed.worker.resources.GPU": 2}):
    cluster = LocalCluster()

配置需要在生成实际工作进程的过程中进行设置。通过将资源指定为环境变量(如下一节所示)可能是最容易实现的。

从环境变量

与其他任何 Dask 配置值一样,资源可以在启动进程之前通过环境变量指定。使用 Bash 语法

$ DASK_DISTRIBUTED__WORKER__RESOURCES__GPU=2 dask worker
...

如果你无法向 distributed.Worker 类传递选项,这可能是最简单的解决方案。

资源是分别应用于每个工作进程的

如果你使用 dask worker --nworkers <nworkers> ,资源将分别应用于每个 nworkers 工作进程。假设你的机器上有2个GPU,如果你想使用两个工作进程,每个工作进程有1个GPU,因此你需要这样做:

dask worker scheduler:8786 --nworkers 2 --resources "GPU=1"

以下是一个示例,展示了如何使用资源来确保每个任务都在一个单独的进程中运行,这对于执行非线程安全的任务或内部使用多线程的任务非常有用:

dask worker scheduler:8786 --nworkers 3 --nthreads 2 --resources "process=1"

使用以下代码,最多可以有3个任务同时运行,并且每个任务将在一个单独的进程中运行:

from distributed import Client
client = Client('scheduler:8786')

futures = [client.submit(non_thread_safe_function, arg,
                         resources={'process': 1}) for arg in args]

资源是抽象的

以这种方式列出的资源只是抽象的数量。我们同样可以使用“mem”、“memory”、“bytes”等术语,因为从Dask的角度来看,这只是一个抽象术语。只要您在工人和客户端之间保持一致,您可以选择任何术语。

值得注意的是,Dask 分别跟踪核心数量和可用内存作为实际资源,并在正常调度操作中使用这些资源。

带有集合的资源

你也可以使用带有 Dask 集合的资源,如数组和延迟对象。你可以使用 Dask 注释机制,在集合上注释需要特定资源来执行计算的操作。

# Read note below!
dask.config.set({"optimization.fuse.active": False})
x = da.read_zarr(...)
with dask.annotate(resources={'GPU': 1}):
    y = x.map_blocks(func1)
z = y.map_blocks(func2)
z.compute()

备注

此功能目前仅在 with dask.annotate(...): 包裹 compute()persist() 调用时对数据帧支持;在这种情况下,注释适用于整个图,从任何先前持久化的集合开始并排除它们。

对于其他集合,如数组和延迟对象,注释在优化阶段可能会丢失。要防止此问题,您必须设置:

>>> dask.config.set({"optimization.fuse.active": False})

或在 dask.yaml 中:

optimization:
  fuse:
    active: false

一个可能的解决方法,也适用于数据框,可以是通过 persist() 进行中间调用。但请注意,这可能会显著影响优化并降低整体性能。

x = dd.read_parquet(...)
with dask.annotate(resources={'GPU': 1}):
    y = x.map_partitions(func1).persist()
z = y.map_partitions(func2)
del y  # Release distributed memory for y as soon as possible
z.compute()