跨度

备注

这是一个实验性功能,可能会在没有弃用周期的情况下迅速变化。

Dask 提供了各种诊断和 细粒度性能指标 关于任务,按其前缀(通常是调用的函数名称)进行分组。这可能不是最优的:

  • 你的客户端代码可能非常复杂,以至于在客户端和集群上运行的任务之间关联行可能会变得困难;

  • 相同的函数可以应用于工作流程的不同部分,具有不同的性能特征;

  • 您可能处于多租户设置中,集群上的部分负载并非由您的客户端代码引起。

在这些情况下,为你的工作流程或其部分附加有意义的标签可能会有所帮助。为此,你应该在客户端代码中使用 span() 上下文管理器。

例如:

import dask.config
import dask.array as da
from distributed import Client, span

# Read important note below
dask.config.set({"optimization.fuse.active": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        a = da.read_zarr(...)
    with span("ML preprocessing"):
        a = preprocess(a)
    with span("Model training"):
        model = train(a)
    model = model.compute()

注意 span() 上下文管理器是如何嵌套的。该示例将在调度程序上创建以下范围:

  • ("Alice的工作流程", )

  • ("Alice's workflow", "data load")

  • ("Alice的工作流程", "机器学习预处理")

  • ("Alice的工作流程", "模型训练")

每个跨度都将映射到与其上下文管理器内定义的图段匹配的任务。父跨度将映射到其所有子任务。

标签是任意的,没有什么能阻止你参数化它们;例如

>>> with span(f"{user}'s workflow"):
...     ...

这可能会给你

  • ("Alice's workflow", "data load")

  • ("Bob的工作流程", "数据加载")

  • 等等。

这在以下情况下很有用,例如,如果你想观察Alice提交的所有工作负载,同时隐藏Bob的活动,或者反之,观察所有的数据加载活动,无论提交者是谁。

可能性或多或少是无穷的——除了顶部显示的用户名之外,或者除此之外,你还可以存储关于你正在处理的数据集的信息等。

默认范围

如果你不使用 span() 上下文管理器,你的任务将自动归属于 ("default", ) span。

查看跨度

您可以在 良好的性能指标 仪表板小部件中按 span 标签进行过滤,以过滤您的工作负载:

在 Fine Performance Metrics 仪表板中选择跨度

此外,可以使用调度器扩展或 run_on_scheduler() 查询跨度;参见 Dask 开发者 API

用户API

重要

数据框的最小粒度是单次调用 compute()persist(),并且不能将其进一步细分为操作组——如果上面的示例使用数据框,所有内容都将被统一标记为“Alice的工作流”,因为这是在 compute() 期间活跃的范围。

在其他集合中,例如数组和延迟对象,在优化阶段,未包裹在 compute()persist() 调用中的跨度可能会丢失。为防止此问题,您必须设置

>>> dask.config.set({"optimization.fuse.active": False})

或在 dask.yaml 中:

optimization:
  fuse:
    active: false

一个可能的解决方法,也适用于数据框,可以是通过 persist() 进行中间调用。但请注意,这可能会显著影响优化并降低整体性能。

with span("Alice's workflow"):
    with span("data load"):
        a = dd.read_parquet(...).persist()
    with span("ML preprocessing"):
        a = preprocess(a).persist()
        del a  # Release distributed memory for a as soon as possible
    with span("Model training"):
        model = train(b).persist()
        del b  # Release distributed memory for b as soon as possible
        model = model.compute()
distributed.span(*tags: str) collections.abc.Iterator[str][源代码]

将任务组标记为属于某个特定组,称为跨度。

这个上下文管理器可以嵌套,从而创建子范围。如果你关闭并重新打开一个带有相同标签的范围上下文管理器,你将得到两个独立范围。

每个集群在没有客户端定义的跨度时,都会定义一个全局的“默认”跨度;当与该跨度相关的所有任务完成后,默认跨度会自动关闭并重新打开;换句话说,集群在空闲时,除了那些明确标注了跨度的任务外。请注意,在某些边缘情况下,您可能会遇到重叠的默认跨度,例如,如果一个工作节点崩溃,并且其内存中所有唯一的任务都需要重新计算。

您可以在客户端捕获一个 span 的 ID 以与调度器中的 Span 对象匹配。

>>> client = Client()
>>> with span("my workflow") as span_id:
...     client.submit(lambda: "Hello world!").result()
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>

注释

你可以通过 dask.get_annotations().get("span") 获取当前的 span。你可以在客户端代码中以及任务内部执行此操作。

Dask 开发者 API

目标受众

本节仅对维护 Dask 或编写调度器扩展的开发者感兴趣,例如创建替代仪表板或长期存储指标。

可以通过 Scheduler.extensions["spans"] 访问调度器上的跨度,其中包含 SpansSchedulerExtension 的单例实例。反过来,该扩展包含所有 Span 对象的映射,以及多种便捷方法来访问和聚合它们。

注意 Span 对象提供了多种方法,而仪表板当前并未使用这些方法 - 例如开始/停止时间、任务计数和输出大小。

class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])[源代码]
property active_cpu_seconds: float

