工作状态机

任务状态

当调度器要求工作者计算一个任务时,该任务通过一个 distributed.worker_state_machine.TaskState 对象在工作者的端被追踪 - 不要与匹配的调度器端类 distributed.scheduler.TaskState 混淆。

该类有一个关键属性,TaskState.state,它可以取以下值:

发布

已知但未主动计算或不在内存中。当调度器要求忘记它时,任务可以保持这种状态,但它在该工作节点上有依赖任务。

等待

调度器已将任务添加到工作队列。其所有依赖项都存储在集群的某个内存中,但并非所有依赖项都存储在当前工作者的内存中,因此需要获取它们。

获取

此任务存储在一个或多个对等工作节点上,但不在当前工作节点上。其数据已排队等待通过网络传输,可能是因为它是处于 waiting 状态的任务的依赖项,或者是因为 活动内存管理器 请求在此处复制它。可以在 WorkerState.data_needed 堆中找到该任务。

缺失

类似于 fetch,但调度器列出的所有对等工作者要么无法访问,要么已经回应他们实际上没有任务数据。工作者将定期询问调度器是否知道其他副本;当调度器知道时,任务将再次转换为 fetch。任务可以在 WorkerState.missing_dep_flight 集合中找到。

航班

任务数据目前正在从另一个工作节点通过网络传输。该任务可以在 WorkerState.in_flight_tasksWorkerState.in_flight_workers 集合中找到。

准备

任务已准备好进行计算;其所有依赖项都已在当前工作线程的内存中,并等待可用线程。该任务可以在 WorkerState.ready 堆中找到。

constrained

类似于 ready,但用户为此任务指定了 资源约束。该任务可以在 WorkerState.constrained 队列中找到。

执行

任务当前正在一个线程上计算。它可以在 WorkerState.executing 集合和 distributed.worker.Worker.active_threads 字典中找到。

长时间运行

类似于 executing ,但用户代码调用了 distributed.secede() ,因此该任务不再计入最大并发任务数。它可以在 WorkerState.long_running 集合和 distributed.worker.Worker.active_threads 字典中找到。

重新安排

刚刚的任务引发了 Reschedule 异常。这是一个暂时状态,不会永久存储。

已取消

调度器要求忘记这个任务,但从技术上讲,目前这是不可能的。请参阅 任务取消。该任务可以在其 previous 状态所在的任何集合中找到。

resumed

任务已从 cancelled 状态恢复。参见 任务取消。任务可以在其 previous 状态所在的任何集合中找到。

内存

任务执行完成,或者任务已成功从另一个工作者转移,现在保存在 WorkerState.dataWorkerState.actors 中。

错误

任务执行失败。或者,任务执行成功完成,或者任务数据在网络上成功传输,但序列化或反序列化失败。完整的异常和回溯信息存储在任务本身中,以便它们可以在客户端重新引发。

遗忘

调度器要求此工作节点忘记该任务,并且该工作节点上既没有依赖项也没有依赖关系。一旦任务达到此状态,它将立即从 WorkerState 中解除引用,并很快被垃圾回收。这是唯一一种在同一解释器中可以(暂时)存在两个具有相同 keyTaskState 对象实例的情况。

获取依赖项

依赖项的工作状态

当需要计算的任务到达Worker时,任何尚未在该Worker内存中的依赖项都会被 TaskState 对象包装,并包含一个Worker列表(TaskState.who_has)以从中收集结果。

这些 TaskState 对象的状态被设置为 fetch,被放入 data_needed 堆中,并逐步通过网络传输。对于每个依赖项,我们随机选择一个拥有该数据的工人,并从该工人处收集依赖项。为了提高带宽,我们机会性地收集其他已知在该工人上的其他任务的依赖项,最多收集50MB的数据(transfer_message_bytes_limit,从配置键 distributed.worker.transfer.message-bytes-limit 获取)——数据太少,带宽会受到影响;数据太多,响应性会受到影响。我们使用固定的50个连接(transfer_incoming_count_limit,从配置键 distributed.worker.connections.outgoing 获取),以避免过度碎片化我们的网络带宽。

