使用Ray Core进行时间序列的简单自动机器学习(AutoML)#

小技巧

我们强烈建议使用 Ray Tune 进行超参数调优/自动机器学习,这将使您能够更快、更轻松地构建,并获得内置的好处,如日志记录、容错等。如果您认为您的用例无法由 Ray Tune 支持,我们非常希望通过 Ray GitHub issue 收集您的反馈。

自动机器学习(AutoML)是一个广泛的主题,但从本质上讲,它归结为为当前任务和数据集选择最佳模型(以及可能的预处理)。虽然存在多个先进的自动机器学习框架,但我们可以仅使用 Ray Core 和无状态任务快速构建一个简单的解决方案。

如果您有兴趣应用更先进的优化算法或希望利用更高水平的抽象以及多个内置功能,我们强烈推荐使用 Ray Tune 的 Tuner

在本笔记本中,我们将构建一个自动机器学习(更准确地说,是一个自动时间序列,AutoTS)系统,它将为时间序列回归任务选择最佳的 statsforecast 模型和超参数组合——在这里,我们将使用 M5 数据集 的一个分区。

简单的自动机器学习(AutoML)包括在相同数据上独立运行不同的函数(超参数配置)。我们希望训练具有不同配置的模型,并对其进行评估,以获得各种指标,如均方误差。在所有配置评估完成后,我们将能够根据我们想要使用的指标选择最佳配置。

自动机器学习

为了使这个例子更具实际意义,我们将使用时间序列交叉验证(CV)作为我们的评估策略。交叉验证通过评估模型k次来工作,每次选择不同的数据子集(折)进行训练和评估。这允许对性能进行更稳健的估计,并有助于防止过拟合,特别是在小数据集的情况下。换句话说,我们将进行n * k次独立评估,其中n是配置的数量,k是折的数量。

演练#

让我们开始导入Ray并初始化一个本地Ray集群。

from typing import List, Union, Callable, Dict, Type, Tuple
import time
import ray
import itertools
import pandas as pd
import numpy as np
from collections import defaultdict
from statsforecast import StatsForecast
from statsforecast.models import ETS, AutoARIMA, _TS
from pyarrow import parquet as pq
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error
ray.init(ignore_reinit_error=True)

我们将把逻辑分解为几个函数和一个Ray 任务

Ray任务是train_and_evaluate_fold,它包含了在数据的CV折叠上拟合和评估模型所需的所有逻辑。我们构建我们的任务使其接受一个数据集和索引,将其分割为训练集和测试集 - 这样,我们可以在Ray对象存储中保持一个数据集的实例,并在每个任务中单独进行拆分。我们将其定义为Ray任务,因为我们希望所有折叠能够在Ray集群上并行评估 - Ray将处理所有的协调和执行。每个任务默认会保留1个CPU核心。

@ray.remote
def train_and_evaluate_fold(
    model: _TS,
    df: pd.DataFrame,
    train_indices: np.ndarray,
    test_indices: np.ndarray,
    label_column: str,
    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
    freq: str = "D",
) -> Dict[str, float]:
    try:
        # 使用训练数据和模型创建 StatsForecast 对象。
        statsforecast = StatsForecast(
            df=df.iloc[train_indices], models=[model], freq=freq
        )
        # 对测试数据进行预测并计算指标。
        # 这将首先自动拟合模型。
        forecast = statsforecast.forecast(len(test_indices))
        return {
            metric_name: metric(
                df.iloc[test_indices][label_column], forecast[model.__class__.__name__]
            )
            for metric_name, metric in metrics.items()
        }
    except Exception:
        # 如果模型拟合或评估失败,则所有指标均返回None。
        return {metric_name: None for metric_name, metric in metrics.items()}

evaluate_models_with_cv 是一个驱动函数,用于运行我们的优化循环。我们传入一个模型列表(参数已设置)和数据框。

数据框被放入 Ray 对象存储中并被重用,这意味着我们只需序列化一次。通过这种方式,我们避免了 反模式:重复按值传递相同的大参数会损害性能