返回在Span运行期间集群上可用的CPU秒数;换句话说,(Span.stop - Span.enqueued) * Scheduler.total_nthreads

这解释了在活动期间加入和离开集群的工人。如果此Span是 merge() 的输出,请不要计算输入Span之间的间隙。

参见

enqueued
stop
nthreads_intervals
distributed.scheduler.SchedulerState.total_nthreads
add_metadata(metadata: distributed.spans.SpanMetadata) None[源代码]

向 span 添加元数据,例如代码片段

property all_durations: dict[str, float]

在此范围树中所有已完成操作的累计持续时间,按操作分类

参见

duration
distributed.scheduler.TaskGroup.all_durations
property annotation: dict[str, tuple[str, ...]] | None

重建包含完整ID历史的dask图注释

请注意,在 TaskGroup 冲突的情况下,这可能与原始注释不匹配。

children: list[Span]

此跨度的直接子元素,按创建时间排序

property code: list[tuple[SourceCode, ...]]

代码片段,由客户端在 compute()、persist() 和 submit() 中发送。

仅当 distributed.diagnostics.computations.nframes 非零时填充。

property cumulative_worker_metrics: dict[tuple[collections.abc.Hashable, ...], float]

Worker.digests_totalScheduler.cumulative_worker_metrics 的副本,但仅限于可以归因于当前跨度树的指标。键中已移除跨度ID。

在撰写本文时,所有键均为 ("execute", <任务前缀>, <活动>, <单位>)("p2p", <位置>, <活动>, <单位>),但未来可能会添加不同格式的更多键;请测试例如 k[0] == "execute"

property done: bool

如果此跨度树中的所有任务都已完成,则返回 True;否则返回 False。

参见

distributed.scheduler.TaskGroup.done

注释

此属性可能从 True 变为 False,例如当添加新的子范围时,或者当包含内存中唯一任务副本的工作者崩溃且需要重新计算任务时。

property duration: float

在此时间跨度树中所有任务所花费的总时间

参见

all_durations
distributed.scheduler.TaskGroup.duration
enqueued: float

跨度首次出现在调度器上的时间。父跨度上的相同属性总是小于或等于此值。

参见

start
stop
groups: set[TaskGroup]

注释

当最后一个任务被遗忘时,任务组会被调度器遗忘,但在这里会无限期地被引用。如果用户在同一个集合上调用两次 compute(),你将在这个集合中拥有多个具有相同 tg.name 的组!出于同样的原因,虽然相同的 TaskGroup 对象保证只附加到一个 Span,但你可能会有不同的 TaskGroup 附加到不同的 Span,并且具有相同的键。

id: str

唯一ID,由 span() 生成并取自 TaskState.annotations["span"]["id"][-1]。匹配 distributed.scheduler.TaskState.group.span_iddistributed.worker_state_machine.TaskState.span_id