如果两个工作节点之间的网络通信饱和,依赖任务可能会在 fetchflight 之间循环,直到成功收集。也可能发生对等工作节点响应不再拥有请求数据副本的情况;最后,对等工作节点可能无法访问或无响应。当这种情况发生时,对等节点从 who_has 中移除,任务转换回 fetch,以便工作节点尝试从不同的对等节点收集相同的键。如果由于此过程导致 who_has 变为空,任务转换为 missing,工作节点开始定期询问调度器是否有其他可用对等节点。

用于获取依赖项的相同系统也被 活动内存管理器 复制所使用。

备注

对于任何给定的对等工作者,最多只有一个 gather_dep() asyncio 任务在任何给定时间运行。如果所有持有处于 fetch 状态任务副本的工作者都已经在执行中,任务将保持在 fetch 状态,直到有工作者再次可用。

计算任务

需要计算的 TaskState 在 Worker 上通过以下管道进行处理。它定义了 run_spec,指示 Worker 如何执行它。

计算任务的工作者状态

在任务的所有依赖项都加载到内存后,任务从 waiting 状态转变为 readyconstrained 状态,并被添加到 ready 堆中。

一旦有线程可用,我们就从堆顶弹出一个任务,并将其放入本地线程池中的一个线程中执行。

可选地,在运行过程中,此任务可能会将自己标识为长时间运行的任务(参见 任务启动任务),此时它会从线程池中退出,并将状态更改为 long-runningexecutinglong-running 状态几乎相同,唯一的区别是后者不计入同一时间并行运行的最大任务数。

一个任务可以通过三种方式终止:

  • 成功完成;其返回值存储在 dataactors 中。

  • 引发异常;异常和回溯信息存储在 TaskState 对象上

  • 引发 Reschedule;它会被立即遗忘。

在所有情况下,结果都会发送回调度器。

分散的数据

分散数据 遵循一个更简单的路径,直接进入 memory

分散数据的工人状态

忘记任务

一旦任务进入 memoryerror 状态,Worker 将无限期地保留它,直到调度器明确要求 Worker 忘记它。这种情况发生在没有更多的客户端持有对该键的引用,并且没有更多的等待任务(即尚未计算的依赖项)时。此外,活动内存管理器 可能会要求删除任务的多余副本。

rescheduled 的情况下,任务将立即转换到 released 然后 forgotten,而无需等待调度器。

计算任务的工作者状态

不规则流

上述流程图有一些重要的例外情况:

  • 一个任务被 窃取 ,在这种情况下,它从 waitingreadyconstrained 直接转变为 released 。请注意,当前正在执行的任务的窃取请求将被拒绝。

  • 调度器干预,即调度器将之前分配给单独工作者的任务重新分配给新的工作者。这种情况最常见于 工作者在计算过程中死亡 时发生。

  • 客户端介入,其中客户端要么显式释放一个 Future 或将其降级;或者整个客户端可能关闭或变得无响应。当没有更多客户端持有某个键或其依赖项的引用时,调度器将释放它。

简而言之:

重要

一个任务可以从 任何 状态转变为 released ,而不仅仅是上述图表中的那些状态。

如果没有依赖项,任务将立即转换为 forgotten 并被取消范围。然而,有一个重要的例外,任务取消

任务取消

Worker 可能会收到一个请求,要求在当前处于 flightexecutinglong-running 状态时释放一个键。由于取消 Python 线程的技术限制,以及当前从对等 Worker 获取数据的方式,此类事件无法导致相关的 asyncio 任务(以及在 executing / long-running 情况下,运行用户代码的线程)被立即中止。相反,处于这三种状态的任务会转换到另一个状态,cancelled,这意味着 asyncio 任务将继续完成(结果无关紧要),然后* Dask 任务将被释放。

cancelled 状态有一个子状态,previous,它被设置为上述三种状态之一。这种状态的常见表示法是 <state>(<previous>),例如 cancelled(flight)

