工作状态机
内容
工作状态机¶
任务状态¶
当调度器要求工作者计算一个任务时,该任务通过一个 distributed.worker_state_machine.TaskState
对象在工作者的端被追踪 - 不要与匹配的调度器端类 distributed.scheduler.TaskState
混淆。
该类有一个关键属性,TaskState.state
,它可以取以下值:
- 发布
已知但未主动计算或不在内存中。当调度器要求忘记它时,任务可以保持这种状态,但它在该工作节点上有依赖任务。
- 等待
调度器已将任务添加到工作队列。其所有依赖项都存储在集群的某个内存中,但并非所有依赖项都存储在当前工作者的内存中,因此需要获取它们。
- 获取
此任务存储在一个或多个对等工作节点上,但不在当前工作节点上。其数据已排队等待通过网络传输,可能是因为它是处于
waiting
状态的任务的依赖项,或者是因为 活动内存管理器 请求在此处复制它。可以在WorkerState.data_needed
堆中找到该任务。- 缺失
类似于
fetch
,但调度器列出的所有对等工作者要么无法访问,要么已经回应他们实际上没有任务数据。工作者将定期询问调度器是否知道其他副本;当调度器知道时,任务将再次转换为fetch
。任务可以在WorkerState.missing_dep_flight
集合中找到。- 航班
任务数据目前正在从另一个工作节点通过网络传输。该任务可以在
WorkerState.in_flight_tasks
和WorkerState.in_flight_workers
集合中找到。- 准备
任务已准备好进行计算;其所有依赖项都已在当前工作线程的内存中,并等待可用线程。该任务可以在
WorkerState.ready
堆中找到。- constrained
类似于
ready
,但用户为此任务指定了 资源约束。该任务可以在WorkerState.constrained
队列中找到。- 执行
任务当前正在一个线程上计算。它可以在
WorkerState.executing
集合和distributed.worker.Worker.active_threads
字典中找到。- 长时间运行
类似于
executing
,但用户代码调用了distributed.secede()
,因此该任务不再计入最大并发任务数。它可以在WorkerState.long_running
集合和distributed.worker.Worker.active_threads
字典中找到。- 重新安排
刚刚的任务引发了
Reschedule
异常。这是一个暂时状态,不会永久存储。- 已取消
调度器要求忘记这个任务,但从技术上讲,目前这是不可能的。请参阅 任务取消。该任务可以在其
previous
状态所在的任何集合中找到。- resumed
- 内存
任务执行完成,或者任务已成功从另一个工作者转移,现在保存在
WorkerState.data
或WorkerState.actors
中。- 错误
任务执行失败。或者,任务执行成功完成,或者任务数据在网络上成功传输,但序列化或反序列化失败。完整的异常和回溯信息存储在任务本身中,以便它们可以在客户端重新引发。
- 遗忘
调度器要求此工作节点忘记该任务,并且该工作节点上既没有依赖项也没有依赖关系。一旦任务达到此状态,它将立即从
WorkerState
中解除引用,并很快被垃圾回收。这是唯一一种在同一解释器中可以(暂时)存在两个具有相同key
的TaskState
对象实例的情况。
获取依赖项¶
当需要计算的任务到达Worker时,任何尚未在该Worker内存中的依赖项都会被 TaskState
对象包装,并包含一个Worker列表(TaskState.who_has
)以从中收集结果。
这些 TaskState
对象的状态被设置为 fetch
,被放入 data_needed
堆中,并逐步通过网络传输。对于每个依赖项,我们随机选择一个拥有该数据的工人,并从该工人处收集依赖项。为了提高带宽,我们机会性地收集其他已知在该工人上的其他任务的依赖项,最多收集50MB的数据(transfer_message_bytes_limit
,从配置键 distributed.worker.transfer.message-bytes-limit
获取)——数据太少,带宽会受到影响;数据太多,响应性会受到影响。我们使用固定的50个连接(transfer_incoming_count_limit
,从配置键 distributed.worker.connections.outgoing
获取),以避免过度碎片化我们的网络带宽。
如果两个工作节点之间的网络通信饱和,依赖任务可能会在 fetch
和 flight
之间循环,直到成功收集。也可能发生对等工作节点响应不再拥有请求数据副本的情况;最后,对等工作节点可能无法访问或无响应。当这种情况发生时,对等节点从 who_has
中移除,任务转换回 fetch
,以便工作节点尝试从不同的对等节点收集相同的键。如果由于此过程导致 who_has
变为空,任务转换为 missing
,工作节点开始定期询问调度器是否有其他可用对等节点。
用于获取依赖项的相同系统也被 活动内存管理器 复制所使用。
备注
对于任何给定的对等工作者,最多只有一个 gather_dep()
asyncio 任务在任何给定时间运行。如果所有持有处于 fetch
状态任务副本的工作者都已经在执行中,任务将保持在 fetch
状态,直到有工作者再次可用。
计算任务¶
需要计算的 TaskState
在 Worker 上通过以下管道进行处理。它定义了 run_spec
,指示 Worker 如何执行它。
在任务的所有依赖项都加载到内存后,任务从 waiting
状态转变为 ready
或 constrained
状态,并被添加到 ready
堆中。
一旦有线程可用,我们就从堆顶弹出一个任务,并将其放入本地线程池中的一个线程中执行。
可选地,在运行过程中,此任务可能会将自己标识为长时间运行的任务(参见 任务启动任务),此时它会从线程池中退出,并将状态更改为 long-running。executing
和 long-running
状态几乎相同,唯一的区别是后者不计入同一时间并行运行的最大任务数。
一个任务可以通过三种方式终止:
引发异常;异常和回溯信息存储在
TaskState
对象上引发
Reschedule
;它会被立即遗忘。
在所有情况下,结果都会发送回调度器。
忘记任务¶
一旦任务进入 memory
或 error
状态,Worker 将无限期地保留它,直到调度器明确要求 Worker 忘记它。这种情况发生在没有更多的客户端持有对该键的引用,并且没有更多的等待任务(即尚未计算的依赖项)时。此外,活动内存管理器 可能会要求删除任务的多余副本。
在 rescheduled
的情况下,任务将立即转换到 released
然后 forgotten
,而无需等待调度器。
不规则流¶
上述流程图有一些重要的例外情况:
一个任务被 窃取 ,在这种情况下,它从
waiting
、ready
或constrained
直接转变为released
。请注意,当前正在执行的任务的窃取请求将被拒绝。调度器干预,即调度器将之前分配给单独工作者的任务重新分配给新的工作者。这种情况最常见于 工作者在计算过程中死亡 时发生。
客户端介入,其中客户端要么显式释放一个 Future 或将其降级;或者整个客户端可能关闭或变得无响应。当没有更多客户端持有某个键或其依赖项的引用时,调度器将释放它。
简而言之:
重要
一个任务可以从 任何 状态转变为 released
,而不仅仅是上述图表中的那些状态。
如果没有依赖项,任务将立即转换为 forgotten
并被取消范围。然而,有一个重要的例外,任务取消。
任务取消¶
Worker 可能会收到一个请求,要求在当前处于 flight
、executing
或 long-running
状态时释放一个键。由于取消 Python 线程的技术限制,以及当前从对等 Worker 获取数据的方式,此类事件无法导致相关的 asyncio 任务(以及在 executing
/ long-running
情况下,运行用户代码的线程)被立即中止。相反,处于这三种状态的任务会转换到另一个状态,cancelled
,这意味着 asyncio 任务将继续完成(结果无关紧要),然后* Dask 任务将被释放。
cancelled
状态有一个子状态,previous
,它被设置为上述三种状态之一。这种状态的常见表示法是 <state>(<previous>)
,例如 cancelled(flight)
。
当一个任务被取消时,会发生以下三种情况之一:
在异步任务完成之前,什么都不会发生;例如,调度器不会改变主意,仍然希望工作线程在最后时刻才忘记任务。当这种情况发生时,任务从
cancelled
转变为released
,通常,随后是forgotten
。调度器切换回其原始请求:
调度器要求 Worker 获取一个当前为
cancelled(flight)
的任务;此时任务将立即恢复为flight
,忘记取消操作曾经发生,并继续等待已经运行的数据获取。调度器要求 Worker 计算一个当前状态为
cancelled(executing)
或cancelled(long-running)
的任务。Worker 将完全忽略新的previous
状态,并等待正在执行的线程完成。
调度器切换到相反的请求,从获取到计算,或者反过来。
为了满足这最后一种使用情况,存在另一种特殊状态,resumed
。任务可以从 cancelled
状态独占地进入 resumed
状态。resumed
保留了 cancelled
状态中的 previous
属性,并添加了另一个属性,next
,该属性始终为:
回顾一下,这些是处理已取消任务的所有可能的状态和子状态的排列:
状态 |
上一个 |
下一个 |
---|---|---|
已取消 |
航班 |
无 |
已取消 |
执行 |
无 |
已取消 |
长时间运行 |
无 |
resumed |
航班 |
等待 |
resumed |
执行 |
获取 |
resumed |
长时间运行 |
获取 |
如果一个 resumed
任务成功完成,它将转换到 memory
(与 cancelled
任务相反,后者的输出被忽略),调度器将收到一个伪造的终止消息,即如果任务是 resumed(executing->fetch)
或 resumed(long-running->fetch)
,则为 flight
的预期结束消息,如果任务是 resumed(flight->waiting)
,则为 execute
的预期结束消息。
如果任务失败或引发 Reschedule
,Worker 将改为静默忽略异常并切换到其预定流程,因此 resumed(executing->fetch)
或 resumed(long-running->fetch)
将转换为 fetch
,而 resumed(flight->waiting)
将转换为 waiting
。
最后,调度器可以在任务的生命周期内多次改变主意,因此一个 resumed(executing->fetch)
或 resumed(long-running->fetch)
任务可能会被要求再次转换到 waiting
状态,此时它将恢复到其 previous
状态并忘记整个事件;同样,一个 resumed(flight->waiting)
任务可能会被要求再次转换到 fetch
状态,因此它将直接转换到 flight
状态。
一个常见的实际应用案例
集群中至少有两个工作节点,A 和 B。
任务 x 在工作者 A 上成功计算。
当任务 x 在工作者 A 上转移到内存时,调度器要求工作者 B 计算任务 y,任务 y 依赖于任务 x。
B 开始从 A 获取密钥 x,这将任务置入
flight
模式。工作者A崩溃了,无论出于什么原因,调度器在工作者B之前注意到了这一点。
调度器将释放任务 y(因为它正在等待的依赖项在内存中已无处可寻),并在集群的其他地方重新调度任务 x。任务 x 将在工作节点 A 上转换为
cancelled(flight)
。如果调度器随机选择工作节点A来计算任务X,任务将转换为
resumed(flight->waiting)
。当且仅当从 A 到 B 的 TCP 套接字崩溃(例如,由于超时)时,任务将转换为
waiting
状态,并最终在 A 上重新计算。
重要
对于任何一个给定的键,你总是最多有一个 compute()
或 gather_dep()
的 asyncio 任务在运行;你永远不会同时有两个。
调度器和工作者之间的任务状态映射¶
任务在调度器和工作节点上的状态是不同的,它们之间的映射有些微妙:
调度器状态 |
典型的工作者状态 |
边缘案例工作者状态 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
除了上述状态外,工作者可能完全不知道某个特定任务。相反的情况是,工作者知道一个任务,但在调度器中找不到它,这种情况仅发生在 已取消的任务 的情况下。
还需要考虑 竞态条件 ,即某个工作者(或某些工作者)在调度器之前知道某些信息,或者反之亦然。例如,
任务总是会从
executing
状态在工作者上过渡到memory
状态,然后才能从调度器上的processing
状态过渡到memory
状态。任务总是首先在调度器上转换为
released
或forgotten
状态,只有当消息到达工作节点时,它才会被释放。
流程控制¶
在工作者状态机中涉及几个类:
TaskState
包含与单个任务相关的所有信息;它还包括对依赖任务和被依赖任务的引用。这只是一个数据持有者,没有可变方法。请注意,这与 distributed.scheduler.TaskState
是不同的类。
WorkerState
封装了工作者的整体状态。它在 tasks
字典和其他几个次要集合中保存了对 TaskState
的引用。关键的是,这个类对 asyncio、网络、磁盘 I/O、线程等没有任何了解或可见性。请注意,这是与 distributed.scheduler.WorkerState
不同的类。
WorkerState
提供了一个单一的方法来改变状态:handle_stimulus()
。状态不能以任何其他方式改变。该方法获取一个 StateMachineEvent
,即所谓的 stimulus,这是一个数据类,它决定了发生了某些可能引起工作状态变化的事情。stimulus 可以来自调度器(例如,请求计算任务)或来自工作器本身(例如,任务已完成计算)。
WorkerState.handle_stimulus()
改变内部状态(例如,它可以将任务从 executing
转换为 memory
)并返回一个 Instruction
对象列表,这些对象是工人需要采取但与状态本身无关的行动:
向调度器发送消息
计算任务
从同行工作者处收集任务
WorkerState.handle_stimulus()
被 BaseWorker.handle_stimulus()
包装,后者消费 Instruction
对象。BaseWorker
处理异步任务的创建、跟踪和清理,但不实际执行任务或收集;相反,它暴露了抽象的异步方法 execute()
和 gather_dep()
,这些方法随后被其子类 Worker
重写,后者实际运行任务并执行网络 I/O。当实现的方法完成时,它们必须返回一个 StateMachineEvent
,该事件被反馈到 BaseWorker.handle_stimulus()
。
备注
这可以在工作者内部创建一个(可能非常长的)事件链;例如,如果 ready
队列中的任务数多于线程数,那么一个任务的终止 StateMachineEvent
将触发执行下一个任务的 Instruction
。
总结如下:
WorkerState
对 asyncio、网络、线程和磁盘 I/O 是不可知的;它包含TaskState
对象的集合。BaseWorker
封装了WorkerState
并增加了对 asyncio 的感知Worker
是BaseWorker
的子类,并增加了对网络、线程和磁盘 I/O 的感知。
内部状态排列¶
在内部,WorkerState.handle_stimulus()
的工作方式与 调度器端的相同过程 非常相似:
WorkerState.handle_stimulus()
调用WorkerState._handle_<stimulus name>()
,返回一个元组
推荐 任务转换:{
任务状态
: <新状态>}列表
Instruction
对象
WorkerState.handle_stimulus()
然后将建议传递给WorkerState._transitions()
对于每个推荐,
WorkerState._transitions()
调用WorkerState._transition()
。这反过来调用
WorkerState._transition_<start state>_<end state>()
,这反过来返回一个额外的元组 (recommendations, instructions)
新的推荐被
WorkerState._transitions()
消耗,直到不再返回推荐。WorkerState.handle_stimulus()
最终返回指令列表,该列表已通过转换逐步扩展。
API 文档¶
- class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[源代码]¶
保存与单个 Dask 任务相关的易失性状态。
不要与
distributed.scheduler.TaskState
混淆,后者在调度器端持有类似的信息。- done: bool¶
如果服务此任务的
execute()
或gather_dep()
协程已完成,则为 True;否则为 False。此标志改变了从executing
、flight
等状态过渡的行为。
- key: Key¶
任务键。必填。
- previous: Literal['executing', 'long-running', 'flight', None]¶
任务的先前状态。当
state
为 (cancelled, resumed) 时,它不为 None。
- run_spec: T_runspec | None¶
包含与此 TaskState 实例关联的
function
、args
、kwargs
和task
的元组。默认为None
,如果这是一个将从另一个工作节点接收的依赖项,则可以保持为空。
- span_id: str | None¶
唯一的跨度ID(参见
distributed.spans
)。匹配distributed.scheduler.TaskState.group.span_id
。
- state: TaskStateState¶
任务的当前状态
- class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[源代码]¶
状态机封装了工作线程上所有任务的生命周期。
不要与
distributed.scheduler.WorkerState
混淆。备注
此类数据属性的实现细节可能会在没有弃用周期的情况下更改。
警告
这个类的属性之间都有很强的相关性。不要*直接修改它们,*永远不要,因为这样非常容易进入一个损坏的状态,这反过来很可能会导致集群范围内的死锁。
状态应仅通过
handle_stimulus()
进行修改。- address: str¶
<端口>。这在状态机进行决策时使用,例如确定对等工作程序是否在同一主机上运行。此属性在初始化 WorkerState 时可能未知。在第一次调用
handle_stimulus()
之前,必须 设置它。- 类型
Worker <IP 地址>
- property all_running_tasks: set[distributed.worker_state_machine.TaskState]¶
所有当前占用线程的任务。它们可能计入也可能不计入最大线程数。
这些是:
ts.status 在 (执行中, 长时间运行)
ts.status 在 (cancelled, resumed) 中,且 ts.previous 在 (executing, long-running) 中
- constrained: HeapSet[TaskState]¶
准备运行的任务的优先级堆,但正在等待抽象资源,如GPU。与
ready
互斥。参见available_resources
和 工作资源。
- data_needed: defaultdict[str, HeapSet[TaskState]]¶
仍然需要数据才能执行并且在至少另一个工作者的内存中的任务,按照每个工作者的堆栈优先级排序。所有且仅包含
TaskState.state == 'fetch'
的任务在此集合中。一个具有多个who_has
条目的TaskState
将在此处出现多次。
- executed_count: int¶
该工作者在其生命周期内执行的任务数量;这包括失败和取消的任务。另见
executing_count()
。
- executing: set[TaskState]¶
当前正在运行的任务集。
这个集合包括仅限于
state
== ‘executing’ 的任务,以及state
在 (‘cancelled’, ‘resumed’) 中且previous
== ‘executing’ 的任务。另请参阅
executing_count()
和long_running
。
- generation: int¶
每次调度程序调用 compute-task 处理程序时递减的计数器。它附加到
TaskState.priority
上,并在调度程序上具有相同优先级的任务之间充当决胜因素,决定它们之间的后进先出顺序。
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list[distributed.worker_state_machine.Instruction] [源代码]¶
处理一个或多个外部事件,将相关任务转换到新状态,并返回一个要执行的指令列表作为结果。
- in_flight_tasks: set[TaskState]¶
当前点对点连接中向我们传来的任务。
这组任务包括仅限于
state
== ‘flight’ 的任务,以及state
在 (‘cancelled’, ‘resumed’) 中且previous
== ‘flight’ 的任务。另请参阅
in_flight_tasks_count()
。
- in_flight_workers: dict[str, set[Key]]¶
{worker address: {ts.key, ...}}
我们当前正在从中收集数据的worker以及我们期望从这些连接中获得的依赖项。在此字典中的worker在当前查询返回之前不会被要求提供额外的依赖项。
- log: deque[tuple]¶
[(..., stimulus_id: str | None, timestamp: float), ...]
记录的刺激数量是有限制的。另请参见story()
和stimulus_log
。- 类型
过渡日志
- long_running: set[TaskState]¶
当前正在运行并调用了
secede()
的任务集合,因此它们不再计入并发任务的最大数量(nthreads)。这些任务不会出现在executing
集合中。此集合仅包括
state
== ‘long-running’ 的任务,以及state
为 (‘cancelled’, ‘resumed’) 且previous
== ‘long-running’ 的任务。
- nthreads: int¶
可以并行执行的任务数量。在任何给定时间,
executing_count()
<= nthreads。
- plugins: dict[str, WorkerPlugin]¶
{name: worker plugin}
。这个集合是通过引用在Worker
和这个类之间共享的。Worker 负责添加和移除插件,而 WorkerState 调用WorkerPlugin.transition
方法,是可用的。
- ready: HeapSet[TaskState]¶
准备运行且无资源约束的任务优先堆。与
constrained
互斥。
- rng: random.Random¶
静态种子随机状态,用于在需要伪随机选择时保证确定性。
- running: bool¶
如果状态机应在有可用槽时开始执行更多任务并获取依赖项,则为True。此属性必须与Worker保持一致:
WorkerState.running == (Worker.status is Status.running)
。
- stimulus_log: deque[StateMachineEvent]¶
由
handle_stimulus()
接收的所有刺激的日志。记录的事件数量是有限的。另请参见log
和stimulus_story()
。
- stimulus_story(*keys_or_tasks: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[distributed.worker_state_machine.StateMachineEvent] [源代码]¶
返回涉及一个或多个任务的所有状态机事件
- story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[tuple] [源代码]¶
返回涉及一个或多个任务或刺激ID的所有转换日志记录
- task_counter: TaskCounter¶
当前每个状态的任务数量和累计耗时,按
前缀
细分
- threads: dict[Key, int]¶
{ts.key: 线程 ID}
。这个集合在Worker
和这个类之间通过引用共享。虽然 WorkerState 是线程无关的,但在某些情况下仍然需要访问这些信息。这个集合由distributed.worker.Worker.execute()
填充。它不需要被填充也能使 WorkerState 工作。
- transfer_incoming_bytes_throttle_threshold: int¶
只要
transfer_incoming_bytes
小于此值,忽略transfer_incoming_count_limit
。
- transfer_incoming_count_limit: int¶
其他工作节点并发传入数据传输的最大数量。另请参见
distributed.worker.Worker.transfer_outgoing_count_limit
。
- transfer_message_bytes_limit: float¶
在一次调用
BaseWorker.gather_dep()
时,从同一工作者收集的字节数。多个可以从同一工作者收集的小任务将被批处理为单个指令,只要它们的总大小不超过此值。如果第一个要收集的任务超过此限制,它仍将被收集以确保进度。因此,此限制不是绝对的。
- transition_counter: int¶
到目前为止的状态转换总数。另请参见
log
和transition_counter_max
。
- transition_counter_max: int | Literal[False]¶
如果
transition_counter
达到此值,则引发错误。这仅用于调试,以捕获无限递归循环。在生产环境中,应始终设置为 False。
- class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[源代码]¶
围绕
WorkerState
的包装器,实现了指令处理。这是一个包含多个@abc.abstractmethod
方法的抽象类,由Worker
和单元测试模拟类继承。- abstract batched_send(msg: dict[str, Any]) None [源代码]¶
通过批量通信向调度器发送一个“发送即忘”的消息。
- 参数
- msg: dict
要发送给调度器的 msgpack 可序列化消息。必须有一个 ‘op’ 键,该键在 Scheduler.stream_handlers 中注册。
- abstract digest_metric(name: collections.abc.Hashable, value: float) None [源代码]¶
记录一个任意的数值指标
- abstract async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [源代码]¶
执行任务
- abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [源代码]¶
从拥有任务依赖项的工人那里收集依赖项
- 参数
- 工人str
从工人处收集依赖项的地址
- to_gather列表
从工作节点收集的依赖项的键 – 这不一定等同于
dep
的完整依赖项列表,因为某些依赖项可能已经存在于该工作节点上。- total_nbytes整数
to_gather
中所有依赖项的总字节数
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [源代码]¶
将一个或多个外部刺激转发到
WorkerState.handle_stimulus()
并处理返回的指令,调用相关的 Worker 回调(以下@abc.abstractmethod
方法)。为所有异步指令生成 asyncio 任务并开始跟踪它们。
- abstract async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [源代码]¶
等待一段时间,然后将一个对等工作者从忙碌状态中移除
- class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[源代码]¶
所有可以修改工人状态的刺激的基础抽象类
- static from_dict(d: dict) distributed.worker_state_machine.StateMachineEvent [源代码]¶
将
recursive_to_dict
的输出转换回原始对象。输出对象对于重建状态机是有意义的,但不一定与原始对象完全相同。
- to_loggable(*, handled: float) distributed.worker_state_machine.StateMachineEvent [源代码]¶
生成一个变体版本的自身,该版本足够小,可以在中期内存中存储,并且包含用于调试的有意义信息。
- class distributed.worker_state_machine.Instruction(stimulus_id: str)[源代码]¶
从工作状态机到工作者的命令,以响应事件
- classmethod match(**kwargs: Any) distributed.worker_state_machine._InstructionMatch [源代码]¶
生成一个部分匹配以与指令实例进行比较。典型的用法是比较由
WorkerState.handle_stimulus()
返回的指令列表或WorkerState.stimulus_log
中的指令列表与预期的匹配列表。示例
instructions = ws.handle_stimulus(...) assert instructions == [ TaskFinishedMsg.match(key="x"), ... ]
备注
StateMachineEvent
和 Instruction
是抽象类,为了简洁起见,这里没有列出它们的许多子类。请参考实现模块 distributed.worker_state_machine
以获取完整列表。