处理失败和节点抢占#

自动从训练工作器故障中恢复#

Ray Train 内置了容错机制,可以从工作节点故障(即 RayActorError)中恢复。当检测到故障时,工作节点将被关闭,并添加新的工作节点。

训练函数将被重新启动,但可以通过检查点恢复先前执行的进度。

小技巧

为了在恢复时保留进度,您的训练函数 必须 实现 保存 加载检查点 的逻辑。

每个从工作失败中恢复的实例都被视为一次重试。重试次数可以通过 RunConfig 传递给 TrainerFailureConfig 参数中的 max_failures 属性进行配置。

from ray.train import RunConfig, FailureConfig


# Tries to recover a run up to this many times.
run_config = RunConfig(failure_config=FailureConfig(max_failures=2))

# No limit on the number of retries.
run_config = RunConfig(failure_config=FailureConfig(max_failures=-1))

将恢复哪个检查点?#

Ray Train 将自动从最新的可用 检查点恢复训练

这将是传递给 train.report() 的最后一个检查点。

恢复一个 Ray Train 实验#

在实验层面,训练器恢复功能允许你从之前中断的地方继续一个实验。

列车实验可能会因以下原因之一而中断:

  • 实验被手动中断(例如,Ctrl+C,或抢占式头节点实例)。

  • 头节点崩溃了(例如,OOM 或其他运行时错误)。

  • 整个集群宕机(例如,影响所有节点的网络错误)。

对于 Ray Train 的所有内置训练器,都可以进行训练器恢复,但我们在示例中使用 TorchTrainer 进行演示。我们还使用 <Framework>Trainer 来指代所有内置训练器共享的方法。

假设你的初始训练实验配置如下。实际的训练循环仅用于演示目的:重要的细节是 保存 加载检查点 已经实现。

import os
import tempfile
from typing import Dict, Optional

import torch

import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer


def get_datasets() -> Dict[str, ray.data.Dataset]:
    return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}


def train_loop_per_worker(config: dict):
    from torchvision.models import resnet18

    model = resnet18()

    # Checkpoint loading
    checkpoint: Optional[Checkpoint] = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(model_state_dict)

    model = train.torch.prepare_model(model)

    train_ds = train.get_dataset_shard("train")

    for epoch in range(5):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))


trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    datasets=get_datasets(),
    scaling_config=train.ScalingConfig(num_workers=2),
    run_config=train.RunConfig(
        name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
    ),
)
result = trainer.fit()

实验的结果和检查点被保存到由 RunConfig 配置的路径中。如果实验因上述原因之一而中断,请使用此路径恢复:

from ray.train.torch import TorchTrainer

restored_trainer = TorchTrainer.restore(
    path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
    datasets=get_datasets(),
)

小技巧

您还可以从远程路径恢复(例如,从存储在s3桶中的实验目录恢复)。

original_trainer = TorchTrainer(
    # ...
    run_config=train.RunConfig(
        # Configure cloud storage
        storage_path="s3://results-bucket",
        name="dl_trainer_restore",
    ),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
    "s3://results-bucket/dl_trainer_restore",
    datasets=get_datasets(),
)

备注

不同的训练器可能在恢复时允许更多参数被选择性地重新指定。只有在原始提供的情况下,数据集 才需要在恢复时重新指定。

TorchTrainer.restore, TensorflowTrainer.restore, 和 HorovodTrainer.restore 可以接受与其父类 DataParallelTrainer.restore 相同的参数。

除非另有规定,其他训练器将接受与 BaseTrainer.restore 相同的参数。

自动恢复#

添加以下分支逻辑将允许你在中断后运行相同的脚本,从上次运行中断的地方继续训练。注意,我们使用 <Framework>Trainer.can_restore 实用方法来确定给定实验目录的存在性和有效性。

experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
    trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
    result = trainer.fit()
else:
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        datasets=get_datasets(),
        scaling_config=train.ScalingConfig(num_workers=2),
        run_config=train.RunConfig(
            storage_path=os.path.expanduser("~/ray_results"),
            name="dl_restore_autoresume",
        ),
    )
result = trainer.fit()

参见

请参阅 BaseTrainer.restore 文档字符串以获取完整示例。

备注

<Framework>Trainer.restore<Framework>Trainer(..., resume_from_checkpoint=...) 不同。resume_from_checkpoint 用于启动一个新的训练实验,该实验将结果写入新目录,并从迭代 0 开始。

<Framework>Trainer.restore 用于继续一个现有的实验,其中新的结果将继续追加到现有的日志中。