处理失败和节点抢占#
自动从训练工作器故障中恢复#
Ray Train 内置了容错机制,可以从工作节点故障(即 RayActorError
)中恢复。当检测到故障时,工作节点将被关闭,并添加新的工作节点。
训练函数将被重新启动,但可以通过检查点恢复先前执行的进度。
每个从工作失败中恢复的实例都被视为一次重试。重试次数可以通过 RunConfig
传递给 Trainer
的 FailureConfig
参数中的 max_failures
属性进行配置。
from ray.train import RunConfig, FailureConfig
# Tries to recover a run up to this many times.
run_config = RunConfig(failure_config=FailureConfig(max_failures=2))
# No limit on the number of retries.
run_config = RunConfig(failure_config=FailureConfig(max_failures=-1))
将恢复哪个检查点?#
Ray Train 将自动从最新的可用 检查点恢复训练。
这将是传递给 train.report()
的最后一个检查点。
恢复一个 Ray Train 实验#
在实验层面,训练器恢复功能允许你从之前中断的地方继续一个实验。
列车实验可能会因以下原因之一而中断:
实验被手动中断(例如,Ctrl+C,或抢占式头节点实例)。
头节点崩溃了(例如,OOM 或其他运行时错误)。
整个集群宕机(例如,影响所有节点的网络错误)。
对于 Ray Train 的所有内置训练器,都可以进行训练器恢复,但我们在示例中使用 TorchTrainer
进行演示。我们还使用 <Framework>Trainer
来指代所有内置训练器共享的方法。
假设你的初始训练实验配置如下。实际的训练循环仅用于演示目的:重要的细节是 保存 和 加载检查点 已经实现。
import os
import tempfile
from typing import Dict, Optional
import torch
import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer
def get_datasets() -> Dict[str, ray.data.Dataset]:
return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}
def train_loop_per_worker(config: dict):
from torchvision.models import resnet18
model = resnet18()
# Checkpoint loading
checkpoint: Optional[Checkpoint] = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
model.load_state_dict(model_state_dict)
model = train.torch.prepare_model(model)
train_ds = train.get_dataset_shard("train")
for epoch in range(5):
# Do some training...
# Checkpoint saving
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
),
)
result = trainer.fit()
实验的结果和检查点被保存到由 RunConfig
配置的路径中。如果实验因上述原因之一而中断,请使用此路径恢复:
from ray.train.torch import TorchTrainer
restored_trainer = TorchTrainer.restore(
path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
datasets=get_datasets(),
)
小技巧
您还可以从远程路径恢复(例如,从存储在s3桶中的实验目录恢复)。
original_trainer = TorchTrainer(
# ...
run_config=train.RunConfig(
# Configure cloud storage
storage_path="s3://results-bucket",
name="dl_trainer_restore",
),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
"s3://results-bucket/dl_trainer_restore",
datasets=get_datasets(),
)
备注
不同的训练器可能在恢复时允许更多参数被选择性地重新指定。只有在原始提供的情况下,数据集 才需要在恢复时重新指定。
TorchTrainer.restore
, TensorflowTrainer.restore
, 和 HorovodTrainer.restore
可以接受与其父类 DataParallelTrainer.restore
相同的参数。
除非另有规定,其他训练器将接受与 BaseTrainer.restore
相同的参数。
自动恢复#
添加以下分支逻辑将允许你在中断后运行相同的脚本,从上次运行中断的地方继续训练。注意,我们使用 <Framework>Trainer.can_restore
实用方法来确定给定实验目录的存在性和有效性。
experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
result = trainer.fit()
else:
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"),
name="dl_restore_autoresume",
),
)
result = trainer.fit()
参见
请参阅 BaseTrainer.restore
文档字符串以获取完整示例。
备注
<Framework>Trainer.restore
与 <Framework>Trainer(..., resume_from_checkpoint=...)
不同。resume_from_checkpoint
用于启动一个新的训练实验,该实验将结果写入新目录,并从迭代 0 开始。
<Framework>Trainer.restore
用于继续一个现有的实验,其中新的结果将继续追加到现有的日志中。