调度器状态机

概述

Dask 计算的生命周期可以分为以下几个阶段:

  1. 用户使用某些库编写一个图,可能是 dask.delayed 或 dask.dataframe 或客户端上的 submit/map 函数。他们将这些任务提交给调度器。

  2. 调度器将这些任务整合到其所有任务的图中进行跟踪,当它们的依赖关系可用时,它会依次要求工作线程运行每个任务。

  3. 工作节点接收关于如何运行任务的信息,与其对等工作节点通信以收集数据依赖,然后在适当的数据上运行相关函数。它向调度器报告已完成,并将结果存储在计算该结果的工作节点中。

  4. 调度器向用户报告任务已完成。如果用户需要,它随后通过调度器从工作节点获取数据。

大部分相关逻辑在于跟踪任务从新提交、等待依赖、在某工作者上积极运行、内存中完成到被垃圾回收的演变过程。跟踪这一过程,以及跟踪该任务对可能依赖它的其他任务的所有影响,是动态任务调度器复杂性的主要部分。本节描述了用于执行此跟踪的系统。

有关调度器使用的策略的更抽象信息,请参阅 调度策略

调度器保持关于几种实体的内部状态:

  • 调度器已知的单个任务

  • 连接到调度器的工人

  • 连接到调度器的客户端

备注

本页列出的所有内容都是 Dask 操作的内部细节。它可能会在版本之间发生变化,您可能应该避免在用户代码中依赖它(包括此处解释的任何 API)。

任务状态

在内部,调度器在固定的一组状态之间移动任务,特别是 releasedwaitingno-workerqueuedprocessingmemoryerror

任务沿着以下状态流动,并允许以下转换:

Dask 调度器任务状态

请注意,任务也可能从任何状态(图中未显示)转变为 released

发布

已知但未主动计算或不在内存中

等待

正在计算中,等待依赖项进入内存

无工作者

准备好进行计算,但没有合适的worker存在(例如由于资源限制,或根本没有worker连接)。

排队

准备计算,但所有工作节点已满。

处理

所有依赖项都可用,并且任务已分配给一个工作者进行计算(调度器不知道它是在工作者队列中还是正在被主动计算)。

内存

在内存中,在一个或多个工作节点上

出错

任务计算,或其依赖项之一,遇到了错误

遗忘

任务不再被任何客户端或依赖任务需要,因此它也从调度器中消失。一旦任务达到这个状态,它将立即从调度器中解除引用。

备注

distributed.scheduler.worker_saturation 配置值设置为 1.1``(默认值)或其他任何有限值,将在调度器上以 ``queued 状态排队多余的根任务。这些任务只有在工作节点有容量时才会被分配给它们,从而减少工作节点上任务队列的长度。

distributed.scheduler.worker_saturation 配置值设置为 inf 时,在 waiting / no-workerprocessing 之间没有中间状态:一旦一个任务在集群的某个地方拥有了其所有依赖项,它就会立即被分配给一个工作节点。这可能导致工作节点上的任务队列非常长,然后通过 工作窃取 动态重新平衡。

除了字面状态外,还需要保留和更新有关每个任务的其他信息。单个任务状态存储在一个名为 TaskState 的对象中;通过链接查看完整的 API。

调度器使用几个容器来跟踪所有 TaskState 对象(那些不在“遗忘”状态的对象):

tasks: {str: TaskState}

一个将任务键映射到 TaskState 对象的字典。任务键是调度器与客户端或调度器与工作节点之间传递任务信息的方式;然后使用此字典来查找相应的 TaskState 对象。

unrunnable: {TaskState}

一组处于“无工作者”状态的 TaskState 对象。这些任务已经满足其所有 dependencies (它们的 waiting_on 集合为空),并等待合适的工作者加入网络后再进行计算。

一旦任务在工作者上排队,它也会通过 工作者状态 在工作端被跟踪。

工作状态

每个工人的当前状态存储在一个 WorkerState 对象中;通过链接查看完整的API。

这是一个调度器端的对象,它保存了调度器对集群中每个工作者的了解信息,不应与 distributed.worker-state-machine.WorkerState 混淆。

这些信息涉及决定 在哪个工作节点上运行任务

除了单个工作者的状态外,调度器还维护两个容器来帮助调度任务:

Scheduler.saturated: {WorkerState}

一组计算能力(以 WorkerState.nthreads 衡量)被完全利用来处理任务的工作者,其当前的 occupancy 远高于平均水平。

Scheduler.idle: {WorkerState}

一组计算能力未被充分利用的工作者。假设这些工作者能够立即开始计算新任务。

这两组是互斥的。此外,一些工人可能既不是“空闲”也不是“饱和”。在 决定合适的工人 来运行新任务时,将优先选择“空闲”的工人。相反,“饱和”的工人可能会通过 工作窃取 减轻工作负担。

客户端状态

关于调度器每个单独客户端的信息保存在一个 ClientState 对象中;通过链接查看完整的API。

理解任务的流程

如上所述,与任务和工作线程状态相关的信息有很多,其中一些信息可以在任务转换期间计算、更新或删除。

下表显示了任务在不同状态下所处的状态变量。带有勾号 () 的单元格表示任务键 必须 存在于给定的状态变量中;带有问号 (?) 的单元格表示任务键 可能 存在于给定的状态变量中。

状态变量

发布

等待

无工作者

处理

内存

出错

TaskState.dependencies

TaskState.dependents

TaskState.host_restrictions

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.worker_restrictions

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.resource_restrictions

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.loose_restrictions

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.waiting_on

TaskState.waiters

TaskState.processing_on

WorkerState.processing

TaskState.who_has

WorkerState.has_what

TaskState.nbytes (1)

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.exception (2)

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.traceback (2)

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.exception_blame

TaskState.retries

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

TaskState.suspicious_tasks

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

注释:

  1. TaskState.nbytes: 只要任务已经被计算过,即使之后被释放,这个属性也可以被知道。

  2. TaskState.exceptionTaskState.traceback 应该在 TaskState.exception_blame 任务中查找。

下表显示了在每个任务状态转换时更新的工作状态变量。

过渡

受影响的工人状态

已发布 → 等待

占用, 空闲, 饱和

等待 → 处理

占用率, 空闲, 饱和, 已用资源

等待 → 内存

空闲, 饱和, 字节数

处理 → 内存

占用率, 空闲, 饱和, 使用资源, 字节数

处理 → 出错

占用率, 空闲, 饱和, 已用资源

处理 → 发布