当一个任务被取消时,会发生以下三种情况之一:

  • 在异步任务完成之前,什么都不会发生;例如,调度器不会改变主意,仍然希望工作线程在最后时刻才忘记任务。当这种情况发生时,任务从 cancelled 转变为 released ,通常,随后是 forgotten

  • 调度器切换回其原始请求:

    • 调度器要求 Worker 获取一个当前为 cancelled(flight) 的任务;此时任务将立即恢复为 flight,忘记取消操作曾经发生,并继续等待已经运行的数据获取。

    • 调度器要求 Worker 计算一个当前状态为 cancelled(executing)cancelled(long-running) 的任务。Worker 将完全忽略新的 previous 状态,并等待正在执行的线程完成。

  • 调度器切换到相反的请求,从获取到计算,或者反过来。

为了满足这最后一种使用情况,存在另一种特殊状态,resumed。任务可以从 cancelled 状态独占地进入 resumed 状态。resumed 保留了 cancelled 状态中的 previous 属性,并添加了另一个属性,next,该属性始终为:

  • fetch,如果 previousexecutinglong-running

  • waiting,如果 previousflight

回顾一下,这些是处理已取消任务的所有可能的状态和子状态的排列:

状态

上一个

下一个

已取消

航班

已取消

执行

已取消

长时间运行

resumed

航班

等待

resumed

执行

获取

resumed

长时间运行

获取

如果一个 resumed 任务成功完成,它将转换到 memory (与 cancelled 任务相反,后者的输出被忽略),调度器将收到一个伪造的终止消息,即如果任务是 resumed(executing->fetch)resumed(long-running->fetch),则为 flight 的预期结束消息,如果任务是 resumed(flight->waiting),则为 execute 的预期结束消息。

如果任务失败或引发 Reschedule,Worker 将改为静默忽略异常并切换到其预定流程,因此 resumed(executing->fetch)resumed(long-running->fetch) 将转换为 fetch,而 resumed(flight->waiting) 将转换为 waiting

最后,调度器可以在任务的生命周期内多次改变主意,因此一个 resumed(executing->fetch)resumed(long-running->fetch) 任务可能会被要求再次转换到 waiting 状态,此时它将恢复到其 previous 状态并忘记整个事件;同样,一个 resumed(flight->waiting) 任务可能会被要求再次转换到 fetch 状态,因此它将直接转换到 flight 状态。

取消/恢复的工作者状态取消/恢复的工作者状态

一个常见的实际应用案例

  1. 集群中至少有两个工作节点,A 和 B。

  2. 任务 x 在工作者 A 上成功计算。

  3. 当任务 x 在工作者 A 上转移到内存时,调度器要求工作者 B 计算任务 y,任务 y 依赖于任务 x。

  4. B 开始从 A 获取密钥 x,这将任务置入 flight 模式。

  5. 工作者A崩溃了,无论出于什么原因,调度器在工作者B之前注意到了这一点。

  6. 调度器将释放任务 y(因为它正在等待的依赖项在内存中已无处可寻),并在集群的其他地方重新调度任务 x。任务 x 将在工作节点 A 上转换为 cancelled(flight)

  7. 如果调度器随机选择工作节点A来计算任务X,任务将转换为 resumed(flight->waiting)

  8. 当且仅当从 A 到 B 的 TCP 套接字崩溃(例如,由于超时)时,任务将转换为 waiting 状态,并最终在 A 上重新计算。

重要

对于任何一个给定的键,你总是最多有一个 compute()gather_dep() 的 asyncio 任务在运行;你永远不会同时有两个。

调度器和工作者之间的任务状态映射

任务在调度器和工作节点上的状态是不同的,它们之间的映射有些微妙:

调度器状态

典型的工作者状态

边缘案例工作者状态

  • 发布

  • 等待

  • 无工作者

  • (未知)

  • 发布

  • 已取消

  • 处理

  • 等待

  • 准备

  • constrained

  • 执行

  • 长时间运行

  • resumed(等待中)

  • 内存

  • 内存

  • 获取

  • 航班

  • 错误

  • 缺失

  • resumed(fetch)

  • 出错

  • 错误

除了上述状态外,工作者可能完全不知道某个特定任务。相反的情况是,工作者知道一个任务,但在调度器中找不到它,这种情况仅发生在 已取消的任务 的情况下。

