获取数据进出Tune#

通常,你会发现自己需要将数据传递到 Tune Trainables(数据集、模型、其他大型参数)并从中获取数据(指标、检查点、其他工件)。在本指南中,我们将探讨不同的方法来实现这一点,并了解在什么情况下应该使用它们。

让我们从定义一个简单的可训练函数开始。我们将随着进展逐步扩展这个函数的功能。

import random
import time
import pandas as pd


def training_function(config):
    # For now, we have nothing here.
    data = None
    model = {"hyperparameter_a": None, "hyperparameter_b": None}
    epochs = 0

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}

我们的 training_function 函数需要一个 pandas DataFrame、一个带有某些超参数的模型以及训练模型的轮数作为输入。模型的超参数会影响返回的指标,并且在每个轮次(训练的迭代)中,trained_model 状态会发生变化。

我们将使用 Tuner API 进行超参数优化。

from ray.tune import Tuner
from ray import tune

tuner = Tuner(training_function, tune_config=tune.TuneConfig(num_samples=4))

将数据导入Tune#

首先,我们需要为可训练对象提供输入。我们可以将它们大致分为两类——变量和常量。

变量是我们想要调整的参数。它们对于每个 Trial 都是不同的。例如,这些可能是神经网络的学习率和批量大小,随机森林的树的数量和最大深度,或者如果你使用 Tune 作为批量训练的执行引擎,则是数据分区。

常量是每个试验中都相同的参数。这些可以是训练轮数、我们想要设置但不调整的模型超参数、数据集等等。通常,常量的规模会相当大(例如数据集或模型)。

警告

来自 training_function 外部作用域的对象也会被自动序列化并发送到 Trial Actors,这可能会导致意外行为。例如,全局锁可能无法工作(因为每个 Actor 操作的是一个副本),或者与序列化相关的常规错误。最佳实践是不要在 training_function 中引用任何外部作用域的对象。

通过搜索空间将数据传递到 Tune 运行中#

备注

简而言之 - 使用 param_space 参数来指定小型的、可序列化的常量和变量。

传递输入到 Trainables 的第一种方式是 搜索空间(它也可能被称为 参数空间配置)。在 Trainable 本身中,它映射到作为参数传递给函数的 config 字典。您使用 Tunerparam_space 参数定义搜索空间。搜索空间是一个字典,可能由 分布 组成,每个 Trial 将采样一个不同的值,或者是常量值。搜索空间可能由嵌套字典组成,而这些字典本身也可以有分布。

警告

搜索空间中的每个值将直接保存在 Trial 元数据中。这意味着搜索空间中的每个值 必须 是可序列化的,并且占用少量内存。

例如,将一个大的 pandas DataFrame 或一个不可序列化的模型对象作为搜索空间中的值传递,将导致不希望的行为。最好的情况下,它会导致大的减速和磁盘空间使用,因为保存到磁盘的 Trial 元数据也将包含这些数据。最坏的情况下,会引发异常,因为数据无法发送到 Trial 工作线程。更多详情,请参见 我如何避免瓶颈?

相反,使用字符串或其他标识符作为您的值,并根据这些值直接在您的 Trainable 内部初始化/加载对象。

备注

数据集 可以直接在搜索空间中作为值使用。

在我们的示例中,我们希望调整两个模型超参数。我们还希望设置epoch的数量,以便稍后可以轻松调整它。对于超参数,我们将使用 tune.uniform 分布。我们还将修改 training_function 以从 config 字典中获取这些值。

def training_function(config):
    # For now, we have nothing here.
    data = None

    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

使用 tune.with_parameters 访问 Tune 运行中的数据#

备注

TL;DR - 使用 tune.with_parameters 工具函数来指定大型常量参数。

如果我们有在各个试验中保持不变的大型对象,我们可以使用 tune.with_parameters 工具直接将它们传递给可训练对象。这些对象将存储在 Ray 对象存储 中,以便每个试验工作者可以访问它们以获取用于其进程的本地副本。

小技巧

放入 Ray 对象存储中的对象必须是可序列化的。

注意,大型对象的序列化(一次)和反序列化(每次试验)可能会导致性能开销。

在我们的示例中,我们将使用 tune.with_parameters 传递 data DataFrame。为此,我们需要修改函数签名以将 data 作为参数包含在内。

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

下一步是将 training_function 使用 tune.with_parameters 包装,然后再传递给 Tunertune.with_parameters 调用的每个关键字参数都将映射到 Trainable 签名中的关键字参数。

data = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4),
)

在 Tune Trainable 中加载数据#

您也可以直接从云存储、共享文件存储(如NFS)或Trainable工作节点的本地磁盘加载数据。

警告

从磁盘加载时,请确保集群中的所有节点都能访问您尝试加载的文件。

一个常见的用例是通过 pandas、arrow 或其他框架从 S3 或其他云存储加载数据集。

Trainable 工作者的当前工作目录将自动更改为相应的 Trial 目录。更多详情,请参见 如何在 Tune 训练函数中访问相对文件路径?

我们的调优运行现在可以执行,尽管我们还不会得到任何有意义的输出。

results = tuner.fit()

从 Ray Tune 中获取数据#

我们现在可以使用 training_function Trainable 运行我们的调优运行。下一步是向 Tune 报告 指标,这些指标可以用于指导优化。我们还希望 检查点 我们训练好的模型,以便在打断后恢复训练,并在以后用于预测。

ray.train.report API 用于从可训练的工作者中获取数据。它可以在可训练函数中多次调用。每次调用对应于一次训练迭代(epoch、步骤、树)。

使用 Tune 报告指标#

指标 是通过 train.report 调用中的 metrics 参数传递的值。指标可以被 Tune 搜索算法调度器 用来指导搜索。在调优运行完成后,你可以 分析结果,其中包括报告的指标。