占用率, 空闲, 饱和, 已用资源

内存 → 已释放

nbytes

记忆 → 遗忘

nbytes

备注

理解此表的另一种方式是观察到进入或退出特定任务状态会更新一组定义明确的工人状态变量。例如,进入和退出“内存”状态会更新 WorkerState.nbytes

实现

每个状态之间的转换在调度器中都是一个单独的方法。这些任务转换函数以 transition 为前缀,然后是开始和结束任务状态的名称,如下所示。

def transition_released_waiting(self, key, stimulus_id): ...

def transition_processing_memory(self, key, stimulus_id): ...

def transition_processing_erred(self, key, stimulus_id): ...

这些函数各有三种效果。

  1. 他们对调度器状态(20个字典/列表/集合)执行必要的转换,以在状态之间移动一个键。

  2. 它们返回一个推荐 {键: 状态} 转换的字典,以便随后直接在其他键上执行。例如,在我们将一个键转换到内存之后,我们可能会发现许多等待的键现在已准备好从等待状态转换到就绪状态。

  3. 可选地,它们包括一组可以为测试开启的验证检查。

我们不直接调用这些函数,而是调用中心函数 transition

def transition(self, key, final_state, stimulus_id): ...

此转换函数找到从当前状态到最终状态的适当路径。它还作为日志记录和诊断的中心点。

通常我们希望一次性执行多个转换,或者希望不断响应初始转换推荐的新的转换,直到达到稳定状态。为此,我们使用 transitions 函数(注意复数 s)。

def transitions(self, recommendations, stimulus_id):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish)
        recommendations.update(new)

此函数运行 transition,接受推荐并同样运行它们,重复直到不再推荐进一步的任务转换。

刺激

转换由刺激引起,这些刺激是来自工作者或客户端的状态改变消息,发送给调度器。调度器响应以下刺激:

工人

任务已完成

一个任务已在工作节点上完成,并现在存储在内存中

任务-出错

一个任务在工作者上运行并出错

重新安排

一个任务在工作者上通过引发 Reschedule 完成

长时间运行

任务仍在工作线程上运行,但它调用了 secede()

添加键

复制完成。一个或多个任务,之前在其他工作节点内存中,现在在一个额外的工作节点内存中。也用于通知调度器 scatter() 操作成功。

请求-刷新-谁-有

所有持有工作者所知任务副本的对等方都不可用(暂时或永久),因此工作者无法获取该任务副本,并询问调度器是否知道任何其他副本。此调用会定期重复,直到出现新的副本。

release-worker-data

一个工作者通知调度器,它不再持有该任务在内存中

worker-status-change

一个工作者的全局状态刚刚发生了变化,例如在 runningpaused 之间。

日志事件

一个通用事件在工作节点上发生,该事件应集中记录。请注意,这是除了工作节点的日志之外的记录,客户端可以根据请求获取(最多到一定长度)。

保持活动

一个工作节点通知它仍然在线且响应。这使用了批量流通道,与使用专用RPC通信的 distributed.worker.Worker.heartbeat()Scheduler.heartbeat_worker() 相反,这是为了防止防火墙关闭批量流。

注册-工人

一个新的工作者被添加到网络中

注销

一个现有的工作者离开了网络

客户端

更新图

客户端向调度器发送更多任务

客户端发布密钥

客户端不再需要某些键的结果。

请注意,还有许多更多的客户端API端点(例如用于服务 scatter() 等),为了简洁起见,这里没有列出。

刺激函数以文本 stimulus 为前缀,并从消息中接受各种关键字参数,如下例所示:

def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None):

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None)

这些函数改变一些非必要的管理状态,然后调用转换函数。

请注意,我们从工作节点和客户端接收到的还有其他几种不改变状态的消息,例如请求有关调度器当前状态信息的请求。这些不被视为刺激。

API

class distributed.scheduler.Scheduler(loop=None, services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), contact_address=None, transition_counter_max=False, jupyter=False, **kwargs)[源代码]

动态分布式任务调度器

调度器跟踪工作者、数据和计算的当前状态。调度器监听事件,并通过适当控制工作者来响应。它不断尝试利用工作者来执行一个不断增长的dask图。

所有事件都能快速处理,时间复杂度与输入成线性关系(输入通常是常量大小),并且通常在毫秒内完成。为了实现这一点,调度器跟踪大量状态。每个操作都维护此状态的一致性。

调度器通过 Comm 对象与外界通信。即使在同时监听多个客户端时,它也能保持对世界的一致且有效的视图。

调度器通常通过 dask scheduler 可执行文件启动:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

或者在 LocalCluster 中,Client 在没有连接信息的情况下启动:

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

用户通常不直接与调度器交互,而是通过客户端对象 Client 进行交互。

contact_address 参数允许向工作节点通告一个特定的地址,用于与调度器的通信,该地址与调度器绑定的地址不同。当调度器监听一个私有地址时,这非常有用,因为工作节点无法使用该私有地址来联系调度器。

状态

调度器包含以下状态变量。每个变量都列出了它存储的内容和简要描述。

  • 任务: {任务键: 任务状态}

    调度器当前已知的任务

  • 不可运行: {TaskState}

    处于“无工作者”状态的任务

  • 工人: {工人键: 工人状态}

    当前连接到调度器的工人

  • 空闲: {WorkerState}:

    未充分利用的工作者集合

  • 饱和状态: {WorkerState}:

    未过度利用的工作者集合

  • host_info: {hostname: dict}:

    每个工作主机的信息

  • 客户端: {客户端键: 客户端状态}

    当前连接到调度器的客户端

  • 服务: {str: 端口}:

    此调度器上运行的其他服务,如 Bokeh

  • 循环: IOLoop:

    正在运行的 Tornado IOLoop

  • client_comms: {客户端密钥: 通信}

    对于每个客户端,使用一个 Comm 对象来接收任务请求并报告任务状态更新。

  • stream_comms: {worker key: Comm}

    对于每个工作者,我们从一个Comm对象中接受刺激并报告结果。

  • 任务持续时间: {key-prefix: time}

    我们预期某些函数所需的时间,例如 {'sum': 0.25}

adaptive_target(target_duration=None)[源代码]

根据当前工作负载所需的工人数

这会查看当前运行的任务和内存使用情况,并返回所需的工作者数量。这通常用于自适应调度。

参数
目标持续时间str

计算所需的时间长度。这会影响调度器请求扩展的速度。

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[源代码]

将客户端添加到网络