还需要考虑 竞态条件 ,即某个工作者(或某些工作者)在调度器之前知道某些信息,或者反之亦然。例如,

  • 任务总是会从 executing 状态在工作者上过渡到 memory 状态,然后才能从调度器上的 processing 状态过渡到 memory 状态。

  • 任务总是首先在调度器上转换为 releasedforgotten 状态,只有当消息到达工作节点时,它才会被释放。

流程控制

工作状态机控制流

在工作者状态机中涉及几个类:

TaskState 包含与单个任务相关的所有信息;它还包括对依赖任务和被依赖任务的引用。这只是一个数据持有者,没有可变方法。请注意,这与 distributed.scheduler.TaskState 是不同的类。

WorkerState 封装了工作者的整体状态。它在 tasks 字典和其他几个次要集合中保存了对 TaskState 的引用。关键的是,这个类对 asyncio、网络、磁盘 I/O、线程等没有任何了解或可见性。请注意,这是与 distributed.scheduler.WorkerState 不同的类。

WorkerState 提供了一个单一的方法来改变状态:handle_stimulus()。状态不能以任何其他方式改变。该方法获取一个 StateMachineEvent,即所谓的 stimulus,这是一个数据类,它决定了发生了某些可能引起工作状态变化的事情。stimulus 可以来自调度器(例如,请求计算任务)或来自工作器本身(例如,任务已完成计算)。

WorkerState.handle_stimulus() 改变内部状态(例如,它可以将任务从 executing 转换为 memory)并返回一个 Instruction 对象列表,这些对象是工人需要采取但与状态本身无关的行动:

  • 向调度器发送消息

  • 计算任务

  • 从同行工作者处收集任务

WorkerState.handle_stimulus()BaseWorker.handle_stimulus() 包装,后者消费 Instruction 对象。BaseWorker 处理异步任务的创建、跟踪和清理,但不实际执行任务或收集;相反,它暴露了抽象的异步方法 execute()gather_dep(),这些方法随后被其子类 Worker 重写,后者实际运行任务并执行网络 I/O。当实现的方法完成时,它们必须返回一个 StateMachineEvent,该事件被反馈到 BaseWorker.handle_stimulus()

备注

这可以在工作者内部创建一个(可能非常长的)事件链;例如,如果 ready 队列中的任务数多于线程数,那么一个任务的终止 StateMachineEvent 将触发执行下一个任务的 Instruction

总结如下:

内部状态排列

在内部,WorkerState.handle_stimulus() 的工作方式与 调度器端的相同过程 非常相似:

  1. WorkerState.handle_stimulus() 调用 WorkerState._handle_<stimulus name>()

  2. 返回一个元组

    • 推荐 任务转换:{任务状态: <新状态>}

    • 列表 Instruction 对象

  3. WorkerState.handle_stimulus() 然后将建议传递给 WorkerState._transitions()

  4. 对于每个推荐,WorkerState._transitions() 调用 WorkerState._transition()

  5. 这反过来调用 WorkerState._transition_<start state>_<end state>()

  6. 这反过来返回一个额外的元组 (recommendations, instructions)

  7. 新的推荐被 WorkerState._transitions() 消耗,直到不再返回推荐。

  8. WorkerState.handle_stimulus() 最终返回指令列表,该列表已通过转换逐步扩展。

API 文档

class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[源代码]

保存与单个 Dask 任务相关的易失性状态。

不要与 distributed.scheduler.TaskState 混淆,后者在调度器端持有类似的信息。

annotations: dict | None

任意任务注释

coming_from: str | None

如果任务正在进行中,当前任务数据来自的工作者

dependencies: set[TaskState]

此键运行所需的数据

dependents: set[TaskState]

使用此依赖的键

done: bool

如果服务此任务的 execute()gather_dep() 协程已完成,则为 True;否则为 False。此标志改变了从 executingflight 等状态过渡的行为。

duration: float | None

任务预期持续时间

exception: Serialize | None

如果任务出错,运行任务引发的异常(序列化)

exception_text: str

异常的字符串表示

key: Key

任务键。必填。

metadata: dict