备注

与搜索空间值类似,每个报告为指标的值将直接保存在 Trial 元数据中。这意味着每个报告为指标的值 必须 是可序列化的,并且占用少量内存。

备注

Tune 将自动包含一些指标,例如训练迭代、时间戳等。查看此处获取完整列表。

在我们的例子中,我们希望最大化 metric。我们将在每个 epoch 向 Tune 报告它,并在 tune.TuneConfig 中设置 metricmode 参数,以让 Tune 知道它应该将其用作优化目标。

from ray import train


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        train.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
)

使用 Tune 回调记录指标#

使用 train.report 记录的每个指标都可以在调优运行期间通过 Tune 回调 访问。Ray Tune 提供了与流行框架(如 MLFlow、Weights & Biases、CometML 等)的 多个内置集成。您还可以使用 回调 API 创建自己的回调。

回调函数通过 TunerRunConfig 中的 callback 参数传递。

在我们的示例中,我们将使用 MLFlow 回调来跟踪我们的调优运行的进度以及 metric 的变化值(需要安装 mlflow)。

from ray import train
from ray.train import RunConfig
from ray.tune.logger.mlflow import MLflowLoggerCallback


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        train.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

使用检查点和其他工件从 Tune 中获取数据#

除了指标之外,您可能还希望保存训练模型的状态以及任何其他工件,以便在训练失败时恢复,并进行进一步的检查和使用。这些不能作为指标保存,因为它们通常太大,可能不容易序列化。最后,它们应该持久化到磁盘或云存储中,以便在Tune运行中断或终止后仍然可以访问。

Ray Train 为此提供了一个 检查点 API。检查点 对象可以从多种来源(字典、目录、云存储)创建。

在 Ray Tune 中,检查点 是由用户在其可训练函数中创建的,并通过 train.report 的可选 checkpoint 参数报告。检查点 可以包含任意数据,并且可以在 Ray 集群中自由传递。调优运行结束后,可以从结果中获取检查点

Ray Tune 可以配置为 自动将检查点同步到云存储,只保留一定数量的检查点以节省空间(使用 ray.train.CheckpointConfig)等。

备注

实验状态本身是单独检查点的。更多详情请参见 附录:Tune 存储的数据类型

在我们的示例中,我们希望能够从最新的检查点恢复训练,并在每次迭代时将 trained_model 保存到检查点中。为此,我们将使用 sessionCheckpoint API。

import os
import pickle
import tempfile

from ray import train
from ray.train import Checkpoint


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Load the checkpoint, if there is any.
    checkpoint = session.get_checkpoint()
    start_epoch = 0
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "model.pkl"), "w") as f:
                checkpoint_dict = pickle.load(f)
        start_epoch = checkpoint_dict["epoch"] + 1
        model = checkpoint_dict["state"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(start_epoch, epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()

        checkpoint_dict = {"state": model, "epoch": epoch}

        # Create the checkpoint.
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "model.pkl"), "w") as f:
                pickle.dump(checkpoint_dict, f)
            train.report(
                {"metric": metric},
                checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

在实施了所有这些更改后,我们现在可以运行我们的调优并获得有意义的指标和工件。

results = tuner.fit()
results.get_dataframe()
2022-11-30 17:40:28,839 INFO tune.py:762 -- Total run time: 15.79 seconds (15.65 seconds for the tuning loop).
metric time_this_iter_s should_checkpoint done timesteps_total episodes_total training_iteration trial_id experiment_id date ... hostname node_ip time_since_restore timesteps_since_restore iterations_since_restore warmup_time config/epochs config/hyperparameter_a config/hyperparameter_b logdir
0 -58.399962 1.015951 True False NaN NaN 10 0b239_00000 acf38c19d59c4cf2ad7955807657b6ea 2022-11-30_17-40-26 ... ip-172-31-43-110 172.31.43.110 10.282120 0 10 0.003541 10 18.065981 -98.298928 /home/ubuntu/ray_results/training_function_202...
1 -24.461518 1.030420 True False NaN NaN 10 0b239_00001 5ca9e03d7cca46a7852cd501bc3f7b38 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.362581 0 10 0.004031 10 1.544918 -47.741455 /home/ubuntu/ray_results/training_function_202...
2 18.510299 1.034228 True False NaN NaN 10 0b239_00002 aa38dd786c714486a8d69fa5b372df48 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.333781 0 10 0.005286 10 8.129285 28.846415 /home/ubuntu/ray_results/training_function_202...
3 -16.138780 1.020072 True False NaN NaN 10 0b239_00003 5b401e15ab614332b631d552603a8d77 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.242707 0 10 0.003809 10 17.982020 -27.867871 /home/ubuntu/ray_results/training_function_202...

4 rows × 23 columns

每个试验的检查点、指标和日志目录可以通过 Tune 实验的 ResultGrid 输出访问。有关如何与返回的 ResultGrid 交互的更多信息,请参见 分析 Tune 实验结果

我完成之后如何访问 Tune 结果?#

在完成Python会话后,您仍然可以访问结果和检查点。默认情况下,Tune会将实验结果保存到本地目录 ~/ray_results 中。您还可以配置Tune以在云中持久保存结果。有关如何配置持久化实验结果的存储选项的更多信息,请参阅 如何在 Ray Tune 中配置持久存储

你可以通过调用 Tuner.restore(path_or_cloud_uri, trainable) 来恢复 Tune 实验,其中 path_or_cloud_uri 指向实验保存到的文件系统或云端位置。恢复 Tuner 后,你可以通过调用 Tuner.get_results() 来获取 ResultGrid 对象,然后按照上一节中的说明继续操作。