我们接收来自此通信的所有未来消息。

add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'][源代码]

了解一个工人拥有某些钥匙

这在实践中不应使用,主要是出于历史原因而存在。然而,工作人员偶尔会发送它。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[源代码]

将外部插件添加到调度器。

请参阅 https://distributed.readthedocs.io/en/latest/plugins.html

参数
插件SchedulerPlugin

SchedulerPlugin 实例以添加

幂等布尔

如果为真,则假定插件已经存在,不会采取任何行动。

名称str

插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,并在未发现时生成。

add_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None

注意,一个worker持有一个状态为’memory’的任务副本。

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[源代码]

向集群添加新工作节点

property address: str

可以联系到此服务器的地址。如果服务器尚未启动,则会引发 ValueError。

property address_safe: str

此服务器可以联系的地址。如果服务器尚未启动,则返回 "not-running"

async benchmark_hardware() dict[str, dict[str, float]][源代码]

在工作人员上运行基准测试,以测试内存、磁盘和网络带宽

返回
结果: dict

一个映射字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是许多工作者在集群上运行计算的平均值。

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') dict[str, Any][源代码]

向工作节点广播消息,返回所有结果

bulk_schedule_unrunnable_after_adding_worker(ws: distributed.scheduler.WorkerState) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']]

no-worker 任务发送到 processing ,该工作程序可以处理这些任务。

返回按优先级排序的推荐。

check_idle_saturated(ws: distributed.scheduler.WorkerState, occ: float = - 1.0) None

更新空闲和饱和状态的状态

调度器跟踪那些是 .. 的工作者。

  • 饱和:有足够的工作保持忙碌

  • Idle: 没有足够的工作来保持忙碌

如果它们都有足够的任务来占用所有线程,并且这些任务的预期运行时间足够长,则认为它们是饱和的。

如果 distributed.scheduler.worker-saturation 不是 inf``(启用了调度器端排队),那么如果它们正在处理的任务数量少于 ``worker-saturation 阈值所规定的数量,则它们被视为空闲。

否则,如果它们处理的任务数量少于线程数,或者它们的任务总预期运行时间少于相同数量平均任务预期运行时间的一半,则它们被视为空闲。

这对于负载均衡和适应性很有用。

client_heartbeat(client: str) None[源代码]

处理来自客户端的心跳

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None[源代码]

从客户端期望列表中移除键

client_send(client, msg)[源代码]

向客户端发送消息

async close(fast=None, close_workers=None, reason='unknown')[源代码]

向所有协程发送清理信号,然后等待直到完成

参见

Scheduler.cleanup
close_worker(worker: str) None[源代码]

要求一个工作者自行关闭。不要等待其生效。注意,无法保证工作者会实际接受该命令。

注意,如果 close=True,remove_worker() 会在内部发送相同的命令。

coerce_address(addr: str | tuple, resolve: bool = True) str[源代码]

将可能的输入地址强制转换为规范形式。resolve 可以在使用假主机名进行测试时禁用。

处理字符串、元组或别名。

coerce_hostname(host: collections.abc.Hashable) str

强制转换工作节点的主机名。

decide_worker_non_rootish(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None

选择一个可运行的非根任务的工作者,考虑依赖关系和限制。

从持有 ts 依赖项的合格工作者中,选择一个工作者,在考虑工作者积压和工作数据传输成本的情况下,估计任务将最快开始运行。

返回
ws: WorkerState | None

要分配任务的工人。如果没有工人满足 ts 的限制,或者没有正在运行的工人,则返回 None,在这种情况下,任务应转换为 no-worker

decide_worker_rootish_queuing_disabled(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None

选择一个可运行的根任务的工作者,无需排队。

这试图在同一个工作节点上调度兄弟任务,以减少未来的数据传输。它不考虑依赖项的位置,因为它们最终会出现在每个工作节点上。

它假设按优先顺序调用一批任务,并通过在 SchedulerState.last_root_workerSchedulerState.last_root_worker_tasks_left 中维护状态来实现这一点。

这将把每个可运行的任务发送给一个工作线程,通常会导致根任务的过度生产。

返回
ws: WorkerState | None

要分配任务的工作者。如果集群中没有工作者,则返回 None,在这种情况下,任务应转换为 no-worker

decide_worker_rootish_queuing_enabled() distributed.scheduler.WorkerState | None

如果并非所有工人都忙碌,选择一个工人来执行可运行的根任务。

idle 工作者中选择最不繁忙的工作者(空闲工作者运行的任务数少于线程数,由 distributed.scheduler.worker-saturation 设置)。它不考虑依赖项的位置,因为它们最终会出现在每个工作者上。

如果所有工作线程都已满,则返回 None,这意味着任务应转换到 queued 状态。调度器将等待将其发送到工作线程,直到有线程空闲。这确保了下游任务总是在新根任务启动之前运行。

这不会尝试在同一个工作节点上调度兄弟任务;实际上,它通常会做相反的事情。尽管这会增加后续的数据传输,但它通常通过消除根任务的过度生产来减少整体内存使用。

返回
ws: WorkerState | None

要分配任务的工人。如果没有空闲的工人,则返回 None,在这种情况下,任务应转换为 queued

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None[源代码]

从工作者中删除数据并更新相应的工作者/任务状态

参数
worker_address: str

要从中删除键的工作者地址

keys: list[Key]

在指定工作节点上删除的键列表

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[源代码]

将集群状态转储写入一个与 fsspec 兼容的 URL。

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None[源代码]

提供对外部请求者的数据通信

注意:这会在调度器上运行任意 Python 代码。这最终应该被淘汰。它主要用于诊断。

async finished()

等待服务器完成

async gather(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][源代码]

从工作者收集数据到调度器

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set[源代码]

从多个工作者到单个工作者的密钥点对点复制

参数
worker_address: str

接收工作者的地址以复制密钥到

who_has: dict[Key, list[str]]

{键: [发送者地址, 发送者地址, …], 键: …}

返回
returns:

一组未能复制的键

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[源代码]

生成用于集群状态转储的状态字典

get_comm_cost(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) float

获取在给定工作节点上计算任务的估计通信成本(以秒为单位)。

get_connection_counters() dict[str, int]

一个包含各种连接计数的字典

参见

Server.incoming_comms_open
Server.incoming_comms_active
Server.outgoing_comms_open
Server.outgoing_comms_active
get_logs(start=0, n=None, timestamps=False)

获取此节点的日志条目

参数
开始float, 可选

开始过滤日志条目的时间(以秒为单位)

nint, 可选

从过滤结果中返回的最大日志条目数

时间戳bool, 默认 False

我们是否希望日志条目包含它们生成的时间?

返回
包含日志级别、消息和(可选)时间戳的元组列表,每个过滤条目按最新顺序排列
async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition][源代码]

用于 SchedulerState.story() 的 RPC 钩子。

请注意,msgpack 序列化/反序列化往返会将 Transition 命名元组转换为普通元组。

get_task_duration(ts: distributed.scheduler.TaskState) float

获取给定任务的估计计算成本(不包括任何通信成本)。

如果没有观察到数据,则使用 distributed.scheduler.default-task-durations 的值。如果此任务未设置,则改为使用 distributed.scheduler.unknown-task-duration

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[源代码]

获取 worker 上命名服务的 (主机, 端口) 地址。如果服务不存在,则返回 None。

参数
工人地址
服务名称str

常见服务包括 ‘bokeh’ 和 ‘nanny’

协议布尔

是否包含带有协议的完整地址(True)或仅包含(主机,端口)对

handle_comm(comm: distributed.comm.core.Comm) distributed.utils.NoOpAwaitable

启动一个后台任务,将新通信分派给协程处理程序

handle_long_running(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, compute_duration: float | None, stimulus_id: str) None[源代码]

一个任务已从线程池中脱离

我们阻止任务在未来被窃取,并更改任务持续时间计算,就好像任务已经停止一样。

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None[源代码]

来自工作者的请求,要求刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是客户端的专用通信 RPC 请求。

async handle_worker(comm: distributed.comm.core.Comm, worker: str) None[源代码]

监听来自单个工作者的响应

这是调度器与工作线程交互的主循环

参见

Scheduler.handle_client

客户端的等效协程

property host

此服务器正在运行的主机。

如果服务器监听在非基于IP的协议上,这将引发 ValueError。

identity() dict[str, Any][源代码]

关于我们自己和我们集群的基本信息

property incoming_comms_active: int

当前处理远程RPC的连接数

property incoming_comms_open: int

正在监听远程RPC的总连接数

property is_idle: bool

当且仅当没有未完成的任务时返回 True。

与测试 self.total_occupancy 不同,如果存在长时间运行的任务、无工作者或由于没有工作者而排队的任务,此属性将返回 False。

不要与 idle 混淆。

is_rootish(ts: distributed.scheduler.TaskState) bool

ts 是否是一个根任务或类似根任务。

根状任务是隶属于一个比集群大得多的组,并且很少或没有依赖关系。任务也可以被明确标记为根状以覆盖这一启发式方法。

property listen_address

服务器正在监听的地址。这可能是一个通配符地址,例如 tcp://0.0.0.0:1234

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

参见

Client.log_event
new_task(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], spec: tuple[collections.abc.Callable, tuple, dict[str, Any]] | None, state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], computation: distributed.scheduler.Computation | None = None) distributed.scheduler.TaskState

