调度策略

本文档描述了Dask分布式调度器用于选择任务偏好和选择工人偏好的策略。有关这些策略如何高效实施的更多信息,请参见 调度状态

选择工人

当任务从等待状态转换为处理状态时,我们会为该任务选择一个合适的工人。如果任务有大量的数据依赖关系,或者工人处于高负载状态,那么工人的选择会强烈影响全局性能。同样,根任务的放置也会影响下游计算的性能,因为它可以决定未来工人之间需要传输多少数据。针对这些不同的场景,我们使用不同的启发式方法:

初始任务放置 - 排队

排队 启用时(默认),每个初始任务仅在当前最不繁忙的工作者上调度。如果一个初始任务想要运行,但所有工作者线程都被占用,那么该任务将进入(或保持在)队列中,不会被发送到任何工作者。

初始任务放置 - 无排队

目前,这种调度仅在 排队 被禁用时使用(distributed.scheduler.worker-saturation 配置值设置为 inf)。

我们希望相邻的根任务在同一个工作节点上运行,因为这些邻居很有可能在下游操作中被合并:

  i       j
 / \     / \
e   f   g   h
|   |   |   |
a   b   c   d
\   \  /   /
     X

在上面的例子中,我们希望 ab 在同一个工作节点上运行,cd 在同一个工作节点上运行,从而减少未来的数据传输。我们也可以忽略 X 的位置,因为假设我们将 a b c d 组分布在所有工作节点上以最大化并行性,那么 X 最终会被传输到所有地方。(需要注意的是,即使 X 不存在,希望 a bc d 共置的想法仍然适用。)

直接通过遍历图来计算这些相关任务的成本会很高。相反,我们使用任务的 TaskGroup,它是所有具有相同键前缀的任务的集合。((random-a1b2c3, 0), (random-a1b2c3, 1), (random-a1b2c3, 2) 都属于 TaskGroup random-a1b2c3。)

为了识别根(类似)任务,我们使用这个启发式方法:

  1. TaskGroup 的任务数量是集群中线程数量的两倍

  2. TaskGroup 在组内 所有 任务中的唯一依赖项少于 5 个。

    We don’t just say “The task has no dependencies”, because real-world cases like dask.array.from_zarr and dask.array.from_array produce graphs like the one above, where the data-creation tasks (a b c d) all share one dependency (X)—the Zarr dataset, for example. Though a b c d are not technically root tasks, we want to treat them as such, hence allowing a small number of trivial dependencies shard by all tasks.

然后,我们使用 与子节点和深度断开联系 中描述的相同优先级来确定哪些任务是相关的。这种深度优先与子权重相结合的度量标准通常可以用来将图的叶子适当分割成相对分离良好且子图间连接性较低的子图。

按照这种优先顺序迭代任务,我们为工作者分配一批后续任务,然后选择一个新的工作者(最不忙碌的那个)并重复此过程。

虽然这并不能提供完美的初始任务分配(一些兄弟任务可能会被分配给不同的工作者),但在大多数情况下它表现良好,同时只增加了最小的调度开销。

初始任务放置是一个前瞻性的决策。通过将相关的根任务并置,我们确保其下游任务为成功做好准备。

下游任务部署

当初始任务布置得当时,后续任务的布置则是回顾性的:考虑到数据传输和工作者的忙碌程度,任务能在哪里最快运行?

不符合上述根状标准的任务按如下方式选择:

首先,我们识别出可行的工人池:

  1. 如果任务没有依赖关系且没有限制,那么我们找到占用最少的工作者。

  2. 否则,如果一个任务有用户提供的限制(例如它必须在有GPU的机器上运行),那么我们将可用的工人池限制为仅该集合。否则,我们考虑所有工人。

  3. 我们将上述集合限制为仅持有任务至少一个依赖项的工人。

从这些工人中,我们然后使用 Scheduler.worker_objective() 确定我们认为任务将最快开始运行的工人。对于每个工人:

  1. 我们考虑已经在该工作节点上排队的其他任务的估计运行时间。然后,我们根据它们的大小(以字节为单位)和工作者之间的测量网络带宽,计算将任何依赖项传输到该工作节点所需的时间,前提是该工作节点尚未拥有这些依赖项。请注意,这*不*考虑(反)序列化时间,如果数据被溢出到磁盘,则不考虑从磁盘检索数据的时间,也不考虑内存中的大小与序列化大小之间的潜在差异。在实践中,队列等待时间(称为*占用*)通常占主导地位,因此如果这意味着任务可以更快开始,数据通常会被传输到不同的工作节点。

  2. 在使用“最早开始”指标时,可能会出现平局的情况,尽管在所有工人都忙碌时这种情况并不常见。我们通过选择存储的Dask数据字节数最少的工人来打破平局(包括溢出的数据)。请注意,这与 托管 加上 溢出 内存相同,而不是 进程 内存。

