如何在 Ray Tune 中启用容错#
容错性是分布式机器学习实验的一个重要特性,可以帮助缓解由于内存不足和磁盘不足问题导致的节点故障的影响。
通过容错机制,用户可以:
通过保留训练进度来节省时间和资源,即使在节点失败的情况下。
在分布式环境中访问抢占式竞价实例节点的成本节约。
参见
在 分布式 Tune 实验中,启用容错功能的前提是配置某种形式的持久存储,以便所有试验结果和检查点可以被整合。参见 如何在 Ray Tune 中配置持久存储。
在本指南中,我们将介绍如何启用 Ray Tune 提供的不同类型的容错功能。
Tune中的实验级容错#
在实验级别,Tuner.restore
从上次中断的地方恢复之前中断的实验。
在以下情况下,您应该使用 Tuner.restore
:
调用
Tuner.fit()
的驱动脚本出错(例如,由于头节点内存耗尽或磁盘空间不足)。实验通过
Ctrl+C
手动中断。由于短暂的错误,例如网络中断或Ray对象存储内存耗尽,整个集群及其相关的实验都会崩溃。
备注
Tuner.restore
不是用于恢复已终止的实验并修改超参数搜索空间或停止条件。相反,实验恢复旨在恢复并完成之前通过 Tuner.fit
提交的 完全相同的任务。
例如,考虑一个配置为运行 10
次训练迭代的 Tune 实验,其中所有试验已经完成。Tuner.restore
不能用于恢复实验,将训练迭代次数更改为 20
,然后继续训练。
相反,这应该通过启动一个 新 实验并使用前一个实验的检查点来初始化模型权重来实现。有关示例,请参见 此常见问题解答帖子。
备注
用户定义的训练循环中的错误无法通过恢复来修复。相反,导致实验首次崩溃的问题应该是 短暂的 ,这意味着在恢复后的重试尝试下一次可以成功。
恢复一个 Tune 实验#
假设你的初始 Tune 实验配置如下。实际的训练循环仅用于演示目的:重要的细节是 在可训练对象中已经实现了保存和加载检查点。
import json
import os
import tempfile
from ray import train, tune
from ray.train import Checkpoint
def trainable(config):
# Checkpoint loading
checkpoint = train.get_checkpoint()
start = 1
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
state = json.load(f)
start = state["epoch"] + 1
for epoch in range(start, config["num_epochs"]):
# Do some training...
# Checkpoint saving
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
with open(os.path.join(temp_checkpoint_dir, "checkpoint.json"), "w") as f:
json.dump({"epoch": epoch}, f)
train.report(
{"epoch": epoch},
checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
)
tuner = tune.Tuner(
trainable,
param_space={"num_epochs": 10},
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"),
name="tune_fault_tolerance_guide",
),
)
result_grid = tuner.fit()
实验的结果和检查点被保存到 ~/ray_results/tune_fault_tolerance_guide
,这是由 RunConfig
配置的。如果实验由于上述原因之一被中断,使用此路径来恢复:
tuner = tune.Tuner.restore(
os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
trainable=trainable,
resume_errored=True,
)
tuner.fit()
小技巧
您还可以从云存储桶路径恢复实验:
tuner = tune.Tuner.restore(
path="s3://cloud-bucket/tune_fault_tolerance_guide", trainable=trainable
)
恢复配置#
Tune 允许根据实验中断时的状态配置哪些试验应该恢复:
处于
RUNNING
状态的未完成试验将默认恢复。状态为
ERRORED
的试验可以从中断处恢复,或者从头开始重试。TERMINATED
试验 不能 恢复。
tuner = tune.Tuner.restore(
os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
trainable=trainable,
resume_errored=True,
restart_errored=False,
resume_unfinished=True,
)
自动恢复#
在生产环境中运行时,可能需要一个 单一脚本 来(1)在开始时启动初始训练运行,以及(2)如果(1)已经发生,则恢复实验。
使用 Tuner.can_restore
工具来完成此操作:
import os
from ray import train, tune
storage_path = os.path.expanduser("~/ray_results")
exp_name = "tune_fault_tolerance_guide"
path = os.path.join(storage_path, exp_name)
if tune.Tuner.can_restore(path):
tuner = tune.Tuner.restore(path, trainable=trainable, resume_errored=True)
else:
tuner = tune.Tuner(
trainable,
param_space={"num_epochs": 10},
run_config=train.RunConfig(storage_path=storage_path, name=exp_name),
)
tuner.fit()
第一次运行此脚本将启动初始训练运行。第二次运行此脚本将尝试从第一次运行的输出中恢复。
使用Ray对象引用的实验恢复调优(高级)#
实验恢复通常发生在与原始运行不同的 Ray 会话中,在这种情况下,Ray 对象引用会自动被垃圾回收。如果对象引用与实验状态一起保存(例如,在每个试验的配置中),那么在恢复后尝试检索这些对象将无法正常工作:这些引用指向的对象已不存在。
为了解决这个问题,你必须重新创建这些对象,将它们放入 Ray 对象存储中,然后将新的对象引用传递给 Tune。
示例#
假设我们有一个大型预训练模型,我们希望以某种方式在我们的训练循环中使用它。例如,这可能是一个用于计算生成模型质量的Inception Score的图像分类模型。我们可能有多个模型需要调整,其中每个试验都会选择一个模型进行使用。
import ray
from ray import train, tune
class LargeModel:
def __init__(self, model_id):
self.model_id = model_id
# Load weights based on the `model_id`...
def train_fn(config):
# Retrieve the model from the object store.
model = ray.get(config["model_ref"])
print(model.model_id)
# These models may be large, so `ray.put` them in the Ray Object Store
# to share the models between trials.
model_refs = [ray.put(LargeModel(1)), ray.put(LargeModel(2))]
tuner = tune.Tuner(
train_fn,
# Tune over the object references!
param_space={"model_ref": tune.grid_search(model_refs)},
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"), name="restore_object_refs"
),
)
tuner.fit()
要恢复,我们只需通过 Tuner.restore
重新指定 param_space
:
# Re-create the objects and put them in the object store.
param_space = {
"model_ref": tune.grid_search([ray.put(LargeModel(1)), ray.put(LargeModel(2))])
}
tuner = tune.Tuner.restore(
os.path.expanduser("~/ray_results/restore_object_refs"),
trainable=train_fn,
# Re-specify the `param_space` to update the object references.
param_space=param_space,
resume_errored=True,
)
tuner.fit()
备注
如果你正在调整 Ray Data ,你还需要在 param_space
中重新指定它们。Ray Data 可以包含对象引用,因此上述描述的相同问题也适用。
请参见以下示例:
ds_1 = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(128)])
ds_2 = ray.data.from_items([{"x": i, "y": 3 * i} for i in range(128)])
param_space = {
"datasets": {"train": tune.grid_search([ds_1, ds_2])},
}
tuner = tune.Tuner.restore(..., param_space=param_space)
Tune 中的试验级容错#
试验级容错处理集群中单个试验的失败,这可能是由以下原因引起的:
使用抢占式现货实例运行。
短暂的网络连接问题。
节点内存不足或磁盘空间不足。
Ray Tune 提供了一种通过 FailureConfig
配置单个试验失败处理的方法。
假设我们正在使用前一个示例中的 trainable
,它实现了试验检查点的保存和加载,以下是如何配置 FailureConfig
:
from ray import train, tune
tuner = tune.Tuner(
trainable,
param_space={"num_epochs": 10},
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"),
name="trial_fault_tolerance",
failure_config=train.FailureConfig(max_failures=3),
),
)
tuner.fit()
当试验遇到运行时错误时,上述配置将重新调度该试验,最多可重试 max_failures=3
次。
同样地,如果节点 X
发生故障(例如,被抢占或失去连接),此配置将重新调度所有位于节点 X
上的试验,最多可达 3
次。
摘要#
在本用户指南中,我们介绍了如何在 Ray Tune 中启用实验级和试验级的容错功能。
更多信息请参阅以下资源:
调整存储选项
调优分布式参考
调整试验检查点