创建一个新任务,并关联状态

property outgoing_comms_active: int

当前用于执行RPC的出站连接数

property outgoing_comms_open: int

当前打开并等待远程RPC的连接数

property port

此服务器正在监听的端口号。

如果服务器监听在非基于IP的协议上,这将引发 ValueError。

async proxy(msg: dict, worker: str, serializers: Any = None) Any[源代码]

通过调度器代理通信到其他工作节点

async rebalance(keys: collections.abc.Iterable[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict[源代码]

重新平衡键,使得每个工作线程最终拥有大致相同的进程内存(托管+非托管)。

警告

此操作通常未经过调度器正常操作的充分测试。不建议在等待计算时使用它。

算法

  1. 查找集群的平均占用率,定义为 dask 管理的数据 + 至少存在 30 秒的未管理进程内存(distributed.worker.memory.recent-to-old-time)。这使我们能够忽略由任务堆使用引起的临时峰值。

    或者,您可以通过 distributed.worker.memory.rebalance.measure 更改内存的测量方式,包括单个工作者的内存测量以及计算平均值。具体来说,这可以用于忽略不准确的系统内存测量。

  2. 丢弃占用率在集群平均占用率5%以内的工作节点(distributed.worker.memory.rebalance.sender-recipient-gap / 2)。这有助于避免数据在集群中反复跳动。

  3. 高于平均水平的工人是发送者;低于平均水平的是接收者。

  4. 丢弃发送者,其绝对占用率低于30%(distributed.worker.memory.rebalance.sender-min)。换句话说,只要所有工作者的占用率都低于30%,无论不平衡程度如何,都不会移动数据。

  5. 丢弃绝对占用率超过60%的接收者(distributed.worker.memory.rebalance.recipient-max)。请注意,此阈值默认与``distributed.worker.memory.target``相同,以防止工作者在接受数据后立即将其溢出到磁盘。

  6. 迭代地选择与平均值最远的发送者和接收者,并在两者之间移动*最近最少插入*的键,直到所有发送者或所有接收者都在平均值的5%以内。

    如果接收者已经拥有数据的副本,它将被跳过。换句话说,此方法不会降低复制效果。如果没有足够的内存来接受密钥且尚未持有副本的接收者可用,则该密钥将被跳过。

最近最少插入 (LRI) 策略是一种具有 O(1) 时间复杂度优势的贪心选择,实现简单(依赖于 Python 字典的插入排序),并且在大多数情况下应该足够好。被淘汰的替代策略有:

  • 从最大开始。O(n*log(n)) 节省了非平凡的额外数据结构,并可能导致最大的数据块像弹球一样在集群中反复移动。

  • 最近最少使用 (LRU)。此信息目前仅在工作节点上可用,并且在调度器上复制并不简单;通过网络传输它将非常昂贵。此外,请注意,dask 会尽力最小化中间键在内存中保留的时间,因此在这种情况下的 LRI 是 LRU 的近似值。

参数
键: 可选

应考虑移动的 dask 键的允许列表。所有其他键将被忽略。请注意,这并不保证键实际上会被移动(例如,因为它是不必要的,或者因为没有可行的接收工作者)。

workers: 可选

允许的工作者地址列表,作为发送者或接收者。所有其他工作者将被忽略。集群占用率的平均值将仅使用允许的工作者计算。

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][源代码]

在所有运行中和未来的保姆上注册一个保姆插件

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None[源代码]

在调度器上注册一个插件。

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][源代码]

在所有正在运行和未来的工作进程上注册一个工作插件

remove_all_replicas(ts: distributed.scheduler.TaskState) None

