LightGBMCV

Time series cross validation with LightGBM.

import copy
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import lightgbm as lgb
import numpy as np
import pandas as pd
from utilsforecast.processing import backtest_splits

from mlforecast.core import (
    DateFeature,
    Freq,
    LagTransforms,
    Lags,
    TargetTransform,
    TimeSeries,
)
from nbdev import show_doc
def _mape(y_true, y_pred, ids, _dates):
    # MAPE computed per series, then averaged across series
    # (absolute denominator so that negative targets can't cancel out errors)
    abs_pct_err = abs(y_true - y_pred) / abs(y_true)
    return abs_pct_err.groupby(ids, observed=True).mean().mean()

def _rmse(y_true, y_pred, ids, _dates):
    # RMSE computed per series, then averaged across series
    sq_err = (y_true - y_pred) ** 2
    return sq_err.groupby(ids, observed=True).mean().pow(0.5).mean()

_metric2fn = {'mape': _mape, 'rmse': _rmse}

def _update(bst, n):
    for _ in range(n):
        bst.update()

def _predict(ts, bst, valid, h, before_predict_callback, after_predict_callback):
    static = ts.static_features_.columns.drop(ts.id_col).tolist()
    dynamic = valid.columns.drop(static + [ts.id_col, ts.time_col, ts.target_col])
    if not dynamic.empty:
        X_df = valid.drop(columns=static + [ts.target_col])
    else:
        X_df = None
    preds = ts.predict(
        {'Booster': bst},
        horizon=h,
        before_predict_callback=before_predict_callback,
        after_predict_callback=after_predict_callback,
        X_df=X_df
    )
    return valid.merge(preds, on=[ts.id_col, ts.time_col], how='left')

def _update_and_predict(ts, bst, valid, n, h, before_predict_callback, after_predict_callback):
    _update(bst, n)
    return _predict(ts, bst, valid, h, before_predict_callback, after_predict_callback)
CVResult = Tuple[int, float]
class LightGBMCV:
    def __init__(
        self,
        freq: Freq,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,        
    ):
        """Create LightGBM CV object.

        Parameters
        ----------
        freq : str or int
            Pandas offset alias, e.g. 'D', 'W-THU' or integer denoting the frequency of the series.
        lags : list of int, optional (default=None)
            Lags of the target to use as features.
        lag_transforms : dict of int to list of functions, optional (default=None)
            Mapping of target lags to their transformations.
        date_features : list of str or callable, optional (default=None)
            Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
        num_threads : int (default=1)
            Number of threads to use when computing the features.
        target_transforms : list of transformers, optional (default=None)
            Transformations that will be applied to the target before computing the features and restored after the forecasting step.
        """            
        self.num_threads = num_threads
        cpu_count = os.cpu_count()
        if cpu_count is None:
            num_cpus = 1
        else:
            num_cpus = cpu_count
        self.bst_threads = max(num_cpus // num_threads, 1)
        self.ts = TimeSeries(
            freq=freq,
            lags=lags,
            lag_transforms=lag_transforms,
            date_features=date_features,
            num_threads=self.bst_threads,
            target_transforms=target_transforms,
        )
        
    def __repr__(self):
        return (
            f'{self.__class__.__name__}('
            f'freq={self.ts.freq}, '
            f'lag_features={list(self.ts.transforms.keys())}, '
            f'date_features={self.ts.date_features}, '
            f'num_threads={self.num_threads}, '
            f'bst_threads={self.bst_threads})'
        )

    def setup(
        self,
        df: pd.DataFrame,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        params: Optional[Dict[str, Any]] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        weights: Optional[Sequence[float]] = None,
        metric: Union[str, Callable] = 'mape',
        input_size: Optional[int] = None,
    ):
        """Initialize internal data structures to iteratively train the boosters. Use this before calling partial_fit.
        
        Parameters
        ----------
        df : pandas DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        h : int
            Forecast horizon.
        id_col : str (default='unique_id')
            Column that identifies each series.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `h`.
        params : dict, optional (default=None)
            Parameters to be passed to the LightGBM Boosters.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it.
        weights : sequence of float, optional (default=None)
            Weights to multiply the metric of each window. If None, all windows have the same weight.
        metric : str or callable, default='mape'
            Metric used to assess the performance of the models and perform early stopping.
        input_size : int, optional (default=None)
            Maximum training samples per series in each window. If None, will use an expanding window.

        Returns
        -------
        self : LightGBMCV
            CV object with internal data structures for partial_fit.
        """
        if weights is None:
            self.weights = np.full(n_windows, 1 / n_windows)        
        elif len(weights) != n_windows:
            raise ValueError('Must specify as many weights as the number of windows')
        else:
            self.weights = np.asarray(weights)
        if callable(metric):
            self.metric_fn = metric
            self.metric_name = metric.__name__
        else:
            if metric not in _metric2fn:
                raise ValueError(f'{metric} is not one of the implemented metrics: ({", ".join(_metric2fn.keys())})')
            self.metric_fn = _metric2fn[metric]
            self.metric_name = metric
        self.items = []
        self.h = h
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        self.params = {} if params is None else params
        splits = backtest_splits(
            df,
            n_windows=n_windows,
            h=h,
            id_col=id_col,
            time_col=time_col,
            freq=self.ts.freq,
            step_size=step_size,
            input_size=input_size,
        )
        for _, train, valid in splits:
            ts = copy.deepcopy(self.ts)
            prep = ts.fit_transform(train, id_col, time_col, target_col, static_features, dropna, keep_last_n)
            assert isinstance(prep, pd.DataFrame)
            ds = lgb.Dataset(prep.drop(columns=[id_col, time_col, target_col]), prep[target_col]).construct()
            bst = lgb.Booster({**self.params, 'num_threads': self.bst_threads}, ds)
            bst.predict = partial(bst.predict, num_threads=self.bst_threads)
            self.items.append((ts, bst, valid))
        return self

    def _single_threaded_partial_fit(
        self,
        metric_values,
        num_iterations,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ):  
        for j, (ts, bst, valid) in enumerate(self.items):
            preds = _update_and_predict(
                ts=ts,
                bst=bst,
                valid=valid,
                n=num_iterations,
                h=self.h,
                before_predict_callback=before_predict_callback,
                after_predict_callback=after_predict_callback,
            )
            metric_values[j] = self.metric_fn(
                preds[self.target_col], preds['Booster'], preds[self.id_col], preds[self.time_col]
            )

    def _multithreaded_partial_fit(
        self,
        metric_values,
        num_iterations,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ):                           
        with ThreadPoolExecutor(self.num_threads) as executor:
            futures = []
            for ts, bst, valid in self.items:
                _update(bst, num_iterations)
                future = executor.submit(
                    _predict,
                    ts=ts,
                    bst=bst,
                    valid=valid,
                    h=self.h,
                    before_predict_callback=before_predict_callback,
                    after_predict_callback=after_predict_callback,
                )
                futures.append(future)
            cv_preds = [f.result() for f in futures]
        metric_values[:] = [
            self.metric_fn(
                preds[self.target_col], preds['Booster'], preds[self.id_col], preds[self.time_col]
            )
            for preds in cv_preds
        ]
        
    def partial_fit(
        self,
        num_iterations: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ) -> float:
        """Train the boosters for some iterations.
        
        Parameters
        ----------
        num_iterations : int
            Number of boosting iterations to run
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
                This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
                The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
                This function will take a pandas Series with the predictions and should return another one with the same structure.
                The series identifier is on the index.  
                    
        Returns
        -------
        metric_value : float
            Weighted metric after training for num_iterations.
        """
        metric_values = np.empty(len(self.items))
        if self.num_threads == 1:
            self._single_threaded_partial_fit(
                metric_values, num_iterations, before_predict_callback, after_predict_callback
            )
        else:
            self._multithreaded_partial_fit(
                metric_values, num_iterations, before_predict_callback, after_predict_callback
            )
        return metric_values @ self.weights
    
    def should_stop(self, hist, early_stopping_evals, early_stopping_pct) -> bool:
        if len(hist) < early_stopping_evals + 1:
            return False
        improvement_pct = 1 - hist[-1][1] / hist[-(early_stopping_evals + 1)][1]
        return improvement_pct < early_stopping_pct

    def find_best_iter(self, hist, early_stopping_evals) -> int:
        best_iter, best_score = hist[-1]
        for r, m in hist[-(early_stopping_evals + 1):-1]:
            if m < best_score:
                best_score = m
                best_iter = r
        return best_iter

    def fit(
        self,
        df: pd.DataFrame,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        num_iterations: int = 100,
        params: Optional[Dict[str, Any]] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        eval_every: int = 10,
        weights: Optional[Sequence[float]] = None,
        metric: Union[str, Callable] = 'mape',
        verbose_eval: bool = True,
        early_stopping_evals: int = 2,
        early_stopping_pct: float = 0.01,
        compute_cv_preds: bool = False,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        input_size: Optional[int] = None,
    ) -> List[CVResult]:
        """Train boosters simultaneously and assess their performance on the complete forecasting window.
        
        Parameters
        ----------
        df : pandas DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        h : int
            Forecast horizon.
        id_col : str (default='unique_id')
            Column that identifies each series.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `h`.
        num_iterations : int (default=100)
            Maximum number of boosting iterations to run.
        params : dict, optional (default=None)
            Parameters to be passed to the LightGBM Boosters.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it.
        eval_every : int (default=10)
            Number of boosting iterations to train before evaluating on the whole forecast window.
        weights : sequence of float, optional (default=None)
            Weights to multiply the metric of each window. If None, all windows have the same weight.
        metric : str or callable, default='mape'
            Metric used to assess the performance of the models and perform early stopping.
        verbose_eval : bool (default=True)
            Print the metrics of each evaluation.
        early_stopping_evals : int (default=2)
            Maximum number of evaluations to run without improvement.
        early_stopping_pct : float (default=0.01)
            Minimum percentage improvement in metric value in `early_stopping_evals` evaluations.
        compute_cv_preds : bool (default=False)
            Compute predictions for each window after finding the best iteration.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
                This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
                The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
                This function will take a pandas Series with the predictions and should return another one with the same structure.
                The series identifier is on the index.
        input_size : int, optional (default=None)
            Maximum training samples per series in each window. If None, will use an expanding window.

        Returns
        -------
        cv_result : list of tuple.
            List of (boosting rounds, metric value) tuples.
        """
        self.setup(
            df=df,
            n_windows=n_windows,
            h=h,
            params=params,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            input_size=input_size,
            step_size=step_size,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            weights=weights,
            metric=metric,
        )
        hist = []
        for i in range(0, num_iterations, eval_every):
            metric_value = self.partial_fit(eval_every, before_predict_callback, after_predict_callback)
            rounds = eval_every + i
            hist.append((rounds, metric_value))
            if verbose_eval:
                print(f'[{rounds:,d}] {self.metric_name}: {metric_value:,f}')                
            if self.should_stop(hist, early_stopping_evals, early_stopping_pct):
                print(f"Early stopping at round {rounds:,}")
                break
        self.best_iteration_ = self.find_best_iter(hist, early_stopping_evals)
        print(f'Using best iteration: {self.best_iteration_:,}')
        hist = hist[:self.best_iteration_ // eval_every]
        for _, bst, _ in self.items:
            bst.best_iteration = self.best_iteration_

        self.cv_models_ = {f'Booster{i}': item[1] for i, item in enumerate(self.items)}
        if compute_cv_preds:
            with ThreadPoolExecutor(self.num_threads) as executor:
                futures = []            
                for ts, bst, valid in self.items:
                    future = executor.submit(
                        _predict,
                        ts=ts,
                        bst=bst,
                        valid=valid,
                        h=self.h,
                        before_predict_callback=before_predict_callback,
                        after_predict_callback=after_predict_callback,
                    )
                    futures.append(future)            
                self.cv_preds_ = pd.concat([f.result().assign(window=i) for i, f in enumerate(futures)])
        self.ts._fit(df, id_col, time_col, target_col, static_features, keep_last_n)
        self.ts.as_numpy = False
        return hist

    def predict(
        self,
        h: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Compute predictions with each of the trained boosters.
        
        Parameters
        ----------
        h : int
            Forecast horizon.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
                This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
                The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
                This function will take a pandas Series with the predictions and should return another one with the same structure.
                The series identifier is on the index.
        X_df : pandas DataFrame, optional (default=None)
            Dataframe with the future exogenous features. Should have the id column and the time column.                             
                    
        Returns
        -------
        result : pandas DataFrame
            Predictions for each series and timestep, with one column per window.
        """        
        return self.ts.predict(
            self.cv_models_,
            horizon=h,
            before_predict_callback=before_predict_callback,
            after_predict_callback=after_predict_callback,
            X_df=X_df,
        )
show_doc(LightGBMCV)

LightGBMCV

 LightGBMCV (freq:Union[int,str,pandas._libs.tslibs.offsets.BaseOffset,NoneType]=None,
             lags:Optional[Iterable[int]]=None,
             lag_transforms:Optional[Dict[int,List[Union[Callable,Tuple[Callable,Any]]]]]=None,
             date_features:Optional[Iterable[Union[str,Callable]]]=None,
             num_threads:int=1,
             target_transforms:Optional[List[mlforecast.target_transforms.BaseTargetTransform]]=None)

Create LightGBM CV object.

Name Type Default Details
freq Union None Pandas offset alias, e.g. ‘D’, ‘W-THU’ or integer denoting the frequency of the series.
lags Optional None Lags of the target to use as features.
lag_transforms Optional None Mapping of target lags to their transformations.
date_features Optional None Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
num_threads int 1 Number of threads to use when computing the features.
target_transforms Optional None Transformations that will be applied to the target before computing the features and restored after the forecasting step.

Example

This shows an example with just 4 series from the M4 dataset. If you want to run it yourself on all of them, you can refer to this notebook.

import random

from datasetsforecast.m4 import M4, M4Info
from fastcore.test import test_eq, test_fail
from mlforecast.target_transforms import Differences
from nbdev import show_doc
from window_ops.ewm import ewm_mean
from window_ops.rolling import rolling_mean, seasonal_rolling_mean
group = 'Hourly'
await M4.async_download('data', group=group)
df, *_ = M4.load(directory='data', group=group)
df['ds'] = df['ds'].astype('int')
ids = df['unique_id'].unique()
random.seed(0)
sample_ids = random.choices(ids, k=4)
sample_df = df[df['unique_id'].isin(sample_ids)]
sample_df
unique_id ds y
86796 H196 1 11.8
86797 H196 2 11.4
86798 H196 3 11.1
86799 H196 4 10.8
86800 H196 5 10.6
... ... ... ...
325235 H413 1004 99.0
325236 H413 1005 88.0
325237 H413 1006 47.0
325238 H413 1007 41.0
325239 H413 1008 34.0

4032 rows × 3 columns

info = M4Info[group]
horizon = info.horizon
valid = sample_df.groupby('unique_id').tail(horizon)
train = sample_df.drop(valid.index)
train.shape, valid.shape
((3840, 3), (192, 3))

What LightGBMCV does is emulate LightGBM's cv function, training multiple Boosters simultaneously on different partitions of the data, that is, running the boosting iterations for all of them at once. This gives us an estimate of the error at each iteration, so if we combine it with early stopping we can find the best iteration and then train a final model on all the data, or even compute an ensemble from the predictions of these individual models.

In order to get a good estimate of our model's forecasting performance, we compute predictions for the whole test period and calculate a metric on them. Since this step can slow down training, there's an eval_every parameter to control it: if eval_every=10 (the default), then every 10 boosting iterations we compute forecasts for the complete window and report the error.

We also have the early stopping parameters:

  • early_stopping_evals: how many evaluations of the full window without improvement do we allow before stopping training?
  • early_stopping_pct: what's the minimum percentage improvement we want over these early_stopping_evals evaluations in order to keep training? (A small numeric sketch of this check follows.)
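
To make the rule concrete, here's a minimal sketch (with made-up metric values) of the check that should_stop applies: the relative improvement of the latest evaluation over the one from early_stopping_evals evaluations back must reach early_stopping_pct.

# Minimal sketch with made-up values. hist holds (round, metric) pairs like the ones produced by fit.
toy_hist = [(10, 0.200), (20, 0.190), (30, 0.188)]
early_stopping_evals = 2
improvement_pct = 1 - toy_hist[-1][1] / toy_hist[-(early_stopping_evals + 1)][1]
improvement_pct  # 1 - 0.188 / 0.200 = 0.06, a 6% improvement: above the 1% default, so training continues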

This makes LightGBMCV a good tool for quickly testing different configurations of a model. Consider the following example, where we try to find out which features improve the performance of the model. We start with just the lags.

static_fit_config = dict(
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
    compute_cv_preds=True,
)
cv = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],  # one week of lags
)
show_doc(LightGBMCV.fit)

LightGBMCV.fit

 LightGBMCV.fit (df:pandas.core.frame.DataFrame, n_windows:int, h:int,
                 id_col:str='unique_id', time_col:str='ds',
                 target_col:str='y', step_size:Optional[int]=None,
                 num_iterations:int=100,
                 params:Optional[Dict[str,Any]]=None,
                 static_features:Optional[List[str]]=None,
                 dropna:bool=True, keep_last_n:Optional[int]=None,
                 eval_every:int=10,
                 weights:Optional[Sequence[float]]=None,
                 metric:Union[str,Callable]='mape',
                 verbose_eval:bool=True, early_stopping_evals:int=2,
                 early_stopping_pct:float=0.01,
                 compute_cv_preds:bool=False,
                 before_predict_callback:Optional[Callable]=None,
                 after_predict_callback:Optional[Callable]=None,
                 input_size:Optional[int]=None)

Train boosters simultaneously and assess their performance on the complete forecasting window.

Name Type Default Details
df DataFrame Series data in long format.
n_windows int Number of windows to evaluate.
h int Forecast horizon.
id_col str unique_id Column that identifies each series.
time_col str ds Column that identifies each timestep, its values can be timestamps or integers.
target_col str y Column that contains the target.
step_size Optional None Step size between each cross validation window. If None it will be equal to h.
num_iterations int 100 Maximum number of boosting iterations to run.
params Optional None Parameters to be passed to the LightGBM Boosters.
static_features Optional None Names of the features that are static and will be repeated when forecasting.
dropna bool True Drop rows with missing values produced by the transformations.
keep_last_n Optional None Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it.
eval_every int 10 Number of boosting iterations to train before evaluating on the whole forecast window.
weights Optional None Weights to multiply the metric of each window. If None, all windows have the same weight.
metric Union mape Metric used to assess the performance of the models and perform early stopping.
verbose_eval bool True Print the metrics of each evaluation.
early_stopping_evals int 2 Maximum number of evaluations to run without improvement.
early_stopping_pct float 0.01 Minimum percentage improvement in metric value in early_stopping_evals evaluations.
compute_cv_preds bool False Compute predictions for each window after finding the best iteration.
before_predict_callback Optional None Function to call on the features before computing the predictions.
This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
The series identifier is on the index.
after_predict_callback Optional None Function to call on the predictions before updating the targets.
This function will take a pandas Series with the predictions and should return another one with the same structure.
The series identifier is on the index.
input_size Optional None Maximum training samples per series in each window. If None, will use an expanding window.
Returns List List of (boosting rounds, metric value) tuples.
hist = cv.fit(train, **static_fit_config)
[LightGBM] [Info] Start training from score 51.745632
[10] mape: 0.590690
[20] mape: 0.251093
[30] mape: 0.143643
[40] mape: 0.109723
[50] mape: 0.102099
[60] mape: 0.099448
[70] mape: 0.098349
[80] mape: 0.098006
[90] mape: 0.098718
Early stopping at round 90
Using best iteration: 80

By setting compute_cv_preds we get the predictions of each model on its corresponding validation fold.

cv.cv_preds_
unique_id ds y Booster window
0 H196 865 15.5 15.522924 0
1 H196 866 15.1 14.985832 0
2 H196 867 14.8 14.667901 0
3 H196 868 14.4 14.514592 0
4 H196 869 14.2 14.035793 0
... ... ... ... ... ...
187 H413 956 59.0 77.227905 1
188 H413 957 58.0 80.589641 1
189 H413 958 53.0 53.986834 1
190 H413 959 38.0 36.749786 1
191 H413 960 46.0 36.281225 1

384 rows × 5 columns

The individual models we trained are saved, so calling predict returns the predictions of every trained model.

show_doc(LightGBMCV.predict)

LightGBMCV.predict

 LightGBMCV.predict (h:int,
                     before_predict_callback:Optional[Callable]=None,
                     after_predict_callback:Optional[Callable]=None,
                     X_df:Optional[pandas.core.frame.DataFrame]=None)

Compute predictions with each of the trained boosters.

Name Type Default Details
h int Forecast horizon.
before_predict_callback Optional None Function to call on the features before computing the predictions.
This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
The series identifier is on the index.
after_predict_callback Optional None Function to call on the predictions before updating the targets.
This function will take a pandas Series with the predictions and should return another one with the same structure.
The series identifier is on the index.
X_df Optional None Dataframe with the future exogenous features. Should have the id column and the time column.
Returns DataFrame Predictions for each series and timestep, with one column per window.
preds = cv.predict(horizon)
preds
unique_id ds Booster0 Booster1
0 H196 961 15.670252 15.848888
1 H196 962 15.522924 15.697399
2 H196 963 14.985832 15.166213
3 H196 964 14.985832 14.723238
4 H196 965 14.562152 14.451092
... ... ... ... ...
187 H413 1004 70.695242 65.917620
188 H413 1005 66.216580 62.615788
189 H413 1006 63.896573 67.848598
190 H413 1007 46.922797 50.981950
191 H413 1008 45.006541 42.752819

192 rows × 4 columns

We can average these predictions and evaluate them.

def evaluate_on_valid(preds):
    preds = preds.copy()
    preds['final_prediction'] = preds.drop(columns=['unique_id', 'ds']).mean(1)
    merged = preds.merge(valid, on=['unique_id', 'ds'])
    merged['abs_err'] = abs(merged['final_prediction'] - merged['y']) / merged['y']
    return merged.groupby('unique_id')['abs_err'].mean().mean()
eval1 = evaluate_on_valid(preds)
eval1
0.11036194712311806

Now, since these series are hourly, maybe we can remove the daily seasonality by taking the 168th (24 * 7) difference, that is, subtracting the value of the same hour one week ago, so our target becomes \(z_t = y_{t} - y_{t-168}\). The features will be computed from this target, and when we predict the differences are automatically added back.
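
As a quick sanity check, here's the pandas equivalent of what Differences([168]) computes for a single series; a sketch only, since the transform also adds the differences back after forecasting:

# Sketch: pandas equivalent of Differences([168]) for one of the sampled series.
serie_h196 = train[train['unique_id'] == 'H196'].set_index('ds')['y']
(serie_h196 - serie_h196.shift(168)).dropna().head()  # z_t = y_t - y_{t-168}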

cv2 = LightGBMCV(
    freq=1,
    target_transforms=[Differences([24 * 7])],
    lags=[24 * (i+1) for i in range(7)],
)
hist2 = cv2.fit(train, **static_fit_config)
[LightGBM] [Info] Start training from score 0.519010
[10] mape: 0.089024
[20] mape: 0.090683
[30] mape: 0.092316
Early stopping at round 30
Using best iteration: 10
assert hist2[-1][1] < hist[-1][1]

Nice! We achieved a better score in fewer iterations. Let's see if this improvement also holds on the validation set.

preds2 = cv2.predict(horizon)
eval2 = evaluate_on_valid(preds2)
eval2
0.08956665504570135
assert eval2 < eval1

Great! Maybe we can try some lag transforms now. We'll try the seasonal rolling mean, which averages the values "every season", that is, if we set season_length=24 and window_size=7, we'll average the value of the same hour of each day over one week.
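
To see what this transform computes, here's a small sketch applying seasonal_rolling_mean directly to a toy array (in the CV below it's applied to lag 48 of the target):

# Sketch: average, at each position, the values from the same hour over the previous 7 days.
toy = np.arange(24.0 * 14)  # two weeks of toy hourly values
seasonal_rolling_mean(toy, season_length=24, window_size=7)[-1]  # mean of toy[-1], toy[-25], ..., toy[-145]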

cv3 = LightGBMCV(
    freq=1,
    target_transforms=[Differences([24 * 7])],
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [(seasonal_rolling_mean, 24, 7)],
    },
)
hist3 = cv3.fit(train, **static_fit_config)
[LightGBM] [Info] Start training from score 0.273641
[10] mape: 0.086724
[20] mape: 0.088466
[30] mape: 0.090536
Early stopping at round 30
Using best iteration: 10

It seems this is helping as well!

assert hist3[-1][1] < hist2[-1][1]

Is this reflected on the validation set?

preds3 = cv3.predict(horizon)
eval3 = evaluate_on_valid(preds3)
eval3
0.08961279023129345

Nice! mlforecast also supports date features, but in this case our time column consists of integers, so there aren't many possibilities here. As you can see, this process lets you iterate faster and get a better estimate of the forecasting performance you can expect from your model.
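
For reference, if your time column held timestamps you could add calendar features through date_features; a hypothetical configuration (not runnable on this integer-indexed data) could look like this:

# Hypothetical sketch: calendar features for series with a datetime time column.
# 'hour' and 'dayofweek' are pandas datetime attributes.
# LightGBMCV(freq='H', lags=[24], date_features=['hour', 'dayofweek'])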

test_eq(cv.find_best_iter([(0, 1), (1, 0.5)], 1), 1)
test_eq(cv.find_best_iter([(0, 1), (1, 0.5), (2, 0.6)], 1), 1)
test_eq(cv.find_best_iter([(0, 1), (1, 0.5), (2, 0.6), (3, 0.4)], 2), 3)

If you're doing hyperparameter tuning, it's useful to be able to run a couple of iterations, assess the performance, and decide whether a particular configuration isn't promising and should be discarded. For example, optuna has pruners that you can call with your current score and that decide whether the trial should be dropped. We'll now show how to do that.

Since the cross validation requires some setup, like the LightGBM datasets and the internal features, we have the setup method.

show_doc(LightGBMCV.setup)

LightGBMCV.setup

 LightGBMCV.setup (df:pandas.core.frame.DataFrame, n_windows:int, h:int,
                   id_col:str='unique_id', time_col:str='ds',
                   target_col:str='y', step_size:Optional[int]=None,
                   params:Optional[Dict[str,Any]]=None,
                   static_features:Optional[List[str]]=None,
                   dropna:bool=True, keep_last_n:Optional[int]=None,
                   weights:Optional[Sequence[float]]=None,
                   metric:Union[str,Callable]='mape',
                   input_size:Optional[int]=None)

Initialize internal data structures to iteratively train the boosters. Use this before calling partial_fit.

Name Type Default Details
df DataFrame Series data in long format.
n_windows int Number of windows to evaluate.
h int Forecast horizon.
id_col str unique_id Column that identifies each series.
time_col str ds Column that identifies each timestep, its values can be timestamps or integers.
target_col str y Column that contains the target.
step_size Optional None Step size between each cross validation window. If None it will be equal to h.
params Optional None Parameters to be passed to the LightGBM Boosters.
static_features Optional None Names of the features that are static and will be repeated when forecasting.
dropna bool True Drop rows with missing values produced by the transformations.
keep_last_n Optional None Keep only this many records from each series for the forecasting step. Can save time and memory if your features allow it.
weights Optional None Weights to multiply the metric of each window. If None, all windows have the same weight.
metric Union mape Metric used to assess the performance of the models and perform early stopping.
input_size Optional None Maximum training samples per series in each window. If None, will use an expanding window.
Returns LightGBMCV CV object with internal data structures for partial_fit.
cv4 = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
)
cv4.setup(
    train,
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
)
LightGBMCV(freq=1, lag_features=['lag24', 'lag48', 'lag72', 'lag96', 'lag120', 'lag144', 'lag168'], date_features=[], num_threads=1, bst_threads=8)

Once we have this, we can call partial_fit to train for only a few iterations and get back the score on the forecast window.

show_doc(LightGBMCV.partial_fit)

LightGBMCV.partial_fit

 LightGBMCV.partial_fit (num_iterations:int,
                         before_predict_callback:Optional[Callable]=None,
                         after_predict_callback:Optional[Callable]=None)

Train the boosters for some iterations.

Name Type Default Details
num_iterations int Number of boosting iterations to run
before_predict_callback Optional None Function to call on the features before computing the predictions.
This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
The series identifier is on the index.
after_predict_callback Optional None Function to call on the predictions before updating the targets.
This function will take a pandas Series with the predictions and should return another one with the same structure.
The series identifier is on the index.
Returns float Weighted metric after training for num_iterations.
score = cv4.partial_fit(10)
score
[LightGBM] [Info] Start training from score 51.745632
0.5906900462828166

This is equal to the first evaluation from our first example.

assert hist[0][1] == score

We can now use this score to decide whether this configuration looks promising. If we want to, we can train for a few more iterations.

score2 = cv4.partial_fit(20)

This is now equal to the third metric from the first example, since this time we trained for 20 more iterations (30 in total).

assert hist[2][1] == score2
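
Putting it all together, here's a sketch of how partial_fit could plug into a pruner. This assumes optuna; the objective function and its search space are made up for illustration:

import optuna

def objective(trial):
    cv_trial = LightGBMCV(
        freq=1,
        lags=[24 * (i+1) for i in range(7)],
    )
    cv_trial.setup(
        train,
        n_windows=2,
        h=horizon,
        params={
            'verbose': -1,
            'num_leaves': trial.suggest_int('num_leaves', 15, 127),
        },
    )
    score = float('inf')
    for step in range(10):  # up to 100 iterations, evaluating every 10
        score = cv_trial.partial_fit(10)
        trial.report(score, step)
        if trial.should_prune():  # let the pruner discard unpromising configurations
            raise optuna.TrialPruned()
    return score

# study = optuna.create_study(direction='minimize')
# study.optimize(objective, n_trials=20)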
%%capture
# test we don't need X_df
from mlforecast.utils import generate_daily_series, generate_prices_for_series

def before_predict_callback(df):
    assert not df['price'].isnull().any()
    return df

dynamic_series = generate_daily_series(100, equal_ends=True, n_static_features=2, static_as_categorical=False)
dynamic_series = dynamic_series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
series_with_prices = dynamic_series.merge(prices_catalog, how='left')
cv = LightGBMCV(freq='D', lags=[24])
_ = cv.fit(
    series_with_prices,
    n_windows=2,
    h=5,
    params={'verbosity': -1},
    static_features=['static_0', 'product_id'],
    verbose_eval=False,
    before_predict_callback=before_predict_callback,
)

Using a custom metric

The built-in metrics are MAPE and RMSE, which are computed per series and then averaged across all series. If you want to do something different or use a different metric entirely, you can define your own metric like the following:

def weighted_mape(
    y_true: pd.Series,
    y_pred: pd.Series,
    ids: pd.Series,
    dates: pd.Series,
):
    """根据序列值的大小对MAPE进行加权"""
    abs_pct_err = abs(y_true - y_pred) / abs(y_true)
    mape_by_serie = abs_pct_err.groupby(ids).mean()
    totals_per_serie = y_pred.groupby(ids).sum()
    series_weights = totals_per_serie / totals_per_serie.sum()
    return (mape_by_serie * series_weights).sum()
_ = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
).fit(
    train,
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
    metric=weighted_mape,
)
[LightGBM] [Info] Start training from score 51.745632
[10] weighted_mape: 0.480353
[20] weighted_mape: 0.218670
[30] weighted_mape: 0.161706
[40] weighted_mape: 0.149992
[50] weighted_mape: 0.149024
[60] weighted_mape: 0.148496
Early stopping at round 60
Using best iteration: 60