与任务相关的元数据。存储的元数据应为 msgpack 可序列化(例如 int、string、list、dict)。

nbytes: int | None

任务值的大小,如果是在内存中

next: Literal['fetch', 'waiting', None]

任务的下一个状态。当且仅当 state == resumed 时,它不是 None。

prefix: str

任务前缀(键的最左侧部分)

previous: Literal['executing', 'long-running', 'flight', None]

任务的先前状态。当 state 为 (cancelled, resumed) 时,它不为 None。

priority: tuple[int, ...] | None

调度器赋予此任务的优先级。决定运行顺序。

resource_restrictions: dict[str, float]

运行任务所需的抽象资源

run_id: int

任务运行 ID。

run_spec: T_runspec | None

包含与此 TaskState 实例关联的 functionargskwargstask 的元组。默认为 None,如果这是一个将从另一个工作节点接收的依赖项,则可以保持为空。

span_id: str | None

唯一的跨度ID(参见 distributed.spans)。匹配 distributed.scheduler.TaskState.group.span_id

start_time: float | None

任务开始运行的时间

startstops: list[StartStop]

任务的传输、加载和计算时间的日志

state: TaskStateState

任务的当前状态

stop_time: float | None

任务运行完成的时间

suspicious_count: int

依赖项未出现在我们预期位置的次数

traceback: Serialize | None

如果任务出错,运行任务导致的回溯(序列化)

traceback_text: str

回溯的字符串表示

type: type | None

特定数据片段的类型

waiters: set[TaskState]

不在内存中的依赖项子集

waiting_for_data: set[TaskState]

不在内存中的依赖子集

who_has: set[str]

我们认为拥有此数据的工人的地址

class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[源代码]

状态机封装了工作线程上所有任务的生命周期。

不要与 distributed.scheduler.WorkerState 混淆。

备注

此类数据属性的实现细节可能会在没有弃用周期的情况下更改。

警告

这个类的属性之间都有很强的相关性。不要*直接修改它们,*永远不要,因为这样非常容易进入一个损坏的状态,这反过来很可能会导致集群范围内的死锁。

状态应仅通过 handle_stimulus() 进行修改。

actors: dict[Key, object]

演员任务。参见 演员

address: str

<端口>。这在状态机进行决策时使用,例如确定对等工作程序是否在同一主机上运行。此属性在初始化 WorkerState 时可能未知。在第一次调用 handle_stimulus() 之前,必须 设置它。

类型

Worker <IP 地址>

property all_running_tasks: set[distributed.worker_state_machine.TaskState]

所有当前占用线程的任务。它们可能计入也可能不计入最大线程数。

这些是:

  • ts.status 在 (执行中, 长时间运行)

  • ts.status 在 (cancelled, resumed) 中,且 ts.previous 在 (executing, long-running) 中

available_resources: dict[str, float]

{资源名称: 数量}。当前未被任务执行消耗的资源。总是小于或等于 total_resources。参见 工作资源

busy_workers: set[str]

最近返回忙碌状态的对等工作节点。此集合中的工作节点在一段时间内不会被要求提供额外的依赖项。

constrained: HeapSet[TaskState]

准备运行的任务的优先级堆,但正在等待抽象资源,如GPU。与 ready 互斥。参见 available_resources工作资源

data: MutableMapping[Key, object]

内存中的任务数据。这个集合通过引用在 WorkerWorkerMemoryManager 和这个类之间共享。

data_needed: defaultdict[str, HeapSet[TaskState]]

仍然需要数据才能执行并且在至少另一个工作者的内存中的任务,按照每个工作者的堆栈优先级排序。所有且仅包含 TaskState.state == 'fetch' 的任务在此集合中。一个具有多个 who_has 条目的 TaskState 将在此处出现多次。

executed_count: int

该工作者在其生命周期内执行的任务数量;这包括失败和取消的任务。另见 executing_count()

executing: set[TaskState]

当前正在运行的任务集。

这个集合包括仅限于 state == ‘executing’ 的任务,以及 state 在 (‘cancelled’, ‘resumed’) 中且 previous == ‘executing’ 的任务。

另请参阅 executing_count()long_running

