如何在 Ray Tune 中启用容错#

容错性是分布式机器学习实验的一个重要特性,可以帮助缓解由于内存不足和磁盘不足问题导致的节点故障的影响。

通过容错机制,用户可以:

  • 通过保留训练进度来节省时间和资源,即使在节点失败的情况下。

  • 在分布式环境中访问抢占式竞价实例节点的成本节约

参见

分布式 Tune 实验中,启用容错功能的前提是配置某种形式的持久存储,以便所有试验结果和检查点可以被整合。参见 如何在 Ray Tune 中配置持久存储

在本指南中,我们将介绍如何启用 Ray Tune 提供的不同类型的容错功能。

Tune中的实验级容错#

在实验级别,Tuner.restore 从上次中断的地方恢复之前中断的实验。

在以下情况下,您应该使用 Tuner.restore

  1. 调用 Tuner.fit() 的驱动脚本出错(例如,由于头节点内存耗尽或磁盘空间不足)。

  2. 实验通过 Ctrl+C 手动中断。

  3. 由于短暂的错误,例如网络中断或Ray对象存储内存耗尽,整个集群及其相关的实验都会崩溃。

备注

Tuner.restore 不是用于恢复已终止的实验并修改超参数搜索空间或停止条件。相反,实验恢复旨在恢复并完成之前通过 Tuner.fit 提交的 完全相同的任务

例如,考虑一个配置为运行 10 次训练迭代的 Tune 实验,其中所有试验已经完成。Tuner.restore 不能用于恢复实验,将训练迭代次数更改为 20,然后继续训练。

相反,这应该通过启动一个 实验并使用前一个实验的检查点来初始化模型权重来实现。有关示例,请参见 此常见问题解答帖子

备注

用户定义的训练循环中的错误无法通过恢复来修复。相反,导致实验首次崩溃的问题应该是 短暂的 ,这意味着在恢复后的重试尝试下一次可以成功。

恢复一个 Tune 实验#

假设你的初始 Tune 实验配置如下。实际的训练循环仅用于演示目的:重要的细节是 在可训练对象中已经实现了保存和加载检查点

import json
import os
import tempfile

from ray import train, tune
from ray.train import Checkpoint


def trainable(config):
    # Checkpoint loading
    checkpoint = train.get_checkpoint()
    start = 1
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
                state = json.load(f)
        start = state["epoch"] + 1

    for epoch in range(start, config["num_epochs"]):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "checkpoint.json"), "w") as f:
                json.dump({"epoch": epoch}, f)
            train.report(
                {"epoch": epoch},
                checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=train.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="tune_fault_tolerance_guide",
    ),
)
result_grid = tuner.fit()

实验的结果和检查点被保存到 ~/ray_results/tune_fault_tolerance_guide,这是由 RunConfig 配置的。如果实验由于上述原因之一被中断,使用此路径来恢复:

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
)
tuner.fit()

小技巧

您还可以从云存储桶路径恢复实验:

tuner = tune.Tuner.restore(
    path="s3://cloud-bucket/tune_fault_tolerance_guide", trainable=trainable
)

参见 如何在 Ray Tune 中配置持久存储

恢复配置#

Tune 允许根据实验中断时的状态配置哪些试验应该恢复:

  • 处于 RUNNING 状态的未完成试验将默认恢复。

  • 状态为 ERRORED 的试验可以从中断处恢复,或者从头开始重试。

  • TERMINATED 试验 不能 恢复。

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
    restart_errored=False,
    resume_unfinished=True,
)

自动恢复#

在生产环境中运行时,可能需要一个 单一脚本 来(1)在开始时启动初始训练运行,以及(2)如果(1)已经发生,则恢复实验。

使用 Tuner.can_restore 工具来完成此操作:

import os
from ray import train, tune

storage_path = os.path.expanduser("~/ray_results")
exp_name = "tune_fault_tolerance_guide"
path = os.path.join(storage_path, exp_name)