这个过程很容易改变(事实上,本文档可能已经过时)。我们鼓励读者检查 scheduler.py 中的 decide_workerworker_objective 函数。

decide_worker(ts, all_workers, ...)

决定哪个工作者应该执行任务 ts

Scheduler.decide_worker_non_rootish(ts)

选择一个可运行的非根任务的工作者,考虑依赖关系和限制。

Scheduler.decide_worker_rootish_queuing_disabled(ts)

选择一个可运行的根任务的工作者,无需排队。

Scheduler.decide_worker_rootish_queuing_enabled()

如果并非所有工人都忙碌,选择一个工人来执行可运行的根任务。

Scheduler.worker_objective(ts, ws)

确定哪个工人应该获得任务的目标函数

选择任务

我们经常需要在许多有效的任务之间做出选择。有几个相互竞争的利益可能会影响我们的选择:

  1. 按先到先得的原则运行任务,以确保多个客户端之间的公平性

  2. 运行属于关键路径的任务,以减少总运行时间并最小化滞后工作负载

  3. 运行任务,以便我们能够释放许多依赖项,从而努力保持较小的内存占用。

  4. 运行相关任务,以便在运行新任务块之前可以完全消除大量工作

  5. 在开始创建新工作的任务之前,先运行使用现有工作的任务

同时实现所有这些目标是不可能的。完美优化这些目标中的任何一个都可能导致高昂的开销。调度器的启发式算法在快速优化所有这些目标(它们在重要的工作负载中都会出现)方面做得不错,但并不完美。

后进先出

当一个工人完成一项任务时,该任务的直接依赖项会获得最高优先级。这鼓励了一种行为,即在开始新工作之前立即完成正在进行的工作(深度优先图遍历)。这通常与先到先得的目标相冲突,但通常会导致显著减少的内存占用,并且由于避免了数据溢出到磁盘,总体运行时间更好。

与子节点和深度断开联系

通常一个任务有多个依赖关系,我们需要通过其他目标来打破这些依赖关系。打破这些依赖关系对性能和内存占用的影响出乎意料地大。

当客户端提交一个图时,我们会对图进行几次线性扫描,以确定每个节点的后代数量(不完全准确,因为这是一个DAG而不是树,但这是一个接近的代理)。这个数量可以用来打破平局,并帮助我们优先处理具有较长关键路径和许多子节点的节点。实际使用的算法稍微复杂一些,详细描述在 dask/order.py 中。

先到先得,粗略地

工作者使用的后进先出行为,以最小化内存占用,可能会扭曲客户端提供的任务顺序。最近提交的任务可能比很久以前提交的任务更早运行,因为它们在当前内存数据下更方便。这种行为可能是 不公平 的,但可以提高全局运行时间和系统效率,有时相当显著。

然而,工人不可避免地会完成与他们刚刚处理的任务相关的所有任务,而先进先出策略最终会耗尽。在这些情况下,工人通常会从公共任务池中拉取任务。这个池中的任务*是*按照先到先服务的原则排序的,因此工人在*粗略*层面上确实表现出一种对多个提交公平的调度方式,即使不是细粒度的。

Dask 的调度策略在短期内高效,长期对多个客户端公平。

避免过度饱和工人

当有许多初始任务需要运行时,工作者不需要提前知道所有任务:

 o   o   o   o   o   o   o   o   o   o
/ \ / \ / \ / \ / \ / \ / \ / \ / \ / \
o o o o o o o o o o o o o o o o o o o o
| | | | | | | | | | | | | | | | | | | |
* * * * * * * * * * * * * * * * * * * *  <-- initial tasks

调度器仅将初始任务(上图中的 * 任务)提交给工作线程,直到所有工作线程都被填满 1。剩余的初始任务按优先级顺序放入调度器上的队列中。

任务从这个队列中弹出并在工作线程空闲 没有其他更高优先级的任务(本图中的 o 任务)可以运行时进行调度。

这确保我们在开始新工作之前完成现有工作流。这使得内存使用尽可能低,并且与一次性提交所有初始任务相比,通常能提供更稳定的执行。