从所有工作节点中移除任务的所有副本

remove_client(client: str, stimulus_id: str | None = None) None[源代码]

从网络中移除客户端

remove_plugin(name: str | None = None) None[源代码]

从调度器中移除外接插件

参数
名称str

要移除的插件名称

remove_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None

注意,一个工作者不再持有任务的副本

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'][源代码]

从集群中移除工作节点。

当一个工作者报告它计划离开或当它看起来无响应时,我们会这样做。这可能会将其任务发送回释放状态。

async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, stimulus_id=None)[源代码]

在整个集群中复制数据

这会在网络中的每一份数据上单独执行数据的树形复制。

参数
键: 可迭代对象

要复制的键列表

n: int

我们期望在集群内看到的复制次数

branching_factor: int, 可选

每个生成中可以复制数据的工人数量。分支因子越大,我们在单个步骤中复制的数据越多,但给定的工人面临的数据请求风险也越大。

report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None[源代码]

发布更新到所有监听的队列和通信

如果消息包含一个键,那么我们只将消息发送给那些关心该键的通信。

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求一个工作者从其他工作者获取所列键的副本。这是一个没有反馈的成功或失败的触发即忘操作,旨在用于家务管理而非计算。

request_remove_replicas(addr: str, keys: list[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求一个工作者丢弃其列出键的副本。这绝不能用于销毁键的最后一个副本。这是一个即发即弃的操作,适用于家务管理而非计算。

副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除,例如因为该任务是运行在其上的另一个任务的依赖项,它将(同样异步地)通知调度器重新将自己添加到 who_has 中。如果工作节点同意丢弃任务,则不会有反馈。

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None[源代码]

忘记所有任务并在所有工作器上调用 restart_workers。

参数
超时:

参见 restart_workers

wait_for_workers:

参见 restart_workers

参见

Client.restart
Client.restart_workers
Scheduler.restart_workers
async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']][源代码]

重启选定的工人。可以选择等待工人返回。

没有保姆的工人会被关闭,希望外部部署系统会重新启动它们。因此,如果不使用保姆,并且您的部署系统不会自动重启工人,restart 将只会关闭所有工人,然后超时!

restart 之后,所有连接的工作者都是新的,无论是否引发了 TimeoutError。任何未能及时关闭的工作者都会被移除,并且可能在将来自行关闭,也可能不会。

参数
工人:

要重启的工作者地址列表。如果省略,则重启所有工作者。

超时:

如果 wait_for_workers 为 True,等待工作线程关闭并返回的时间长度,否则仅等待工作线程关闭的时间长度。如果超过此时间,则引发 asyncio.TimeoutError

wait_for_workers:

是否等待所有工作进程重新连接,还是仅等待它们关闭(默认 True)。使用 restart(wait_for_workers=False) 结合 Client.wait_for_workers() 可以实现对等待多少工作进程的精细控制。

on_error:

如果为 ‘raise’(默认),则在重新启动工作进程时,如果有任何保姆超时,则引发异常。如果为 ‘return’,则返回错误消息。

返回
{worker address: “OK”, “no nanny”, 或 “timed out” 或错误信息}

参见

Client.restart
Client.restart_workers
Scheduler.restart
async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any][源代码]
async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any]
async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, distributed.utils.Any]

优雅地从集群中退役工作节点。任何仅在退役工作节点内存中的键将被复制到其他地方。

参数
workers: list[str] (可选)

待退休的工人地址列表。

names: list (可选)

要退休的工人名称列表。与 workers 互斥。如果既没有提供 workers 也没有提供 names,我们将调用 workers_to_close,它会找到一个合适的选择。

close_workers: bool (默认为 False)

是否从这里实际明确地关闭工作线程。否则,我们期望一些外部作业调度程序来完成工作线程。

remove: bool (默认为 True)

是否立即删除工作者的元数据,或者等待工作者与我们联系。

如果 close_workers=False 且 remove=False,此方法仅将内存中的任务从工作线程中刷新并返回。如果 close_workers=True 且 remove=False,此方法将在工作线程仍在集群中时返回,尽管它们不会接受新任务。如果 close_workers=False 或由于任何原因工作线程不接受关闭命令,它将永久无法接受新任务,并预期以其他方式关闭。

**kwargs: 字典

传递给 workers_to_close 的额外选项,用于确定我们应该丢弃哪些工作线程。仅在 workersnames 被省略时接受。

返回
字典映射工作器ID/地址到关于该工作器信息的字典
每个退休工人对应一个工人。
如果存在仅在即将退役的工作节点上内存中存在的键和它
无法在其他地方复制它们(例如,因为没有
如果有任何其他正在运行的工作进程),持有这些键的工作进程将不会被退役并且
不会出现在返回的字典中。
run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any[源代码]

在此进程中运行一个函数

参见

Client.run_on_scheduler
async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[源代码]

将数据发送给工作者

send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None[源代码]

向客户端和工作者发送消息

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState, duration: float = - 1) None[源代码]

将单个计算任务发送给一个工作者

start_http_server(routes, dashboard_address, default_port=0, ssl_options=None)

这将在本节点上创建一个HTTP服务器

start_periodic_callbacks()

开始定期回调一致

如果 self.periodic_callbacks 中存储的 PeriodicCallbacks 尚未运行,此操作将启动它们。它通过检查是否使用了正确的事件循环来安全地执行此操作。

async start_unsafe() Self[源代码]

清除旧状态并重新启动所有正在运行的协程

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool = False) None[源代码]

在键列表上停止执行

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None[源代码]

响应一个可能在工作线程池中打开空位的事件

根据工作线程上可用的任务槽总数(可能为0),从队列前端选择适当数量的任务,并将它们转换为 processing

注释

与此刺激相关的其他转换应在事先完全处理完毕,因此任何变得可运行的任务都已经在 processing 中。否则,如果队列中的任务在下游任务之前被调度,可能会发生过度生产。

必须在调用 check_idle_saturated 之后调用;即 idle_task_count 必须是最新的。

stimulus_task_erred(key=None, worker=None, exception=None, stimulus_id=None, traceback=None, run_id=None, **kwargs)[源代码]

标记某个任务在特定的工作节点上出错

stimulus_task_finished(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][源代码]

标记一个任务在特定的工作节点上已完成执行

story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.scheduler.TaskState]) list[distributed.scheduler.Transition]

获取所有触及输入键或刺激ID的转换

transition(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][源代码]

将一个键从其当前状态转换到完成状态