if tune.Tuner.can_restore(path):
    tuner = tune.Tuner.restore(path, trainable=trainable, resume_errored=True)
else:
    tuner = tune.Tuner(
        trainable,
        param_space={"num_epochs": 10},
        run_config=train.RunConfig(storage_path=storage_path, name=exp_name),
    )
tuner.fit()

第一次运行此脚本将启动初始训练运行。第二次运行此脚本将尝试从第一次运行的输出中恢复。

使用Ray对象引用的实验恢复调优(高级)#

实验恢复通常发生在与原始运行不同的 Ray 会话中,在这种情况下,Ray 对象引用会自动被垃圾回收。如果对象引用与实验状态一起保存(例如,在每个试验的配置中),那么在恢复后尝试检索这些对象将无法正常工作:这些引用指向的对象已不存在。

为了解决这个问题,你必须重新创建这些对象,将它们放入 Ray 对象存储中,然后将新的对象引用传递给 Tune。

示例#

假设我们有一个大型预训练模型,我们希望以某种方式在我们的训练循环中使用它。例如,这可能是一个用于计算生成模型质量的Inception Score的图像分类模型。我们可能有多个模型需要调整,其中每个试验都会选择一个模型进行使用。

import ray
from ray import train, tune


class LargeModel:
    def __init__(self, model_id):
        self.model_id = model_id
        # Load weights based on the `model_id`...


def train_fn(config):
    # Retrieve the model from the object store.
    model = ray.get(config["model_ref"])
    print(model.model_id)


# These models may be large, so `ray.put` them in the Ray Object Store
# to share the models between trials.
model_refs = [ray.put(LargeModel(1)), ray.put(LargeModel(2))]

tuner = tune.Tuner(
    train_fn,
    # Tune over the object references!
    param_space={"model_ref": tune.grid_search(model_refs)},
    run_config=train.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"), name="restore_object_refs"
    ),
)
tuner.fit()

要恢复,我们只需通过 Tuner.restore 重新指定 param_space

# Re-create the objects and put them in the object store.
param_space = {
    "model_ref": tune.grid_search([ray.put(LargeModel(1)), ray.put(LargeModel(2))])
}

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/restore_object_refs"),
    trainable=train_fn,
    # Re-specify the `param_space` to update the object references.
    param_space=param_space,
    resume_errored=True,
)
tuner.fit()

备注

如果你正在调整 Ray Data ,你还需要在 param_space 中重新指定它们。Ray Data 可以包含对象引用,因此上述描述的相同问题也适用。

请参见以下示例:

ds_1 = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(128)])
ds_2 = ray.data.from_items([{"x": i, "y": 3 * i} for i in range(128)])

param_space = {
    "datasets": {"train": tune.grid_search([ds_1, ds_2])},
}

tuner = tune.Tuner.restore(..., param_space=param_space)

Tune 中的试验级容错#

试验级容错处理集群中单个试验的失败,这可能是由以下原因引起的:

  • 使用抢占式现货实例运行。

  • 短暂的网络连接问题。

  • 节点内存不足或磁盘空间不足。

Ray Tune 提供了一种通过 FailureConfig 配置单个试验失败处理的方法。

假设我们正在使用前一个示例中的 trainable ,它实现了试验检查点的保存和加载,以下是如何配置 FailureConfig

from ray import train, tune

tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=train.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="trial_fault_tolerance",
        failure_config=train.FailureConfig(max_failures=3),
    ),
)
tuner.fit()

当试验遇到运行时错误时,上述配置将重新调度该试验,最多可重试 max_failures=3 次。

同样地,如果节点 X 发生故障(例如,被抢占或失去连接),此配置将重新调度所有位于节点 X 上的试验,最多可达 3 次。

摘要#

在本用户指南中,我们介绍了如何在 Ray Tune 中启用实验级和试验级的容错功能。

更多信息请参阅以下资源:

  • 调整存储选项

  • 调优分布式参考

  • 调整试验检查点