static merge(*items: distributed.spans.Span) distributed.spans.Span[源代码]

将多个跨度合并为一个综合跨度。输入的跨度之间必须没有关联。

name: tuple[str, ...]

(<tag>, <tag>, …) 匹配 TaskState.annotations["span"]["name"],无论是在调度器还是在工作节点上。

property nbytes_total: int

这个跨度树产生的总字节数

参见

distributed.scheduler.TaskGroup.nbytes_total
property nthreads_intervals: list[tuple[float, float, int]]
返回

  • 开始时间戳
  • 结束时间戳
  • 在此间隔期间,Scheduler.total_nthreads
当 Span 是 merge() 的输出时,区间可能不是
连续的。

返回类型

List of tuples

参见

enqueued
stop
active_cpu_seconds
distributed.scheduler.SchedulerState.total_nthreads
property start: float

属于此跨度树的任务开始计算的最早时间;如果没有任务*完成*计算,则为0。

参见

enqueued
stop
distributed.scheduler.TaskGroup.start

注释

这不会更新,直到至少一个任务已经 完成 计算。随着任务的完成,它可能会向后移动。

property states: dict[TaskStateState, int]

当前在此跨度树中每个状态的任务数量;例如 {"memory": 10, "processing": 3, "released": 4, ...}

参见

distributed.scheduler.TaskGroup.states
property stop: float

当这个跨度树完成计算时,或者如果尚未完成,则为当前时间戳。

参见

enqueued
start
done
distributed.scheduler.TaskGroup.stop

注释

这与 TaskGroup.stop 不同,当没有未完成的任务时;它也永远不会为零。

traverse_groups() Iterator[TaskGroup][源代码]

所有属于此分支的任务组

traverse_spans() collections.abc.Iterator[distributed.spans.Span][源代码]

自顶向下的递归遍历所有属于此分支的跨度树中的跨度,包括自身

class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)[源代码]

支持跨度的调度器扩展

find_by_tags(*tags: str) collections.abc.Iterator[distributed.spans.Span][源代码]

生成包含任何给定标签的所有跨度。当一个标签同时被一个跨度及其(孙)子跨度共享时,只返回父跨度。

heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) None[源代码]

SpansWorkerExtension.heartbeat() 触发。

用来自worker的数据填充 Span.cumulative_worker_metrics()

merge_all() distributed.spans.Span[源代码]

返回一个综合的 Span,它是所有 Span 的总和

merge_by_tags(*tags: str) distributed.spans.Span[源代码]

返回一个综合的 Span,它是包含给定标签的所有 Span 的总和。

observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...], span_metadata: SpanMetadata) dict[Key, dict][源代码]

确认调度器上存在可运行的任务。这些可能是新任务、之前不可运行的任务,或者是已经通过此方法处理过的任务。

将新观察到的任务附加到所需的跨度或 (“default”, )。更新 TaskGroup.span_id 并清除 TaskState.annotations[“span”]。

返回
更新后的 ‘span’ 注释: {key: {“name”: (…, …), “ids”: (…, …)}}
root_spans: list[Span]

仅包含没有父级的跨度,按创建时间排序。这是一个为了加速搜索的便利辅助结构。

spans: dict[str, Span]

所有按ID划分的Span对象

spans_search_by_name: defaultdict[tuple[str, ...], list[Span]]

所有跨度,按其全名键入并按创建时间排序。这是一个方便的帮助结构,以加快搜索速度。

spans_search_by_tag: defaultdict[str, list[Span]]

所有跨度,按构成其名称的各个标签键控,并按创建时间排序。这是一个方便的帮助结构,以加快搜索速度。

class distributed.spans.SpansWorkerExtension(worker: Worker)[源代码]

支持跨度的Worker扩展

heartbeat() dict[tuple[collections.abc.Hashable, ...], float][源代码]

将具有跨度的指标分配给调度器上的跨度

返回
{(上下文, 跨度ID, 前缀, 活动, 单位): 值}}

参见

SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
distributed.worker.Worker.get_metrics