返回
未来过渡的建议字典

参见

Scheduler.transitions

此函数的传递版本

示例

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None[源代码]

处理所有剩余的转换过程

这包括了从前几次过渡中得到的反馈,并持续进行,直到我们达到一个稳定状态

async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销一个工作插件

async unregister_scheduler_plugin(name: str) None[源代码]

在调度器上注销一个插件。

async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销一个工作插件

update_data(*, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None[源代码]

了解到有新数据从外部来源进入网络

valid_workers(ts: distributed.scheduler.TaskState) set[distributed.scheduler.WorkerState] | None

返回当前有效的键的工作者集合

如果所有工作者都有效,则返回 None,在这种情况下可以使用任何 运行中 的工作者。否则,将返回适用于此任务的运行中工作者的子集。此检查跟踪以下状态:

  • worker_restrictions

  • host_restrictions

  • 资源限制

worker_objective(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) tuple

确定哪个工人应该获得任务的目标函数

最小化预期启动时间。如果有平局,则通过数据存储来打破平局。

worker_send(worker: str, msg: dict[str, Any]) None[源代码]

向工作者发送消息

这还通过在下一周期添加回调来处理连接失败,以移除工作线程。

workers_list(workers: collections.abc.Iterable[str] | None) list[str][源代码]

合格工人名单

接受一个工作地址或主机名的列表。返回所有匹配的工作地址列表。

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][源代码]

找到我们可以以低成本关闭的工人

这将返回一组适合退休的工作者。这些工作者没有运行任何任务,并且相对于其他工作者存储的数据量较少。如果所有工作者都处于空闲状态,我们仍然会保留足够的工作者,以确保有足够的RAM来存储我们的数据,并留有舒适的缓冲区。

这是用于 distributed.deploy.adaptive 等系统。

参数
memory_ratio数字

我们希望为存储的数据额外保留的空间量。默认为2,即我们希望保留的内存量是当前数据量的两倍。

n整数

要关闭的工人数量

最小整数

保留的最少工作线程数

关键Callable(WorkerState)

一个可选的可调用对象,将 WorkerState 对象映射到一个组隶属关系。组将一起关闭。这在需要集体关闭工作进程时很有用,例如按主机名关闭。

目标整数

在我们关闭后,目标工作者的数量

属性str

要返回的 WorkerState 对象的属性,如“address”或“name”。默认为“address”。

返回
to_close: 可以关闭的工作地址列表

示例

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

在关闭之前按主机名对工作线程进行分组

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

移除两名工人

>>> scheduler.workers_to_close(n=2)

保持足够的工人,使其内存是我们所需的两倍。

>>> scheduler.workers_to_close(memory_ratio=2)
class distributed.scheduler.TaskState(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], run_spec: tuple[collections.abc.Callable, tuple, dict[str, Any]] | None, state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], group: distributed.scheduler.TaskGroup)[源代码]

一个简单的对象,用于保存任务的相关信息。

不要与 distributed.worker_state_machine.TaskState 混淆,后者在 Worker 端保存类似的信息。

actor: bool

这个任务是否是一个角色

add_dependency(other: distributed.scheduler.TaskState) None[源代码]

将另一个任务作为此任务的依赖项添加

annotations: dict[str, Any] | None

任务注解

dependencies: set[distributed.scheduler.TaskState]

此任务为确保正确执行所依赖的任务集合。只有仍在运行的任务才会列在此集合中。如果由于任何原因,此任务还依赖于一个已遗忘的任务,则 has_lost_dependencies 标志将被设置。

一个任务只有在所有依赖任务都已成功执行并且在至少一个工作节点上存储了它们的结果后才能执行。这是通过逐步清空 waiting_on 集合来跟踪的。

dependents: set[distributed.scheduler.TaskState]

依赖于此任务的任务集合。只有仍然存活(未完成)的任务才会被列在这个集合中。这是 dependencies 的反向映射。

erred_on: set[str] | None

出现错误的Worker地址,导致此任务处于错误状态。

exception: distributed.protocol.serialize.Serialized | None

如果此任务执行失败,异常对象将存储在此处。

exception_blame: distributed.scheduler.TaskState | None

如果此任务或其依赖项之一执行失败,则失败的任务存储在此处(可能本身)。

exception_text: str | None

异常的字符串表示

group: distributed.scheduler.TaskGroup

这个任务所属的任务组

has_lost_dependencies: bool

是否忘记了此任务的任何依赖项。由于内存消耗的原因,即使可能有依赖任务,被遗忘的任务也不会保存在内存中。因此,当一个任务被遗忘时,其每个依赖项的 has_lost_dependencies 属性都会被设置为 True

如果 has_lost_dependencies 为真,此任务将不能再进入“处理”状态。

host_restrictions: set[str] | None

一组可以运行此任务的主机名(如果为空则为 None)。通常这为空,除非该任务已被特别限制只能在某些主机上运行。一个主机名可能对应一个或多个连接的工作者。

key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]]

键是任务的唯一标识符,通常由函数名称、函数和参数的哈希值组成,例如 'inc-ab31c010444977004d656610d2d421ec'

loose_restrictions: bool

每个 host_restrictionsworker_restrictionsresource_restrictions 都是硬性约束:如果没有满足这些约束的工作者可用,任务将无法进入“处理”状态,而是会进入“无工作者”状态。

上述限制仅仅是偏好:如果没有满足这些限制的工人可用,任务仍然可以进入“处理”状态,并被发送给另一个连接的工人执行。

metadata: dict[str, Any] | None

与任务相关的元数据

nbytes: int

已完成任务的结果的字节数,由 sizeof 确定。此数字用于诊断和帮助优先处理工作。对于未完成的任务,设置为 -1。

property prefix: distributed.scheduler.TaskPrefix

这类任务的广泛类别,如“inc”或“read_csv”

priority: tuple[float, ...] | None

优先级为每个任务提供了一个相对排名,当考虑执行多个任务时,这个排名用于打破平局。

这个排名通常是一个包含两个元素的元组。第一个(也是主要的)元素对应于任务提交的时间。通常,较早的任务优先级更高。第二个元素由客户端决定,是在大型图中优先处理任务的一种方式,例如如果它们在关键路径上,或者为了释放许多依赖关系而需要按顺序运行。这在 调度策略 中有进一步的解释。

processing_on: distributed.scheduler.WorkerState | None

如果此任务处于“处理中”状态,当前正在处理它的工作人员是谁。此属性与 WorkerState.processing 保持同步。