我们将每个折的拟合视为一个单独的任务。我们为每个模型生成 k 个任务,并通过调用 ray.get() 等待它们完成,这将在所有任务完成并收集结果之前保持阻塞。然后,我们聚合返回的指标,以计算每个模型每个折的平均指标。

def evaluate_models_with_cv(
    models: List[_TS],
    df: pd.DataFrame,
    label_column: str,
    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
    freq: str = "D",
    cv: Union[int, TimeSeriesSplit] = 5,
) -> Dict[_TS, Dict[str, float]]:
    # 获取每个折叠的CV训练-测试索引。
    if isinstance(cv, int):
        cv = TimeSeriesSplit(cv)
    train_test_indices = list(cv.split(df))

    # 将 df 放入 Ray 对象存储中以获得更好的性能。
    df_ref = ray.put(df)

    # 为每个折叠添加要执行的任务。
    fold_refs = []
    for model in models:
        fold_refs.extend(
            [
                train_and_evaluate_fold.remote(
                    model,
                    df_ref,
                    train_indices,
                    test_indices,
                    label_column,
                    metrics,
                    freq=freq,
                )
                for train_indices, test_indices in train_test_indices
            ]
        )

    fold_results = ray.get(fold_refs)

    # 将折叠结果拆分为多个大小为CV分割的块列表。
    # Ray 确保顺序得以保留。
    fold_results_per_model = [
        fold_results[i : i + len(train_test_indices)]
        for i in range(0, len(fold_results), len(train_test_indices))
    ]

    # 汇总并平均每个模型在所有折叠中的结果。
    # 我们从字典列表转换为列表字典,然后
    # 获取这些列表的平均值。
    mean_results_per_model = []
    for model_results in fold_results_per_model:
        aggregated_results = defaultdict(list)
        for fold_result in model_results:
            for metric, value in fold_result.items():
                aggregated_results[metric].append(value)
        mean_results = {
            metric: np.mean(values) for metric, values in aggregated_results.items()
        }
        mean_results_per_model.append(mean_results)

    # 将模型及其指标结合起来。
    mean_results_per_model = {
        models[i]: mean_results_per_model[i] for i in range(len(mean_results_per_model))
    }
    return mean_results_per_model

最后,我们需要定义将字典搜索空间转换为可传递给 evaluate_models_with_cv 的实例化模型的逻辑。

备注

scikit-learn 和 statsforecast 模型可以轻松序列化并且非常小,这意味着实例化的模型可以轻松传递到 Ray 集群中。对于其他框架,例如 Torch,您可能希望在拟合模型的任务中实例化模型,以避免出现问题。

我们的 generate_configurations 生成器翻译一个二级字典,其中键是模型类,值是参数的字典及其可能值的列表。我们想要进行网格搜索,这意味着我们想评估给定模型的每一种可能的超参数组合。

我们稍后将使用的搜索空间如下:

{
    AutoARIMA: {},
    ETS: {
        "season_length": [6, 7],
        "model": ["ZNA", "ZZZ"]
    }
}

它将转换为以下模型:

AutoARIMA(),
ETS(season_length=6, model="ZNA")
ETS(season_length=7, model="ZNA")
ETS(season_length=6, model="ZZZ")
ETS(season_length=7, model="ZZZ")

evaluate_search_space_with_cv 是我们 AutoML 系统的入口点,它接受搜索空间、数据框、标签列、指标、用于选择最佳配置的指标、是否希望最小化或最大化它、数据的频率以及要使用的 scikit-learn 的 TimeSeriesSplit 交叉验证分割器。

def generate_configurations(search_space: Dict[Type[_TS], Dict[str, list]]) -> _TS:
    # 将字典搜索空间转换为配置——使用特定参数实例化的模型。
    for model, model_search_space in search_space.items():
        kwargs, values = model_search_space.keys(), model_search_space.values()
        # 获取产品 - 所有按型号划分的组合。
        for configuration in itertools.product(*values):
            yield model(**dict(zip(kwargs, configuration)))