property executing_count: int

当前在此工作线程上执行的任务数,并计入最大线程数。

它包括已取消的任务,但不包括长时间运行的(即脱离的)任务。

fetch_count: int

处于获取状态的任务总数。如果一个任务在多个 data_needed 堆中,它只被计算一次。

generation: int

每次调度程序调用 compute-task 处理程序时递减的计数器。它附加到 TaskState.priority 上,并在调度程序上具有相同优先级的任务之间充当决胜因素,决定它们之间的后进先出顺序。

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list[distributed.worker_state_machine.Instruction][源代码]

处理一个或多个外部事件,将相关任务转换到新状态,并返回一个要执行的指令列表作为结果。

has_what: defaultdict[str, set[Key]]

{worker address: {ts.key, ...}。我们关心的数据,我们认为一个worker拥有这些数据

in_flight_tasks: set[TaskState]

当前点对点连接中向我们传来的任务。

这组任务包括仅限于 state == ‘flight’ 的任务,以及 state 在 (‘cancelled’, ‘resumed’) 中且 previous == ‘flight’ 的任务。

另请参阅 in_flight_tasks_count()

property in_flight_tasks_count: int

当前从其他工作者复制到此工作者的任务数量。

in_flight_workers: dict[str, set[Key]]

{worker address: {ts.key, ...}} 我们当前正在从中收集数据的worker以及我们期望从这些连接中获得的依赖项。在此字典中的worker在当前查询返回之前不会被要求提供额外的依赖项。

log: deque[tuple]

[(..., stimulus_id: str | None, timestamp: float), ...] 记录的刺激数量是有限制的。另请参见 story()stimulus_log

类型

过渡日志

long_running: set[TaskState]

当前正在运行并调用了 secede() 的任务集合,因此它们不再计入并发任务的最大数量(nthreads)。这些任务不会出现在 executing 集合中。

此集合仅包括 state == ‘long-running’ 的任务,以及 state 为 (‘cancelled’, ‘resumed’) 且 previous == ‘long-running’ 的任务。

missing_dep_flight: set[TaskState]

所有且仅限于 TaskState.state == 'missing' 的任务。

nbytes: int

内存中所有任务的总大小

nthreads: int

可以并行执行的任务数量。在任何给定时间,executing_count() <= nthreads。

plugins: dict[str, WorkerPlugin]

{name: worker plugin}。这个集合是通过引用在 Worker 和这个类之间共享的。Worker 负责添加和移除插件,而 WorkerState 调用 WorkerPlugin.transition 方法,是可用的。

ready: HeapSet[TaskState]

准备运行且无资源约束的任务优先堆。与 constrained 互斥。

rng: random.Random

静态种子随机状态,用于在需要伪随机选择时保证确定性。

running: bool

如果状态机应在有可用槽时开始执行更多任务并获取依赖项,则为True。此属性必须与Worker保持一致:WorkerState.running == (Worker.status is Status.running)

stimulus_log: deque[StateMachineEvent]

handle_stimulus() 接收的所有刺激的日志。记录的事件数量是有限的。另请参见 logstimulus_story()

stimulus_story(*keys_or_tasks: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[distributed.worker_state_machine.StateMachineEvent][源代码]

返回涉及一个或多个任务的所有状态机事件

story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[tuple][源代码]

返回涉及一个或多个任务或刺激ID的所有转换日志记录

task_counter: TaskCounter

当前每个状态的任务数量和累计耗时,按 前缀 细分

tasks: dict[Key, TaskState]

{key: TaskState}。当前在此工作节点上执行的任务(以及这些任务的任何依赖项)

threads: dict[Key, int]

{ts.key: 线程 ID}。这个集合在 Worker 和这个类之间通过引用共享。虽然 WorkerState 是线程无关的,但在某些情况下仍然需要访问这些信息。这个集合由 distributed.worker.Worker.execute() 填充。它不需要被填充也能使 WorkerState 工作。

total_resources: dict[str, float]

{资源名称: 数量}。任务执行可用的总资源。参见 :doc: 资源

transfer_incoming_bytes: int

从其他工作节点当前打开的数据传输总大小

transfer_incoming_bytes_limit: float

传入数据传输的字节限制;用于限流。

transfer_incoming_bytes_throttle_threshold: int

只要 transfer_incoming_bytes 小于此值,忽略 transfer_incoming_count_limit

property transfer_incoming_count: int

当前从其他工作者打开的数据传输数量。

transfer_incoming_count_limit: int

其他工作节点并发传入数据传输的最大数量。另请参见 distributed.worker.Worker.transfer_outgoing_count_limit

transfer_incoming_count_total: int

自工作进程启动以来,从其他工作进程传输的数据总数。

transfer_message_bytes_limit: float

在一次调用 BaseWorker.gather_dep() 时,从同一工作者收集的字节数。多个可以从同一工作者收集的小任务将被批处理为单个指令,只要它们的总大小不超过此值。如果第一个要收集的任务超过此限制,它仍将被收集以确保进度。因此,此限制不是绝对的。

transition_counter: int

到目前为止的状态转换总数。另请参见 logtransition_counter_max

transition_counter_max: int | Literal[False]

如果 transition_counter 达到此值,则引发错误。这仅用于调试,以捕获无限递归循环。在生产环境中,应始终设置为 False。

validate: bool

如果为真,启用昂贵的内部一致性检查。通常在生产环境中禁用。

waiting: set[TaskState]

当前正在等待数据的任务

class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[源代码]

围绕 WorkerState 的包装器,实现了指令处理。这是一个包含多个 @abc.abstractmethod 方法的抽象类,由 Worker 和单元测试模拟类继承。

abstract batched_send(msg: dict[str, Any]) None[源代码]

通过批量通信向调度器发送一个“发送即忘”的消息。

参数
msg: dict

要发送给调度器的 msgpack 可序列化消息。必须有一个 ‘op’ 键,该键在 Scheduler.stream_handlers 中注册。

async close(timeout: float = 30) None[源代码]

取消所有异步指令

abstract digest_metric(name: collections.abc.Hashable, value: float) None[源代码]

记录一个任意的数值指标

abstract async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

执行任务

abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

从拥有任务依赖项的工人那里收集依赖项

参数
工人str

从工人处收集依赖项的地址

to_gather列表

从工作节点收集的依赖项的键 – 这不一定等同于 dep 的完整依赖项列表,因为某些依赖项可能已经存在于该工作节点上。

total_nbytes整数

to_gather 中所有依赖项的总字节数

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[源代码]

将一个或多个外部刺激转发到 WorkerState.handle_stimulus() 并处理返回的指令,调用相关的 Worker 回调(以下 @abc.abstractmethod 方法)。

为所有异步指令生成 asyncio 任务并开始跟踪它们。

abstract async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[源代码]

等待一段时间,然后将一个对等工作者从忙碌状态中移除

class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[源代码]

所有可以修改工人状态的刺激的基础抽象类

static from_dict(d: dict) distributed.worker_state_machine.StateMachineEvent[源代码]

recursive_to_dict 的输出转换回原始对象。输出对象对于重建状态机是有意义的,但不一定与原始对象完全相同。

stimulus_id: str

事件的唯一ID

to_loggable(*, handled: float) distributed.worker_state_machine.StateMachineEvent[源代码]

生成一个变体版本的自身,该版本足够小,可以在中期内存中存储,并且包含用于调试的有意义信息。

class distributed.worker_state_machine.Instruction(stimulus_id: str)[源代码]

从工作状态机到工作者的命令,以响应事件

classmethod match(**kwargs: Any) distributed.worker_state_machine._InstructionMatch[源代码]

生成一个部分匹配以与指令实例进行比较。典型的用法是比较由 WorkerState.handle_stimulus() 返回的指令列表或 WorkerState.stimulus_log 中的指令列表与预期的匹配列表。

示例

instructions = ws.handle_stimulus(...)
assert instructions == [
    TaskFinishedMsg.match(key="x"),
    ...
]

备注

StateMachineEventInstruction 是抽象类,为了简洁起见,这里没有列出它们的许多子类。请参考实现模块 distributed.worker_state_machine 以获取完整列表。