resource_restrictions: dict[str, float] | None

此任务所需的资源,例如 {'gpu': 1}{'memory': 1e9}。这些是用户定义的名称,并与每个 WorkerState.resources 字典的内容进行匹配。

retries: int

此任务在失败情况下可以自动重试的次数。如果任务执行失败(工作进程返回错误),则会检查其 retries 属性。如果等于 0,则任务被标记为“出错”。如果大于 0,则 retries 属性递减并再次尝试执行。

run_id: int | None

任务特定执行的唯一标识符。此标识符用于签署任务,以便分配的工人预计会在任务完成消息中返回相同的标识符。这用于关联响应。只有最近分配的工人是受信任的。所有其他结果将被拒绝。

run_spec: tuple[collections.abc.Callable, tuple, dict[str, Any]] | None

如何运行任务的规范。这个值的类型和含义对调度器是不透明的,因为它仅由任务发送到的工人解释执行。

作为一种特殊情况,此属性也可能为 None,在这种情况下,任务是“纯数据”(例如,使用 Client.scatter() 方法在调度器中加载的数据片段)。“纯数据”任务如果丢失其值,则无法再次计算。

property state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']

此任务的当前状态。有效状态包括 releasedwaitingno-workerprocessingmemoryerredforgotten。如果状态为 forgotten,则该任务不再存储在 tasks 字典中,并且可能很快从内存中消失。

suspicious: int

此任务导致工人死亡的次数。

某些任务可能导致工作进程死亡(例如调用 os._exit(0))。当一个工作进程死亡时,该工作进程上的所有任务都会被重新分配给其他工作进程。这种行为组合可能导致一个不良任务灾难性地摧毁集群中的所有工作进程,一个接一个。每当一个工作进程死亡时,我们会将当前在该工作进程上处理的所有任务(如 WorkerState.processing 记录的)标记为可疑。如果一个任务涉及三次死亡(或某个固定常数),则我们将该任务标记为 erred

traceback: distributed.protocol.serialize.Serialized | None

如果此任务执行失败,回溯对象将存储在此处。

traceback_text: str | None

回溯的字符串表示

type: str

对象的类型,以字符串表示。仅在任务已被计算时存在。

waiters: set[distributed.scheduler.TaskState] | None

需要此任务保持活动状态的任务集。这始终是 dependents 的子集。每次其中一个依赖项完成处理时,它都会从 waiters 集中移除。

一旦 waiterswho_wants 都变为空,这个任务可以被调度器释放(如果它有一个非空的 run_spec)或遗忘(否则),并且被 who_has 中的任何工作进程遗忘。

备注

反直觉地,waiting_onwaiters 并不是彼此的反向映射。

waiting_on: set[distributed.scheduler.TaskState] | None

此任务在执行前等待的任务集合。这始终是 dependencies 的一个子集。每次其中一个依赖项完成处理时,它都会从 waiting_on 集合中移除。

一旦 waiting_on 变为空,此任务可以从“等待”状态转移到“处理”状态(除非其中一个依赖项出错,在这种情况下,此任务将被标记为“出错”)。

who_has: set[distributed.scheduler.WorkerState] | None

拥有此任务结果的工人集合。当任务处于“内存”状态时,该集合非空。如果使用了 Client.scatter()Client.replicate() 方法,此集合中可能会有多个工人。

这是 WorkerState.has_what 的反向映射。

who_wants: set[distributed.scheduler.ClientState] | None

希望此任务结果保持活跃的客户端集合。这是 ClientState.wants_what 的反向映射。

当客户端向调度器提交一个图时,它还会指定它希望哪些输出任务的结果不被从内存中释放。

一旦任务完成执行(即进入“内存”或“错误”状态),who_wants 中的客户端将被通知。

一旦 waiterswho_wants 都变为空,这个任务可以被调度器释放(如果它有一个非空的 run_spec)或遗忘(否则),并且被 who_has 中的任何工作进程遗忘。

worker_restrictions: set[str] | None

一组完整的可以运行此任务的工作地址(如果为空则为 None)。通常这为空,除非该任务已被特别限制只能在某些工作节点上运行。请注意,这里跟踪的是工作地址,而不是工作状态,因为特定的工作节点此时可能未连接。

class distributed.scheduler.WorkerState(*, address: str, status: distributed.core.Status, pid: int, name: object, nthreads: int = 0, memory_limit: int, local_directory: str, nanny: str | None, server_id: str, services: dict[str, int] | None = None, versions: dict[str, Any] | None = None, extra: dict[str, Any] | None = None, scheduler: distributed.scheduler.SchedulerState | None = None)[源代码]

一个简单的对象,用于保存有关工人的信息。

不要与 distributed.worker_state_machine.WorkerState 混淆。

actors: set[distributed.scheduler.TaskState]

此工作节点上所有 TaskState 的集合,这些 TaskState 是 actor。这仅包括那些状态实际存在于此工作节点上的 actor,而不包括此工作节点有引用的 actor。

add_replica(ts: distributed.scheduler.TaskState) None[源代码]

工作者获取了任务的副本

add_to_processing(ts: distributed.scheduler.TaskState) None[源代码]

为该工作人员分配计算任务。

address: str

这个工作者的唯一键。这可以是其连接地址(例如 "tcp://127.0.0.1:8891")或别名(例如 "alice")。

clean() distributed.scheduler.WorkerState[源代码]

返回适合序列化的此对象版本

executing: dict[distributed.scheduler.TaskState, float]

当前正在此工作节点上运行的任务字典。每个任务状态都与任务已运行的持续时间(以秒为单位)相关联。

extra: dict[str, Any]

要添加到 identity() 的任意附加元数据

property has_what: collections.abc.Set[distributed.scheduler.TaskState]

一个插入排序的任务集合,这些任务当前驻留在此工作节点上。这里的所有任务都处于“内存”状态。这是 TaskState.who_has 的反向映射。

这是一个只读的公共访问器。数据实现为一个没有值的字典,因为 rebalance() 依赖于字典的插入排序。

last_seen: float

我们上次从该工作节点收到心跳的时间,以本地调度器时间为准。

long_running: set[distributed.scheduler.TaskState]

运行调用了 distributed.secede() 的任务

property memory: distributed.scheduler.MemoryState

工作者的优化内存指标。

关于托管内存的设计说明

有两种可用的托管内存度量:

  • self.nbytes

  • self.metrics["managed_bytes"]