def evaluate_search_space_with_cv(
    search_space: Dict[Type[_TS], Dict[str, list]],
    df: pd.DataFrame,
    label_column: str,
    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
    eval_metric: str,
    mode: str = "min",
    freq: str = "D",
    cv: Union[int, TimeSeriesSplit] = 5,
) -> List[Tuple[_TS, Dict[str, float]]]:
    assert eval_metric in metrics
    assert mode in ("min", "max")

    configurations = list(generate_configurations(search_space))
    print(
        f"Evaluating {len(configurations)} configurations with {cv.get_n_splits()} splits each, "
        f"totalling {len(configurations)*cv.get_n_splits()} tasks..."
    )
    ret = evaluate_models_with_cv(
        configurations, df, label_column, metrics, freq=freq, cv=cv
    )

    # 按 eval_metric 对结果进行排序
    ret = sorted(ret.items(), key=lambda x: x[1][eval_metric], reverse=(mode == "max"))
    print("Evaluation complete!")
    return ret

随着我们的系统完成,我们只需要一个快速的辅助函数来从 S3 存储桶中获取数据,并将其预处理为 statsforecast 所期望的格式。由于数据集相当庞大,我们使用 PyArrow 的推导谓词作为过滤器,以仅获取我们关心的行,而无需将它们全部加载到内存中。

def get_m5_partition(unique_id: str) -> pd.DataFrame:
    ds1 = pq.read_table(
        "s3://anonymous@m5-benchmarks/data/train/target.parquet",
        filters=[("item_id", "=", unique_id)],
    )
    Y_df = ds1.to_pandas()
    # StatsForecasts 需要特定的列名!
    Y_df = Y_df.rename(
        columns={"item_id": "unique_id", "timestamp": "ds", "demand": "y"}
    )
    Y_df["unique_id"] = Y_df["unique_id"].astype(str)
    Y_df["ds"] = pd.to_datetime(Y_df["ds"])
    Y_df = Y_df.dropna()
    constant = 10
    Y_df["y"] += constant
    return Y_df[Y_df.unique_id == unique_id]
df = get_m5_partition("FOODS_1_001_CA_1")
df
unique_id ds y
0 FOODS_1_001_CA_1 2011-01-29 13.0
1 FOODS_1_001_CA_1 2011-01-30 10.0
2 FOODS_1_001_CA_1 2011-01-31 10.0
3 FOODS_1_001_CA_1 2011-02-01 11.0
4 FOODS_1_001_CA_1 2011-02-02 14.0
... ... ... ...
1936 FOODS_1_001_CA_1 2016-05-18 10.0
1937 FOODS_1_001_CA_1 2016-05-19 11.0
1938 FOODS_1_001_CA_1 2016-05-20 10.0
1939 FOODS_1_001_CA_1 2016-05-21 10.0
1940 FOODS_1_001_CA_1 2016-05-22 10.0

1941 rows × 3 columns

我们现在可以运行我们的AutoML系统,使用我们的搜索空间获取最佳模型及其配置。我们将使用scikit-learn实现的均方误差(MSE)和平均绝对误差(MAE)作为指标,其中前者是我们希望优化的目标。

tuning_results = evaluate_search_space_with_cv(
    {AutoARIMA: {}, ETS: {"season_length": [6, 7], "model": ["ZNA", "ZZZ"]}},
    df,
    "y",
    {"mse": mean_squared_error, "mae": mean_absolute_error},
    "mse",
    cv=TimeSeriesSplit(test_size=1),
)
Evaluating 5 configurations with 5 splits each, totalling 25 tasks...
Evaluation complete!

我们可以看到,在我们的搜索空间中,最小化均方误差(MSE)效果最佳的模型是一个季节长度为6的ZNA ETS模型。

print(tuning_results[0])

# 打印模型的参数:
print(tuning_results[0][0].__dict__)
(ETS, {'mse': 0.64205205, 'mae': 0.7200615})
{'season_length': 6, 'model': 'ZNA'}