这种排队方式有两个缺点:

  1. 初始任务不会 共同分配 。这意味着工作者可能需要进行数据传输,而这些传输本可以避免。与禁用队列相比,这可能会导致某些工作负载中度减慢。然而,在许多情况下,禁用队列可能会导致工作者内存耗尽,因此通常情况下,减慢速度是一个更好的权衡。

  2. 对于像 client.map 这样的令人尴尬的并行工作负载,每个任务可能会产生轻微的额外开销,因为每次任务完成时,都需要一个调度器<->工作者的往返消息,然后才能开始下一个任务。在大多数情况下,这种开销甚至无法测量,也不是需要担心的事情。

    这只有在你的任务非常快,或者网络非常慢的情况下才有意义——也就是说,如果你的任务运行时间和网络延迟在同一数量级。例如,如果每个任务只花费1毫秒,而调度器<->工作者的往返消息需要10毫秒,那么所有这些往返消息将主导运行时间。

    这意味着你应该让你的任务更大(通过增加块大小,或将更多工作批量处理到单个Dask任务中)。通常,任务运行时间应明显大于网络延迟,Dask才能表现良好。

1

默认情况下,它实际上会为每个工作线程提交略多于线程数的任务(例如,对于线程数小于等于10的工作线程,会多提交1个任务)。这种轻微的缓冲在任务非常快时能保持更好的性能。详情请参见下一节。

调整或禁用排队

很少需要调整排队。默认值几乎适用于所有情况。*只有希望在特殊情况下调整性能的高级用户才可能考虑*调整此参数。

排队行为由 distributed.scheduler.worker-saturation 配置值控制。这是通过 Dask 配置系统 设置的。配置值必须在调度器启动之前在调度器上设置。

该值控制工作线程在内存中一次可以拥有的初始数据块数量。这基本上是图执行的“广度”。具体来说,一次最多向工作线程发送 ceil(worker-saturation * nthreads) 个初始任务。

默认情况下,worker-saturation 的值为 1.1。这个值的选择是为了保持工作线程的内存相对较低(<= 10 个线程的工作线程每个只会额外获得一个初始内存块),同时减轻在非常慢的网络上运行的用户所遇到的额外延迟的影响。

  • 如果工作进程内存不足,考虑将 worker-saturation 设置为 1.0 而不是 1.1

  • 如果你的网络非常慢,或者你的任务非常快,并且你想减少运行时间,可以考虑增加 worker-saturation。这*可能*会稍微加快处理速度,但代价是增加内存使用。通常情况下,超过 2.0 的值带来的好处不大。

  • 如果你的图表会从 共同分配 中受益,并且你的集群有充足的内存,可以考虑通过将 worker-saturation 设置为 inf 来禁用队列,以加快运行时间。

这些决策的制定地点

上述目标主要由客户端、调度器和工人在计算过程中的各个点做出的小决策所遵循。

  1. 当我们从客户端向调度器提交一个图时,我们会为该图的每个任务分配一个数值优先级。这个优先级侧重于在广泛计算之前深入计算,优先考虑关键路径,优先考虑具有许多依赖关系的节点等。这与单机调度器使用的逻辑相同,并存在于 dask/order.py 中。

  2. 当图到达调度器时,调度器将这些数值优先级转换为两个数字的元组,第一个是递增计数器,第二个是上述客户端生成的优先级。这个每个图的计数器鼓励先进先出的策略在计算之间。来自先前调用计算的所有任务的优先级高于来自后续调用计算(或提交、持久化、映射,或任何生成未来对象的操作)的所有任务。

  3. 每当一个任务准备好运行时(如果有依赖项,则依赖项已完成),调度器会将其分配给一个工作线程。当多个任务同时准备好时,它们会按优先顺序提交给工作线程。如果启用了调度器端队列,它们会被提交直到所有工作线程都满载,然后任何剩余的可运行任务都会被放入调度器队列。如果禁用了队列,那么所有可运行的任务都会一次性提交。

  4. 然而,当工作线程接收到这些任务时,它会根据任务的优先级来决定哪些任务优先获取数据或进行计算。工作线程维护一个按此优先级排序的所有就绪任务的堆。

  5. 如果调度器端队列处于活动状态:当工作节点上的任何任务完成时,如果没有其他更高优先级的任务要运行,调度器会弹出下一个队列中的任务并在该工作节点上运行它。