获取数据进出Tune#
通常,你会发现自己需要将数据传递到 Tune Trainables(数据集、模型、其他大型参数)并从中获取数据(指标、检查点、其他工件)。在本指南中,我们将探讨不同的方法来实现这一点,并了解在什么情况下应该使用它们。
让我们从定义一个简单的可训练函数开始。我们将随着进展逐步扩展这个函数的功能。
import random
import time
import pandas as pd
def training_function(config):
# For now, we have nothing here.
data = None
model = {"hyperparameter_a": None, "hyperparameter_b": None}
epochs = 0
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
trained_model = {"state": model, "epoch": epoch}
我们的 training_function
函数需要一个 pandas DataFrame、一个带有某些超参数的模型以及训练模型的轮数作为输入。模型的超参数会影响返回的指标,并且在每个轮次(训练的迭代)中,trained_model
状态会发生变化。
我们将使用 Tuner API 进行超参数优化。
from ray.tune import Tuner
from ray import tune
tuner = Tuner(training_function, tune_config=tune.TuneConfig(num_samples=4))
将数据导入Tune#
首先,我们需要为可训练对象提供输入。我们可以将它们大致分为两类——变量和常量。
变量是我们想要调整的参数。它们对于每个 Trial 都是不同的。例如,这些可能是神经网络的学习率和批量大小,随机森林的树的数量和最大深度,或者如果你使用 Tune 作为批量训练的执行引擎,则是数据分区。
常量是每个试验中都相同的参数。这些可以是训练轮数、我们想要设置但不调整的模型超参数、数据集等等。通常,常量的规模会相当大(例如数据集或模型)。
警告
来自 training_function
外部作用域的对象也会被自动序列化并发送到 Trial Actors,这可能会导致意外行为。例如,全局锁可能无法工作(因为每个 Actor 操作的是一个副本),或者与序列化相关的常规错误。最佳实践是不要在 training_function
中引用任何外部作用域的对象。
通过搜索空间将数据传递到 Tune 运行中#
备注
简而言之 - 使用 param_space
参数来指定小型的、可序列化的常量和变量。
传递输入到 Trainables 的第一种方式是 搜索空间(它也可能被称为 参数空间 或 配置)。在 Trainable 本身中,它映射到作为参数传递给函数的 config
字典。您使用 Tuner
的 param_space
参数定义搜索空间。搜索空间是一个字典,可能由 分布 组成,每个 Trial 将采样一个不同的值,或者是常量值。搜索空间可能由嵌套字典组成,而这些字典本身也可以有分布。
警告
搜索空间中的每个值将直接保存在 Trial 元数据中。这意味着搜索空间中的每个值 必须 是可序列化的,并且占用少量内存。
例如,将一个大的 pandas DataFrame 或一个不可序列化的模型对象作为搜索空间中的值传递,将导致不希望的行为。最好的情况下,它会导致大的减速和磁盘空间使用,因为保存到磁盘的 Trial 元数据也将包含这些数据。最坏的情况下,会引发异常,因为数据无法发送到 Trial 工作线程。更多详情,请参见 我如何避免瓶颈?。
相反,使用字符串或其他标识符作为您的值,并根据这些值直接在您的 Trainable 内部初始化/加载对象。
备注
数据集 可以直接在搜索空间中作为值使用。
在我们的示例中,我们希望调整两个模型超参数。我们还希望设置epoch的数量,以便稍后可以轻松调整它。对于超参数,我们将使用 tune.uniform
分布。我们还将修改 training_function
以从 config
字典中获取这些值。
def training_function(config):
# For now, we have nothing here.
data = None
model = {
"hyperparameter_a": config["hyperparameter_a"],
"hyperparameter_b": config["hyperparameter_b"],
}
epochs = config["epochs"]
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
trained_model = {"state": model, "epoch": epoch}
tuner = Tuner(
training_function,
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
)
使用 tune.with_parameters
访问 Tune 运行中的数据#
备注
TL;DR - 使用 tune.with_parameters
工具函数来指定大型常量参数。
如果我们有在各个试验中保持不变的大型对象,我们可以使用 tune.with_parameters
工具直接将它们传递给可训练对象。这些对象将存储在 Ray 对象存储 中,以便每个试验工作者可以访问它们以获取用于其进程的本地副本。
小技巧
放入 Ray 对象存储中的对象必须是可序列化的。
注意,大型对象的序列化(一次)和反序列化(每次试验)可能会导致性能开销。
在我们的示例中,我们将使用 tune.with_parameters
传递 data
DataFrame。为此,我们需要修改函数签名以将 data
作为参数包含在内。
def training_function(config, data):
model = {
"hyperparameter_a": config["hyperparameter_a"],
"hyperparameter_b": config["hyperparameter_b"],
}
epochs = config["epochs"]
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
trained_model = {"state": model, "epoch": epoch}
tuner = Tuner(
training_function,
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
)
下一步是将 training_function
使用 tune.with_parameters
包装,然后再传递给 Tuner
。tune.with_parameters
调用的每个关键字参数都将映射到 Trainable 签名中的关键字参数。
data = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
tuner = Tuner(
tune.with_parameters(training_function, data=data),
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
tune_config=tune.TuneConfig(num_samples=4),
)
在 Tune Trainable 中加载数据#
您也可以直接从云存储、共享文件存储(如NFS)或Trainable工作节点的本地磁盘加载数据。
警告
从磁盘加载时,请确保集群中的所有节点都能访问您尝试加载的文件。
一个常见的用例是通过 pandas、arrow 或其他框架从 S3 或其他云存储加载数据集。
Trainable 工作者的当前工作目录将自动更改为相应的 Trial 目录。更多详情,请参见 如何在 Tune 训练函数中访问相对文件路径?。
我们的调优运行现在可以执行,尽管我们还不会得到任何有意义的输出。
results = tuner.fit()
从 Ray Tune 中获取数据#
我们现在可以使用 training_function
Trainable 运行我们的调优运行。下一步是向 Tune 报告 指标,这些指标可以用于指导优化。我们还希望 检查点 我们训练好的模型,以便在打断后恢复训练,并在以后用于预测。
ray.train.report
API 用于从可训练的工作者中获取数据。它可以在可训练函数中多次调用。每次调用对应于一次训练迭代(epoch、步骤、树)。
使用 Tune 报告指标#
指标 是通过 train.report
调用中的 metrics
参数传递的值。指标可以被 Tune 搜索算法 和 调度器 用来指导搜索。在调优运行完成后,你可以 分析结果,其中包括报告的指标。
备注
与搜索空间值类似,每个报告为指标的值将直接保存在 Trial 元数据中。这意味着每个报告为指标的值 必须 是可序列化的,并且占用少量内存。
备注
Tune 将自动包含一些指标,例如训练迭代、时间戳等。查看此处获取完整列表。
在我们的例子中,我们希望最大化 metric
。我们将在每个 epoch 向 Tune 报告它,并在 tune.TuneConfig
中设置 metric
和 mode
参数,以让 Tune 知道它应该将其用作优化目标。
from ray import train
def training_function(config, data):
model = {
"hyperparameter_a": config["hyperparameter_a"],
"hyperparameter_b": config["hyperparameter_b"],
}
epochs = config["epochs"]
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
trained_model = {"state": model, "epoch": epoch}
train.report(metrics={"metric": metric})
tuner = Tuner(
tune.with_parameters(training_function, data=data),
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
)
使用 Tune 回调记录指标#
使用 train.report
记录的每个指标都可以在调优运行期间通过 Tune 回调 访问。Ray Tune 提供了与流行框架(如 MLFlow、Weights & Biases、CometML 等)的 多个内置集成。您还可以使用 回调 API 创建自己的回调。
回调函数通过 Tuner
的 RunConfig
中的 callback
参数传递。
在我们的示例中,我们将使用 MLFlow 回调来跟踪我们的调优运行的进度以及 metric
的变化值(需要安装 mlflow
)。
from ray import train
from ray.train import RunConfig
from ray.tune.logger.mlflow import MLflowLoggerCallback
def training_function(config, data):
model = {
"hyperparameter_a": config["hyperparameter_a"],
"hyperparameter_b": config["hyperparameter_b"],
}
epochs = config["epochs"]
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
trained_model = {"state": model, "epoch": epoch}
train.report(metrics={"metric": metric})
tuner = Tuner(
tune.with_parameters(training_function, data=data),
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
run_config=RunConfig(
callbacks=[MLflowLoggerCallback(experiment_name="example")]
),
)
使用检查点和其他工件从 Tune 中获取数据#
除了指标之外,您可能还希望保存训练模型的状态以及任何其他工件,以便在训练失败时恢复,并进行进一步的检查和使用。这些不能作为指标保存,因为它们通常太大,可能不容易序列化。最后,它们应该持久化到磁盘或云存储中,以便在Tune运行中断或终止后仍然可以访问。
Ray Train 为此提供了一个 检查点
API。检查点
对象可以从多种来源(字典、目录、云存储)创建。
在 Ray Tune 中,检查点
是由用户在其可训练函数中创建的,并通过 train.report
的可选 checkpoint
参数报告。检查点
可以包含任意数据,并且可以在 Ray 集群中自由传递。调优运行结束后,可以从结果中获取检查点。
Ray Tune 可以配置为 自动将检查点同步到云存储,只保留一定数量的检查点以节省空间(使用 ray.train.CheckpointConfig
)等。
备注
实验状态本身是单独检查点的。更多详情请参见 附录:Tune 存储的数据类型。
在我们的示例中,我们希望能够从最新的检查点恢复训练,并在每次迭代时将 trained_model
保存到检查点中。为此,我们将使用 session
和 Checkpoint
API。
import os
import pickle
import tempfile
from ray import train
from ray.train import Checkpoint
def training_function(config, data):
model = {
"hyperparameter_a": config["hyperparameter_a"],
"hyperparameter_b": config["hyperparameter_b"],
}
epochs = config["epochs"]
# Load the checkpoint, if there is any.
checkpoint = session.get_checkpoint()
start_epoch = 0
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
with open(os.path.join(checkpoint_dir, "model.pkl"), "w") as f:
checkpoint_dict = pickle.load(f)
start_epoch = checkpoint_dict["epoch"] + 1
model = checkpoint_dict["state"]
# Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
for epoch in range(start_epoch, epochs):
# Simulate doing something expensive.
time.sleep(1)
metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
-1
) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
checkpoint_dict = {"state": model, "epoch": epoch}
# Create the checkpoint.
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
with open(os.path.join(temp_checkpoint_dir, "model.pkl"), "w") as f:
pickle.dump(checkpoint_dict, f)
train.report(
{"metric": metric},
checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
)
tuner = Tuner(
tune.with_parameters(training_function, data=data),
param_space={
"hyperparameter_a": tune.uniform(0, 20),
"hyperparameter_b": tune.uniform(-100, 100),
"epochs": 10,
},
tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
run_config=RunConfig(
callbacks=[MLflowLoggerCallback(experiment_name="example")]
),
)
在实施了所有这些更改后,我们现在可以运行我们的调优并获得有意义的指标和工件。
results = tuner.fit()
results.get_dataframe()
2022-11-30 17:40:28,839 INFO tune.py:762 -- Total run time: 15.79 seconds (15.65 seconds for the tuning loop).
metric | time_this_iter_s | should_checkpoint | done | timesteps_total | episodes_total | training_iteration | trial_id | experiment_id | date | ... | hostname | node_ip | time_since_restore | timesteps_since_restore | iterations_since_restore | warmup_time | config/epochs | config/hyperparameter_a | config/hyperparameter_b | logdir | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | -58.399962 | 1.015951 | True | False | NaN | NaN | 10 | 0b239_00000 | acf38c19d59c4cf2ad7955807657b6ea | 2022-11-30_17-40-26 | ... | ip-172-31-43-110 | 172.31.43.110 | 10.282120 | 0 | 10 | 0.003541 | 10 | 18.065981 | -98.298928 | /home/ubuntu/ray_results/training_function_202... |
1 | -24.461518 | 1.030420 | True | False | NaN | NaN | 10 | 0b239_00001 | 5ca9e03d7cca46a7852cd501bc3f7b38 | 2022-11-30_17-40-28 | ... | ip-172-31-43-110 | 172.31.43.110 | 10.362581 | 0 | 10 | 0.004031 | 10 | 1.544918 | -47.741455 | /home/ubuntu/ray_results/training_function_202... |
2 | 18.510299 | 1.034228 | True | False | NaN | NaN | 10 | 0b239_00002 | aa38dd786c714486a8d69fa5b372df48 | 2022-11-30_17-40-28 | ... | ip-172-31-43-110 | 172.31.43.110 | 10.333781 | 0 | 10 | 0.005286 | 10 | 8.129285 | 28.846415 | /home/ubuntu/ray_results/training_function_202... |
3 | -16.138780 | 1.020072 | True | False | NaN | NaN | 10 | 0b239_00003 | 5b401e15ab614332b631d552603a8d77 | 2022-11-30_17-40-28 | ... | ip-172-31-43-110 | 172.31.43.110 | 10.242707 | 0 | 10 | 0.003809 | 10 | 17.982020 | -27.867871 | /home/ubuntu/ray_results/training_function_202... |
4 rows × 23 columns
每个试验的检查点、指标和日志目录可以通过 Tune 实验的 ResultGrid
输出访问。有关如何与返回的 ResultGrid
交互的更多信息,请参见 分析 Tune 实验结果。
我完成之后如何访问 Tune 结果?#
在完成Python会话后,您仍然可以访问结果和检查点。默认情况下,Tune会将实验结果保存到本地目录 ~/ray_results
中。您还可以配置Tune以在云中持久保存结果。有关如何配置持久化实验结果的存储选项的更多信息,请参阅 如何在 Ray Tune 中配置持久存储。
你可以通过调用 Tuner.restore(path_or_cloud_uri, trainable)
来恢复 Tune 实验,其中 path_or_cloud_uri
指向实验保存到的文件系统或云端位置。恢复 Tuner
后,你可以通过调用 Tuner.get_results()
来获取 ResultGrid
对象,然后按照上一节中的说明继续操作。