在静止状态下,这两个数字必须相同。然而,self.nbytes 会通过批量通信立即更新,一旦每个任务到达工作内存;而 self.metrics["managed_bytes"] 则通过心跳更新,这可能会滞后几秒钟。

下面我们将可能较新的托管内存信息从 self.nbytes 与心跳中的进程和溢出内存混合在一起。这是故意的,以便更频繁地更新托管内存总量。

管理内存直接且立即有助于乐观内存,这反过来用于活动内存管理器的启发式算法(在撰写本文时;未来可能会增加更多用途)。因此,保持其最新状态非常重要;比进程内存更为重要。

一旦调度器了解到任务完成,拥有最新的托管内存信息也大大简化了单元测试。

这种设计的反面是,它可能会在 unmanaged_recent 测量中引起一些噪音。例如:

  1. 删除 100MB 的管理数据

  2. 更新的托管内存比更新的进程内存更快地到达调度器

  3. 调度器认为未管理的最近内存突然增加了100MB,这是一个小问题,因为进程内存没有变化,但管理的内存减少了100MB。

  4. 当心跳到达时,进程内存下降,unmanaged_recent 也随之下降。

这是可以的 - unmanaged_recent / unmanaged_old 分割的主要原因之一正是为了将所有噪音集中在 unmanaged_recent 中,并将其从用于启发式的乐观内存中排除。

不太理想但也不太常见的情况是,突然删除溢出的键会导致托管内存出现负向波动:

  1. 删除 100MB 的溢出数据

  2. 更新后的托管内存 总量 比更新后的溢出部分更快地到达调度器

  3. 这导致托管内存暂时骤降,并被 unmanaged_recent 替换,而溢出内存保持不变

  4. 当心跳到达时,托管的进程恢复,未托管的最近进程减少,溢出的内存减少100MB,这应该是初始状态。

GH#6002 将让我们解决这个问题。

memory_limit: int

工作进程可用的内存,以字节为单位

nanny: str | None

关联的 Nanny 的地址,如果存在

nbytes: int

该工作程序在内存中持有的任务所使用的总内存大小,单位为字节(即该工作程序的 has_what 中的任务)。

needs_what: dict[distributed.scheduler.TaskState, int]

可能需要获取到此工作者的键,以及需要这些键的任务数量。所有任务目前都在另一个工作者上的 memory 中。与 processing 类似,这并不完全反映工作者状态:这里的键可能正在排队获取、正在传输中,或者已经在此工作者的内存中。

nthreads: int

在此工作节点上可用的 CPU 线程数

processing: set[distributed.scheduler.TaskState]

这里所有的任务都处于“处理中”状态。此属性与 TaskState.processing_on 保持同步。

remove_from_processing(ts: distributed.scheduler.TaskState) None[源代码]

从工作者的处理中移除一个任务

remove_replica(ts: distributed.scheduler.TaskState) None[源代码]

该工作线程在内存中不再有任务

resources: dict[str, float]

此工作节点上的可用资源,例如 {"GPU": 2}。这些是抽象的数量,限制了某些任务不能同时在此工作节点上运行。

status: distributed.core.Status

只读的工作者状态,从远程工作者对象单向同步

used_resources: dict[str, float]

分配给此工作者的所有任务使用的每种资源的总量。此字典中的数字只能小于或等于此工作者的 resources

versions: dict[str, Any]

工作节点上 distributed.versions.get_versions() 的输出

class distributed.scheduler.ClientState(client: str, *, versions: dict[str, Any] | None = None)[源代码]

一个简单的对象,用于保存客户端的信息。

client_key: str

此客户端的唯一标识符。这通常是由客户端自身生成的一个不透明的字符串。

last_seen: float

我们上次从该客户端收到心跳的时间,以本地调度器时间为准。

versions: dict[str, Any]

客户端上 distributed.versions.get_versions() 的输出

wants_what: set[distributed.scheduler.TaskState]

客户端希望保留在内存中的一组任务,以便在需要时可以下载其结果。这是 TaskState.who_wants 的反向映射。当客户端空间中的相应对象(例如 Future 或 Dask 集合)被垃圾回收时,任务通常会从该集合中移除。

distributed.scheduler.decide_worker(ts: distributed.scheduler.TaskState, all_workers: set[distributed.scheduler.WorkerState], valid_workers: set[distributed.scheduler.WorkerState] | None, objective: collections.abc.Callable[[distributed.scheduler.WorkerState], Any]) distributed.scheduler.WorkerState | None[源代码]

决定哪个工作者应该执行任务 ts

我们选择拥有 ts 所依赖数据的 worker。

如果有多个工人有依赖关系,那么我们选择较不忙的工人。

可选地提供 valid_workers ,指定允许执行任务的工作者(如果所有工作者都可以接受任务,则传递 None)。

如果任务需要数据通信,因为没有符合条件的工人已经拥有所有依赖项,那么我们选择最小化工人之间发送的字节数。这是通过调用 objective 函数来确定的。

class distributed.scheduler.MemoryState(*, process: int, unmanaged_old: int, managed: int, spilled: int)[源代码]

在单个工作节点或整个集群上的内存读数。

参见 Worker 内存管理

属性 / 特性:

managed_total

工作进程在内存中持有的所有dask键的sizeof()输出总和,加上溢出到磁盘的字节数

管理

dask键在RAM中持有的sizeof()输出总和。请注意,这可能不准确,这可能导致未管理的内存不准确(见下文)。

溢出

dask键溢出到硬盘的字节数。请注意,这是磁盘上的大小;内存中的大小可能因压缩和sizeof()的不准确性而不同。换句话说,给定相同的键,’managed’ 将根据键是在内存中还是溢出而变化。

过程

操作系统在工作进程中测量的总RSS内存。这总是恰好等于托管 + 非托管。

未管理的

进程 - 受管理的。这是总和。

  • Python 解释器和模块

  • 全局变量

  • 由当前运行的dask任务临时分配的内存

  • 内存碎片化

  • 内存泄漏

  • 内存尚未进行垃圾回收

  • 尚未被Python内存管理器释放给操作系统的内存

unmanaged_old

过去 distributed.memory.recent-to-old-time 秒内 ‘unmanaged’ 措施的最小值

未管理的近期

unmanaged - unmanaged_old; 换句话说,这是最近分配但未被dask记录的进程内存;希望它大部分是一个暂时的峰值。

乐观的

managed + unmanaged_old;换句话说,在乐观假设下,进程长期持有的内存,假设所有 unmanaged_recent 内存是一个暂时的峰值