```python
%load_ext autoreload
%autoreload 2
```
# LightGBMCV

Time series cross validation with LightGBM.
```python
import copy
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import lightgbm as lgb
import numpy as np
import pandas as pd
from utilsforecast.processing import backtest_splits

from mlforecast.core import (
    DateFeature,
    Freq,
    LagTransforms,
    Lags,
    TargetTransform,
    TimeSeries,
)
from nbdev import show_doc


def _mape(y_true, y_pred, ids, _dates):
    abs_pct_err = abs(y_true - y_pred) / y_true
    return abs_pct_err.groupby(ids, observed=True).mean().mean()


def _rmse(y_true, y_pred, ids, _dates):
    sq_err = (y_true - y_pred) ** 2
    return sq_err.groupby(ids, observed=True).mean().pow(0.5).mean()


_metric2fn = {'mape': _mape, 'rmse': _rmse}


def _update(bst, n):
    for _ in range(n):
        bst.update()


def _predict(ts, bst, valid, h, before_predict_callback, after_predict_callback):
    static = ts.static_features_.columns.drop(ts.id_col).tolist()
    dynamic = valid.columns.drop(static + [ts.id_col, ts.time_col, ts.target_col])
    if not dynamic.empty:
        X_df = valid.drop(columns=static + [ts.target_col])
    else:
        X_df = None
    preds = ts.predict(
        {'Booster': bst},
        horizon=h,
        before_predict_callback=before_predict_callback,
        after_predict_callback=after_predict_callback,
        X_df=X_df,
    )
    return valid.merge(preds, on=[ts.id_col, ts.time_col], how='left')


def _update_and_predict(ts, bst, valid, n, h, before_predict_callback, after_predict_callback):
    _update(bst, n)
    return _predict(ts, bst, valid, h, before_predict_callback, after_predict_callback)


CVResult = Tuple[int, float]


class LightGBMCV:
    def __init__(
        self,
        freq: Freq = None,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,
    ):
        """Create LightGBM CV object.

        Parameters
        ----------
        freq : str or int
            Pandas offset alias, e.g. 'D', 'W-THU' or integer denoting the frequency of the series.
        lags : list of int, optional (default=None)
            Lags of the target to use as features.
        lag_transforms : dict of int to list of functions, optional (default=None)
            Mapping of target lags to their transformations.
        date_features : list of str or callable, optional (default=None)
            Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
        num_threads : int (default=1)
            Number of threads to use when computing the features.
        target_transforms : list of transformers, optional (default=None)
            Transformations that will be applied to the target before computing the features and restored after the forecasting step.
        """
        self.num_threads = num_threads
        cpu_count = os.cpu_count()
        if cpu_count is None:
            num_cpus = 1
        else:
            num_cpus = cpu_count
        self.bst_threads = max(num_cpus // num_threads, 1)
        self.ts = TimeSeries(
            freq=freq,
            lags=lags,
            lag_transforms=lag_transforms,
            date_features=date_features,
            num_threads=self.bst_threads,
            target_transforms=target_transforms,
        )

    def __repr__(self):
        return (
            f'{self.__class__.__name__}('
            f'freq={self.ts.freq}, '
            f'lag_features={list(self.ts.transforms.keys())}, '
            f'date_features={self.ts.date_features}, '
            f'num_threads={self.num_threads}, '
            f'bst_threads={self.bst_threads})'
        )

    def setup(
        self,
        df: pd.DataFrame,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        params: Optional[Dict[str, Any]] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        weights: Optional[Sequence[float]] = None,
        metric: Union[str, Callable] = 'mape',
        input_size: Optional[int] = None,
    ):
        """Initialize internal data structures to iteratively train the boosters. Use this before calling partial_fit.

        Parameters
        ----------
        df : pandas DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        h : int
            Forecast horizon.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `h`.
        params : dict, optional (default=None)
            Parameters to be passed to the LightGBM Boosters.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
        weights : sequence of float, optional (default=None)
            Weights to multiply the metric of each window. If None, all windows have the same weight.
        metric : str or callable, default='mape'
            Metric used to assess the performance of the models and perform early stopping.
        input_size : int, optional (default=None)
            Maximum training samples per serie in each window. If None, will use an expanding window.

        Returns
        -------
        self : LightGBMCV
            CV object with internal data structures for partial_fit.
        """
        if weights is None:
            self.weights = np.full(n_windows, 1 / n_windows)
        elif len(weights) != n_windows:
            raise ValueError('Must specify as many weights as the number of windows')
        else:
            self.weights = np.asarray(weights)
        if callable(metric):
            self.metric_fn = metric
            self.metric_name = metric.__name__
        else:
            if metric not in _metric2fn:
                raise ValueError(
                    f'{metric} is not one of the implemented metrics: ({", ".join(_metric2fn.keys())})'
                )
            self.metric_fn = _metric2fn[metric]
            self.metric_name = metric
        self.items = []
        self.h = h
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        self.params = {} if params is None else params
        splits = backtest_splits(
            df,
            n_windows=n_windows,
            h=h,
            id_col=id_col,
            time_col=time_col,
            freq=self.ts.freq,
            step_size=step_size,
            input_size=input_size,
        )
        for _, train, valid in splits:
            ts = copy.deepcopy(self.ts)
            prep = ts.fit_transform(train, id_col, time_col, target_col, static_features, dropna, keep_last_n)
            assert isinstance(prep, pd.DataFrame)
            ds = lgb.Dataset(prep.drop(columns=[id_col, time_col, target_col]), prep[target_col]).construct()
            bst = lgb.Booster({**self.params, 'num_threads': self.bst_threads}, ds)
            bst.predict = partial(bst.predict, num_threads=self.bst_threads)
            self.items.append((ts, bst, valid))
        return self

    def _single_threaded_partial_fit(
        self,
        metric_values,
        num_iterations,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ):
        for j, (ts, bst, valid) in enumerate(self.items):
            preds = _update_and_predict(
                ts=ts,
                bst=bst,
                valid=valid,
                n=num_iterations,
                h=self.h,
                before_predict_callback=before_predict_callback,
                after_predict_callback=after_predict_callback,
            )
            metric_values[j] = self.metric_fn(
                preds[self.target_col], preds['Booster'], preds[self.id_col], preds[self.time_col]
            )

    def _multithreaded_partial_fit(
        self,
        metric_values,
        num_iterations,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ):
        with ThreadPoolExecutor(self.num_threads) as executor:
            futures = []
            for ts, bst, valid in self.items:
                _update(bst, num_iterations)
                future = executor.submit(
                    _predict,
                    ts=ts,
                    bst=bst,
                    valid=valid,
                    h=self.h,
                    before_predict_callback=before_predict_callback,
                    after_predict_callback=after_predict_callback,
                )
                futures.append(future)
            cv_preds = [f.result() for f in futures]
        metric_values[:] = [
            self.metric_fn(
                preds[self.target_col], preds['Booster'], preds[self.id_col], preds[self.time_col]
            )
            for preds in cv_preds
        ]

    def partial_fit(
        self,
        num_iterations: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ) -> float:
        """Train the boosters for some iterations.

        Parameters
        ----------
        num_iterations : int
            Number of boosting iterations to run.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
            This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
            The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
            This function will take a pandas Series with the predictions and should return another one with the same structure.
            The series identifier is on the index.

        Returns
        -------
        metric_value : float
            Weighted metric after training for num_iterations.
        """
        metric_values = np.empty(len(self.items))
        if self.num_threads == 1:
            self._single_threaded_partial_fit(
                metric_values, num_iterations, before_predict_callback, after_predict_callback
            )
        else:
            self._multithreaded_partial_fit(
                metric_values, num_iterations, before_predict_callback, after_predict_callback
            )
        return metric_values @ self.weights

    def should_stop(self, hist, early_stopping_evals, early_stopping_pct) -> bool:
        if len(hist) < early_stopping_evals + 1:
            return False
        improvement_pct = 1 - hist[-1][1] / hist[-(early_stopping_evals + 1)][1]
        return improvement_pct < early_stopping_pct

    def find_best_iter(self, hist, early_stopping_evals) -> int:
        best_iter, best_score = hist[-1]
        for r, m in hist[-(early_stopping_evals + 1):-1]:
            if m < best_score:
                best_score = m
                best_iter = r
        return best_iter

    def fit(
        self,
        df: pd.DataFrame,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        num_iterations: int = 100,
        params: Optional[Dict[str, Any]] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        eval_every: int = 10,
        weights: Optional[Sequence[float]] = None,
        metric: Union[str, Callable] = 'mape',
        verbose_eval: bool = True,
        early_stopping_evals: int = 2,
        early_stopping_pct: float = 0.01,
        compute_cv_preds: bool = False,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        input_size: Optional[int] = None,
    ) -> List[CVResult]:
        """Train boosters simultaneously and assess their performance on the complete forecasting window.

        Parameters
        ----------
        df : pandas DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        h : int
            Forecast horizon.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `h`.
        num_iterations : int (default=100)
            Maximum number of boosting iterations to run.
        params : dict, optional (default=None)
            Parameters to be passed to the LightGBM Boosters.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
        eval_every : int (default=10)
            Number of boosting iterations to train before evaluating on the whole forecast window.
        weights : sequence of float, optional (default=None)
            Weights to multiply the metric of each window. If None, all windows have the same weight.
        metric : str or callable, default='mape'
            Metric used to assess the performance of the models and perform early stopping.
        verbose_eval : bool (default=True)
            Print the metrics of each evaluation.
        early_stopping_evals : int (default=2)
            Maximum number of evaluations to run without improvement.
        early_stopping_pct : float (default=0.01)
            Minimum percentage improvement in metric value in `early_stopping_evals` evaluations.
        compute_cv_preds : bool (default=False)
            Compute predictions for each window after finding the best iteration.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
            This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
            The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
            This function will take a pandas Series with the predictions and should return another one with the same structure.
            The series identifier is on the index.
        input_size : int, optional (default=None)
            Maximum training samples per serie in each window. If None, will use an expanding window.

        Returns
        -------
        cv_result : list of tuple
            List of (boosting rounds, metric value) tuples.
        """
        self.setup(
            df=df,
            n_windows=n_windows,
            h=h,
            params=params,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            input_size=input_size,
            step_size=step_size,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            weights=weights,
            metric=metric,
        )
        hist = []
        for i in range(0, num_iterations, eval_every):
            metric_value = self.partial_fit(eval_every, before_predict_callback, after_predict_callback)
            rounds = eval_every + i
            hist.append((rounds, metric_value))
            if verbose_eval:
                print(f'[{rounds:,d}] {self.metric_name}: {metric_value:,f}')
            if self.should_stop(hist, early_stopping_evals, early_stopping_pct):
                print(f"Early stopping at round {rounds:,}")
                break
        self.best_iteration_ = self.find_best_iter(hist, early_stopping_evals)
        print(f'Using best iteration: {self.best_iteration_:,}')
        hist = hist[:self.best_iteration_ // eval_every]
        for _, bst, _ in self.items:
            bst.best_iteration = self.best_iteration_
        self.cv_models_ = {f'Booster{i}': item[1] for i, item in enumerate(self.items)}
        if compute_cv_preds:
            with ThreadPoolExecutor(self.num_threads) as executor:
                futures = []
                for ts, bst, valid in self.items:
                    future = executor.submit(
                        _predict,
                        ts=ts,
                        bst=bst,
                        valid=valid,
                        h=self.h,
                        before_predict_callback=before_predict_callback,
                        after_predict_callback=after_predict_callback,
                    )
                    futures.append(future)
                self.cv_preds_ = pd.concat([f.result().assign(window=i) for i, f in enumerate(futures)])
        self.ts._fit(df, id_col, time_col, target_col, static_features, keep_last_n)
        self.ts.as_numpy = False
        return hist

    def predict(
        self,
        h: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Compute predictions with each of the trained boosters.

        Parameters
        ----------
        h : int
            Forecast horizon.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
            This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
            The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
            This function will take a pandas Series with the predictions and should return another one with the same structure.
            The series identifier is on the index.
        X_df : pandas DataFrame, optional (default=None)
            Dataframe with the future exogenous features. Should have the id column and the time column.

        Returns
        -------
        result : pandas DataFrame
            Predictions for each serie and timestep, with one column per window.
        """
        return self.ts.predict(
            self.cv_models_,
            horizon=h,
            before_predict_callback=before_predict_callback,
            after_predict_callback=after_predict_callback,
            X_df=X_df,
        )
```
```python
show_doc(LightGBMCV)
```
### LightGBMCV

```
LightGBMCV (freq:Union[int,str,pandas._libs.tslibs.offsets.BaseOffset,NoneType]=None,
            lags:Optional[Iterable[int]]=None,
            lag_transforms:Optional[Dict[int,List[Union[Callable,Tuple[Callable,Any]]]]]=None,
            date_features:Optional[Iterable[Union[str,Callable]]]=None,
            num_threads:int=1,
            target_transforms:Optional[List[mlforecast.target_transforms.BaseTargetTransform]]=None)
```
Create LightGBM CV object.
|  | Type | Default | Details |
|---|---|---|---|
freq | Union | None | Pandas offset alias, e.g. ‘D’, ‘W-THU’ or integer denoting the frequency of the series. |
lags | Optional | None | Lags of the target to use as features. |
lag_transforms | Optional | None | Mapping of target lags to their transformations. |
date_features | Optional | None | Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input. |
num_threads | int | 1 | Number of threads to use when computing the features. |
target_transforms | Optional | None | Transformations that will be applied to the target before computing the features and restored after the forecasting step. |
## Example

This shows an example using just 4 series from the M4 dataset. If you want to run it yourself on all of them, you can refer to this notebook.
```python
import random

from datasetsforecast.m4 import M4, M4Info
from fastcore.test import test_eq, test_fail
from mlforecast.target_transforms import Differences
from nbdev import show_doc
from window_ops.ewm import ewm_mean
from window_ops.rolling import rolling_mean, seasonal_rolling_mean
```
```python
group = 'Hourly'
await M4.async_download('data', group=group)
df, *_ = M4.load(directory='data', group=group)
df['ds'] = df['ds'].astype('int')
ids = df['unique_id'].unique()
random.seed(0)
sample_ids = random.choices(ids, k=4)
sample_df = df[df['unique_id'].isin(sample_ids)]
sample_df
```
|  | unique_id | ds | y |
|---|---|---|---|
86796 | H196 | 1 | 11.8 |
86797 | H196 | 2 | 11.4 |
86798 | H196 | 3 | 11.1 |
86799 | H196 | 4 | 10.8 |
86800 | H196 | 5 | 10.6 |
... | ... | ... | ... |
325235 | H413 | 1004 | 99.0 |
325236 | H413 | 1005 | 88.0 |
325237 | H413 | 1006 | 47.0 |
325238 | H413 | 1007 | 41.0 |
325239 | H413 | 1008 | 34.0 |
4032 rows × 3 columns
```python
info = M4Info[group]
horizon = info.horizon
valid = sample_df.groupby('unique_id').tail(horizon)
train = sample_df.drop(valid.index)
train.shape, valid.shape
```
((3840, 3), (192, 3))
What `LightGBMCV` does is emulate LightGBM's `cv` function, training multiple Boosters simultaneously on different partitions of the data, that is, it performs all boosting iterations at once. This gives us an estimate of the error at each iteration, so if we combine it with early stopping we can find the best iteration to train a final model using all the data, or even use the predictions of these individual models to compute an ensemble.

In order to have a good estimate of the forecasting performance of our model, we compute predictions for the whole test period and compute a metric on that. Since this step can slow down training, there's an `eval_every` parameter that controls it: with `eval_every=10` (the default), every 10 boosting iterations we compute forecasts for the complete window and report the error.

We also have early stopping parameters:

- `early_stopping_evals`: how many evaluations of the full window without improvement should we wait for before stopping training?
- `early_stopping_pct`: what is the minimum percentage improvement we want over these `early_stopping_evals` in order to keep training?

A toy walk-through of this stopping rule is sketched right below.
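To make the rule concrete, here is a small illustration (not part of the original notebook; the metric values are made up) using the `should_stop` method defined above on a hypothetical history of `(rounds, metric)` tuples:

```python
# Hypothetical metric history, one entry per evaluation (every 10 rounds here).
toy_hist = [(10, 0.30), (20, 0.20), (30, 0.199)]
demo = LightGBMCV(freq=1)

# Improvement vs. two evaluations ago: 1 - 0.199 / 0.30 ≈ 34% >= 1% -> keep training.
assert not demo.should_stop(toy_hist, early_stopping_evals=2, early_stopping_pct=0.01)

# After one more evaluation: 1 - 0.1988 / 0.20 ≈ 0.6% < 1% -> stop.
toy_hist.append((40, 0.1988))
assert demo.should_stop(toy_hist, early_stopping_evals=2, early_stopping_pct=0.01)
```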
This makes the `LightGBMCV` class a good tool for quickly testing different configurations of a model. Consider the following example, where we want to find out which features improve the model's performance. We start with just lags.
```python
static_fit_config = dict(
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
    compute_cv_preds=True,
)
cv = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],  # one week of lags
)
```
```python
show_doc(LightGBMCV.fit)
```
### LightGBMCV.fit

```
LightGBMCV.fit (df:pandas.core.frame.DataFrame, n_windows:int, h:int,
                id_col:str='unique_id', time_col:str='ds',
                target_col:str='y', step_size:Optional[int]=None,
                num_iterations:int=100,
                params:Optional[Dict[str,Any]]=None,
                static_features:Optional[List[str]]=None,
                dropna:bool=True, keep_last_n:Optional[int]=None,
                eval_every:int=10,
                weights:Optional[Sequence[float]]=None,
                metric:Union[str,Callable]='mape',
                verbose_eval:bool=True, early_stopping_evals:int=2,
                early_stopping_pct:float=0.01,
                compute_cv_preds:bool=False,
                before_predict_callback:Optional[Callable]=None,
                after_predict_callback:Optional[Callable]=None,
                input_size:Optional[int]=None)
```
Train boosters simultaneously and assess their performance on the complete forecasting window.
|  | Type | Default | Details |
|---|---|---|---|
df | DataFrame | Series data in long format. | |
n_windows | int | Number of windows to evaluate. | |
h | int | Forecast horizon. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
step_size | Optional | None | Step size between each cross validation window. If None it will be equal to h . |
num_iterations | int | 100 | Maximum number of boosting iterations to run. |
params | Optional | None | Parameters to be passed to the LightGBM Boosters. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
eval_every | int | 10 | Number of boosting iterations to train before evaluating on the whole forecast window. |
weights | Optional | None | Weights to multiply the metric of each window. If None, all windows have the same weight. |
metric | Union | mape | Metric used to assess the performance of the models and perform early stopping. |
verbose_eval | bool | True | Print the metrics of each evaluation. |
early_stopping_evals | int | 2 | Maximum number of evaluations to run without improvement. |
early_stopping_pct | float | 0.01 | Minimum percentage improvement in metric value in early_stopping_evals evaluations. |
compute_cv_preds | bool | False | Compute predictions for each window after finding the best iteration. |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
input_size | Optional | None | Maximum training samples per serie in each window. If None, will use an expanding window. |
Returns | List | | List of (boosting rounds, metric value) tuples. |
```python
hist = cv.fit(train, **static_fit_config)
```
[LightGBM] [Info] Start training from score 51.745632
[10] mape: 0.590690
[20] mape: 0.251093
[30] mape: 0.143643
[40] mape: 0.109723
[50] mape: 0.102099
[60] mape: 0.099448
[70] mape: 0.098349
[80] mape: 0.098006
[90] mape: 0.098718
Early stopping at round 90
Using best iteration: 80
By setting `compute_cv_preds` we get the predictions from each model on its corresponding validation fold.
```python
cv.cv_preds_
```
|  | unique_id | ds | y | Booster | window |
|---|---|---|---|---|---|
0 | H196 | 865 | 15.5 | 15.522924 | 0 |
1 | H196 | 866 | 15.1 | 14.985832 | 0 |
2 | H196 | 867 | 14.8 | 14.667901 | 0 |
3 | H196 | 868 | 14.4 | 14.514592 | 0 |
4 | H196 | 869 | 14.2 | 14.035793 | 0 |
... | ... | ... | ... | ... | ... |
187 | H413 | 956 | 59.0 | 77.227905 | 1 |
188 | H413 | 957 | 58.0 | 80.589641 | 1 |
189 | H413 | 958 | 53.0 | 53.986834 | 1 |
190 | H413 | 959 | 38.0 | 36.749786 | 1 |
191 | H413 | 960 | 46.0 | 36.281225 | 1 |
384 rows × 5 columns
The individual models we trained are saved, so calling `predict` returns the predictions from every trained model.
```python
show_doc(LightGBMCV.predict)
```
### LightGBMCV.predict

```
LightGBMCV.predict (h:int,
                    before_predict_callback:Optional[Callable]=None,
                    after_predict_callback:Optional[Callable]=None,
                    X_df:Optional[pandas.core.frame.DataFrame]=None)
```
Compute predictions with each of the trained boosters.
|  | Type | Default | Details |
|---|---|---|---|
h | int | Forecast horizon. | |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
X_df | Optional | None | Dataframe with the future exogenous features. Should have the id column and the time column. |
Returns | DataFrame | | Predictions for each serie and timestep, with one column per window. |
```python
preds = cv.predict(horizon)
preds
```
|  | unique_id | ds | Booster0 | Booster1 |
|---|---|---|---|---|
0 | H196 | 961 | 15.670252 | 15.848888 |
1 | H196 | 962 | 15.522924 | 15.697399 |
2 | H196 | 963 | 14.985832 | 15.166213 |
3 | H196 | 964 | 14.985832 | 14.723238 |
4 | H196 | 965 | 14.562152 | 14.451092 |
... | ... | ... | ... | ... |
187 | H413 | 1004 | 70.695242 | 65.917620 |
188 | H413 | 1005 | 66.216580 | 62.615788 |
189 | H413 | 1006 | 63.896573 | 67.848598 |
190 | H413 | 1007 | 46.922797 | 50.981950 |
191 | H413 | 1008 | 45.006541 | 42.752819 |
192 rows × 4 columns
We can average these predictions and evaluate them.
```python
def evaluate_on_valid(preds):
    preds = preds.copy()
    preds['final_prediction'] = preds.drop(columns=['unique_id', 'ds']).mean(1)
    merged = preds.merge(valid, on=['unique_id', 'ds'])
    merged['abs_err'] = abs(merged['final_prediction'] - merged['y']) / merged['y']
    return merged.groupby('unique_id')['abs_err'].mean().mean()

eval1 = evaluate_on_valid(preds)
eval1
```
0.11036194712311806
Now, since these series are hourly, maybe we can remove the daily seasonality by taking the 168th (24 * 7) difference, that is, subtracting the value at the same hour one week ago, so our target becomes \(z_t = y_{t} - y_{t-168}\). The features will be computed from this target and when we predict they'll be automatically re-applied.
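As a quick sanity check (not part of the original notebook), the transform applied per serie is just this seasonal difference, which we can reproduce manually in pandas for one of the sampled series:

```python
# What Differences([168]) computes for each serie: z_t = y_t - y_{t-168},
# shown here by hand for the H196 serie from our sample.
h196 = train.loc[train['unique_id'] == 'H196', 'y']
z = h196 - h196.shift(24 * 7)
z.dropna().head()
```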
```python
cv2 = LightGBMCV(
    freq=1,
    target_transforms=[Differences([24 * 7])],
    lags=[24 * (i+1) for i in range(7)],
)
hist2 = cv2.fit(train, **static_fit_config)
```
[LightGBM] [Info] Start training from score 0.519010
[10] mape: 0.089024
[20] mape: 0.090683
[30] mape: 0.092316
Early stopping at round 30
Using best iteration: 10
```python
assert hist2[-1][1] < hist[-1][1]
```
Nice! We achieved a better score in fewer iterations. Let's see if this improvement is reflected on the validation set as well.
```python
preds2 = cv2.predict(horizon)
eval2 = evaluate_on_valid(preds2)
eval2
```
0.08956665504570135
```python
assert eval2 < eval1
```
Great! Maybe we can try some lag transforms now. We'll try the seasonal rolling mean, which averages the values "every season", that is, if we set `season_length=24` and `window_size=7` then we'll average the value at the same hour for every day of the week.
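To see what this transform computes, here's a toy illustration (made-up values, not from the original notebook) using a shorter season, where each output averages the last `window_size` values observed at the same seasonal position:

```python
import numpy as np

# season_length=2, window_size=2: position i averages x[i] and x[i-2],
# i.e. the current and previous values at the same seasonal position.
# Outputs are NaN until window_size samples of that season are available.
x = np.array([1.0, 10.0, 2.0, 20.0, 3.0, 30.0])
seasonal_rolling_mean(x, season_length=2, window_size=2)
# should yield roughly: [nan, nan, 1.5, 15.0, 2.5, 25.0]
```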
```python
cv3 = LightGBMCV(
    freq=1,
    target_transforms=[Differences([24 * 7])],
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [(seasonal_rolling_mean, 24, 7)],
    },
)
hist3 = cv3.fit(train, **static_fit_config)
```
[LightGBM] [Info] Start training from score 0.273641
[10] mape: 0.086724
[20] mape: 0.088466
[30] mape: 0.090536
Early stopping at round 30
Using best iteration: 10
Seems like this is helping as well!
```python
assert hist3[-1][1] < hist2[-1][1]
```
Is this reflected on the validation set?
```python
preds3 = cv3.predict(horizon)
eval3 = evaluate_on_valid(preds3)
eval3
```
0.08961279023129345
Nice! mlforecast also supports date features, but in this case our time column is made of integers, so there aren't many possibilities here. As you can see, this allows you to iterate faster and get better estimates of the forecasting performance you can expect from your model.
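For reference, if the time column did hold timestamps, date features could be passed like this (a hypothetical configuration, not runnable on this integer-indexed dataset):

```python
# Hypothetical: only applicable when `ds` contains timestamps.
cv_dates = LightGBMCV(
    freq='H',  # hourly pandas offset alias
    lags=[24 * (i + 1) for i in range(7)],
    date_features=['dayofweek', 'hour'],  # pandas datetime attributes
)
```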
```python
test_eq(cv.find_best_iter([(0, 1), (1, 0.5)], 1), 1)
test_eq(cv.find_best_iter([(0, 1), (1, 0.5), (2, 0.6)], 1), 1)
test_eq(cv.find_best_iter([(0, 1), (1, 0.5), (2, 0.6), (3, 0.4)], 2), 3)
```
If you're doing hyperparameter tuning, it's useful to be able to run a few iterations, assess the performance, and determine whether a particular configuration isn't promising and should be discarded. For example, optuna has pruners that you can call with your current score and that decide whether the trial should be discarded. We'll now show how to do that.
Since the cross validation requires some setup, such as the LightGBM datasets and the internal features, we have the `setup` method for that.
```python
show_doc(LightGBMCV.setup)
```
### LightGBMCV.setup

```
LightGBMCV.setup (df:pandas.core.frame.DataFrame, n_windows:int, h:int,
                  id_col:str='unique_id', time_col:str='ds',
                  target_col:str='y', step_size:Optional[int]=None,
                  params:Optional[Dict[str,Any]]=None,
                  static_features:Optional[List[str]]=None,
                  dropna:bool=True, keep_last_n:Optional[int]=None,
                  weights:Optional[Sequence[float]]=None,
                  metric:Union[str,Callable]='mape',
                  input_size:Optional[int]=None)
```
Initialize internal data structures to iteratively train the boosters. Use this before calling partial_fit.
|  | Type | Default | Details |
|---|---|---|---|
df | DataFrame | Series data in long format. | |
n_windows | int | Number of windows to evaluate. | |
h | int | Forecast horizon. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
step_size | Optional | None | Step size between each cross validation window. If None it will be equal to h . |
params | Optional | None | Parameters to be passed to the LightGBM Boosters. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
weights | Optional | None | Weights to multiply the metric of each window. If None, all windows have the same weight. |
metric | Union | mape | Metric used to assess the performance of the models and perform early stopping. |
input_size | Optional | None | Maximum training samples per serie in each window. If None, will use an expanding window. |
Returns | LightGBMCV | | CV object with internal data structures for partial_fit. |
```python
cv4 = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
)
cv4.setup(
    train,
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
)
```
LightGBMCV(freq=1, lag_features=['lag24', 'lag48', 'lag72', 'lag96', 'lag120', 'lag144', 'lag168'], date_features=[], num_threads=1, bst_threads=8)
Once we have this, we can call `partial_fit` to train for only some iterations and return the score over the forecast window.
```python
show_doc(LightGBMCV.partial_fit)
```
### LightGBMCV.partial_fit

```
LightGBMCV.partial_fit (num_iterations:int,
                        before_predict_callback:Optional[Callable]=None,
                        after_predict_callback:Optional[Callable]=None)
```
Train the boosters for some iterations.
|  | Type | Default | Details |
|---|---|---|---|
num_iterations | int | Number of boosting iterations to run | |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
Returns | float | | Weighted metric after training for num_iterations. |
```python
score = cv4.partial_fit(10)
score
```
[LightGBM] [Info] Start training from score 51.745632
0.5906900462828166
This is equal to the first evaluation from our first example.
```python
assert hist[0][1] == score
```
We can now use this score to decide whether this configuration is promising. If we want, we can train some more iterations.
```python
score2 = cv4.partial_fit(20)
```
This is now equal to the third metric from the first example, since this time we trained for 20 iterations.
```python
assert hist[2][1] == score2
```
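Putting `setup` and `partial_fit` together, here is a sketch of how this could be wired into an optuna objective with pruning. Note that optuna isn't used in the original notebook, so the search space and pruner choice below are assumptions purely for illustration:

```python
import optuna

def objective(trial):
    cv = LightGBMCV(freq=1, lags=[24 * (i + 1) for i in range(7)])
    cv.setup(
        train,
        n_windows=2,
        h=horizon,
        params={
            'verbose': -1,
            # hypothetical search space, just for the sketch
            'num_leaves': trial.suggest_int('num_leaves', 15, 127),
        },
    )
    score = float('inf')
    for step in range(10):  # up to 100 iterations, evaluating every 10
        score = cv.partial_fit(10)
        trial.report(score, step)
        if trial.should_prune():  # let the pruner discard unpromising configs early
            raise optuna.TrialPruned()
    return score

# study = optuna.create_study(direction='minimize', pruner=optuna.pruners.MedianPruner())
# study.optimize(objective, n_trials=20)
```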
```python
%%capture
# test we don't need X_df
from mlforecast.utils import generate_daily_series, generate_prices_for_series

def before_predict_callback(df):
    assert not df['price'].isnull().any()
    return df

dynamic_series = generate_daily_series(100, equal_ends=True, n_static_features=2, static_as_categorical=False)
dynamic_series = dynamic_series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
series_with_prices = dynamic_series.merge(prices_catalog, how='left')
cv = LightGBMCV(freq='D', lags=[24])
_ = cv.fit(
    series_with_prices,
    n_windows=2,
    h=5,
    params={'verbosity': -1},
    static_features=['static_0', 'product_id'],
    verbose_eval=False,
    before_predict_callback=before_predict_callback,
)
```
## Using a custom metric

The built-in metrics are MAPE and RMSE, which are computed per serie and then averaged across all series. If you want to do something different or use a different metric entirely, you can define your own like the following:
```python
def weighted_mape(
    y_true: pd.Series,
    y_pred: pd.Series,
    ids: pd.Series,
    dates: pd.Series,
):
    """Weighs the MAPE by the magnitude of each serie's values."""
    abs_pct_err = abs(y_true - y_pred) / abs(y_true)
    mape_by_serie = abs_pct_err.groupby(ids).mean()
    totals_per_serie = y_pred.groupby(ids).sum()
    series_weights = totals_per_serie / totals_per_serie.sum()
    return (mape_by_serie * series_weights).sum()
```
```python
_ = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
).fit(
    train,
    n_windows=2,
    h=horizon,
    params={'verbose': -1},
    metric=weighted_mape,
)
```
[LightGBM] [Info] Start training from score 51.745632
[10] weighted_mape: 0.480353
[20] weighted_mape: 0.218670
[30] weighted_mape: 0.161706
[40] weighted_mape: 0.149992
[50] weighted_mape: 0.149024
[60] weighted_mape: 0.148496
Early stopping at round 60
Using best iteration: 60