%load_ext autoreload
%autoreload 2
MLForecast
Full pipeline encapsulation
import copy
import re
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple, Union
import cloudpickle
import fsspec
import numpy as np
import utilsforecast.processing as ufp
from sklearn.base import BaseEstimator, clone
from utilsforecast.compat import DFType, DataFrame
from mlforecast.core import (
    DateFeature,
    Freq,
    LagTransforms,
    Lags,
    Models,
    TargetTransform,
    TimeSeries,
    _name_models,
    _get_model_name,
)
from mlforecast.grouped_array import GroupedArray
if TYPE_CHECKING:
from mlforecast.lgb_cv import LightGBMCV
from mlforecast.target_transforms import _BaseGroupedArrayTargetTransform
from mlforecast.utils import PredictionIntervals
import pandas as pd
from fastcore.test import test_warns, test_eq, test_ne, test_fail, test_close
from nbdev import show_doc
from sklearn import set_config
set_config(display='text')
warnings.simplefilter('ignore', UserWarning)
def _add_conformal_distribution_intervals(
    fcst_df: DFType,
    cs_df: DFType,
    model_names: List[str],
    level: List[Union[int, float]],
    cs_n_windows: int,
    cs_h: int,
    n_series: int,
    horizon: int,
) -> DFType:
    """
    Adds conformal intervals to a `fcst_df` based on conformal scores `cs_df`.
    `level` should be already sorted. This strategy creates forecast paths
    based on errors and calculates quantiles using those paths.
    """
    fcst_df = ufp.copy_if_pandas(fcst_df, deep=False)
    alphas = [100 - lv for lv in level]
    cuts = [alpha / 200 for alpha in reversed(alphas)]
    cuts.extend(1 - alpha / 200 for alpha in alphas)
    for model in model_names:
        scores = cs_df[model].to_numpy().reshape(cs_n_windows, n_series, cs_h)
        # restrict the scores to the forecast horizon
        scores = scores[:, :, :horizon]
        mean = fcst_df[model].to_numpy().reshape(1, n_series, -1)
        scores = np.vstack([mean - scores, mean + scores])
        quantiles = np.quantile(
            scores,
            cuts,
            axis=0,
        )
        quantiles = quantiles.reshape(len(cuts), -1).T
        lo_cols = [f"{model}-lo-{lv}" for lv in reversed(level)]
        hi_cols = [f"{model}-hi-{lv}" for lv in level]
        out_cols = lo_cols + hi_cols
        fcst_df = ufp.assign_columns(fcst_df, out_cols, quantiles)
    return fcst_df
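To make the strategy above concrete, here is a small numeric sketch (toy values, independent of the library API) with two calibration windows, one series, a two-step horizon and a single level of 80: the scores are stacked around the point forecasts to form four paths and the bounds are the 0.1 and 0.9 quantiles across those paths.
toy_scores = np.array([[[1.0, 2.0]], [[3.0, 4.0]]])  # (cs_n_windows, n_series, cs_h)
toy_mean = np.array([[[10.0, 20.0]]])  # point forecasts, shape (1, n_series, horizon)
toy_paths = np.vstack([toy_mean - toy_scores, toy_mean + toy_scores])  # 4 forecast paths
toy_cuts = [0.1, 0.9]  # level=[80] -> alphas=[20] -> cuts=[0.1, 0.9]
np.quantile(toy_paths, toy_cuts, axis=0).reshape(2, -1).T  # rows: steps, cols: lo-80, hi-80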
def _add_conformal_error_intervals(
    fcst_df: DFType,
    cs_df: DFType,
    model_names: List[str],
    level: List[Union[int, float]],
    cs_n_windows: int,
    cs_h: int,
    n_series: int,
    horizon: int,
) -> DFType:
    """
    Adds conformal intervals to a `fcst_df` based on conformal scores `cs_df`.
    `level` should be already sorted. This strategy creates prediction intervals
    based on the absolute errors.
    """
    fcst_df = ufp.copy_if_pandas(fcst_df, deep=False)
    cuts = [lv / 100 for lv in level]
    for model in model_names:
        mean = fcst_df[model].to_numpy().ravel()
        scores = cs_df[model].to_numpy().reshape(cs_n_windows, n_series, cs_h)
        # restrict the scores to the forecast horizon
        scores = scores[:, :, :horizon]
        quantiles = np.quantile(
            scores,
            cuts,
            axis=0,
        )
        quantiles = quantiles.reshape(len(cuts), -1)
        lo_cols = [f"{model}-lo-{lv}" for lv in reversed(level)]
        hi_cols = [f"{model}-hi-{lv}" for lv in level]
        quantiles = np.vstack([mean - quantiles[::-1], mean + quantiles]).T
        columns = lo_cols + hi_cols
        fcst_df = ufp.assign_columns(fcst_df, columns, quantiles)
    return fcst_df
def _get_conformal_method(method: str):
    available_methods = {
        'conformal_distribution': _add_conformal_distribution_intervals,
        'conformal_error': _add_conformal_error_intervals,
    }
    if method not in available_methods.keys():
        raise ValueError(
            f'prediction intervals method {method} not supported '
            f'please choose one of {", ".join(available_methods.keys())}'
        )
    return available_methods[method]
test_fail(lambda: _get_conformal_method('my_method'))
class MLForecast:
    def __init__(
        self,
        models: Models,
        freq: Freq,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,
        lag_transforms_namer: Optional[Callable] = None,
    ):
        """Forecasting pipeline

        Parameters
        ----------
        models : regressor or list of regressors
            Models that will be trained and used to compute the forecasts.
        freq : str or int or pd.offsets.BaseOffset
            Pandas offset, pandas offset alias, e.g. 'D', 'W-THU' or integer denoting the frequency of the series.
        lags : list of int, optional (default=None)
            Lags of the target to use as features.
        lag_transforms : dict of int to list of functions, optional (default=None)
            Mapping of target lags to their transformations.
        date_features : list of str or callable, optional (default=None)
            Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
        num_threads : int (default=1)
            Number of threads to use when computing the features.
        target_transforms : list of transformers, optional (default=None)
            Transformations that will be applied to the target before computing the features and restored after the forecasting step.
        lag_transforms_namer : callable, optional (default=None)
            Function that takes a transformation (either function or class), a lag and extra arguments and produces a name.
        """
        if not isinstance(models, dict) and not isinstance(models, list):
            models = [models]
        if isinstance(models, list):
            model_names = _name_models([_get_model_name(m) for m in models])
            models_with_names = dict(zip(model_names, models))
        else:
            models_with_names = models
        self.models = models_with_names
        self.ts = TimeSeries(
            freq=freq,
            lags=lags,
            lag_transforms=lag_transforms,
            date_features=date_features,
            num_threads=num_threads,
            target_transforms=target_transforms,
            lag_transforms_namer=lag_transforms_namer,
        )
    def __repr__(self):
        return (
            f'{self.__class__.__name__}(models=[{", ".join(self.models.keys())}], '
            f'freq={self.freq}, '
            f'lag_features={list(self.ts.transforms.keys())}, '
            f'date_features={self.ts.date_features}, '
            f'num_threads={self.ts.num_threads})'
        )

    @property
    def freq(self):
        return self.ts.freq

    @classmethod
    def from_cv(cls, cv: 'LightGBMCV') -> 'MLForecast':
        if not hasattr(cv, 'best_iteration_'):
            raise ValueError('LightGBMCV object must be fitted first.')
        import lightgbm as lgb
        fcst = cls(
            models=lgb.LGBMRegressor(**{**cv.params, 'n_estimators': cv.best_iteration_}),
            freq=cv.ts.freq,
        )
        fcst.ts = copy.deepcopy(cv.ts)
        return fcst
    def preprocess(
        self,
        df: DFType,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        max_horizon: Optional[int] = None,
        return_X_y: bool = False,
        as_numpy: bool = False,
    ) -> Union[DFType, Tuple[DFType, np.ndarray]]:
        """Add the features to `data`.

        Parameters
        ----------
        df : pandas DataFrame
            Series data in long format.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
        max_horizon : int, optional (default=None)
            Train this many models, where each model will predict a specific horizon.
        return_X_y : bool (default=False)
            Return a tuple with the features and the target. If False will return a single dataframe.
        as_numpy : bool (default=False)
            Cast features to numpy array. Only works for `return_X_y=True`.

        Returns
        -------
        result : DataFrame or tuple of pandas DataFrame and a numpy array.
            `df` plus added features and target(s).
        """
        return self.ts.fit_transform(
            df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            max_horizon=max_horizon,
            return_X_y=return_X_y,
            as_numpy=as_numpy,
        )
    def fit_models(
        self,
        X: Union[DataFrame, np.ndarray],
        y: np.ndarray,
    ) -> 'MLForecast':
        """Manually train models. Use this if you called `MLForecast.preprocess` beforehand.

        Parameters
        ----------
        X : pandas or polars DataFrame or numpy array
            Features.
        y : numpy array.
            Target.

        Returns
        -------
        self : MLForecast
            Forecast object with trained models.
        """
        self.models_: Dict[str, Union[BaseEstimator, List[BaseEstimator]]] = {}
        for name, model in self.models.items():
            if y.ndim == 2 and y.shape[1] > 1:
                self.models_[name] = []
                for col in range(y.shape[1]):
                    keep = ~np.isnan(y[:, col])
                    if isinstance(X, np.ndarray):
                        # TODO: migrate to utils
                        Xh = X[keep]
                    else:
                        Xh = ufp.filter_with_mask(X, keep)
                    yh = y[keep, col]
                    self.models_[name].append(clone(model).fit(Xh, yh))
            else:
                self.models_[name] = clone(model).fit(X, y)
        return self
    def _conformity_scores(
        self,
        df: DFType,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        max_horizon: Optional[int] = None,
        n_windows: int = 2,
        h: int = 1,
        as_numpy: bool = False,
    ) -> DFType:
        """Compute conformity scores.

        We need at least two cross validation errors to compute
        quantiles for prediction intervals (`n_windows=2`).

        The exception is raised by the PredictionIntervals data class.

        In the simplest case, we assume the width of the interval
        is the same for all the forecasting horizon (`h=1`).
        """
        min_size = ufp.counts_by_id(df, id_col)['counts'].min()
        min_samples = h * n_windows + 1
        if min_size < min_samples:
            raise ValueError(
                "Minimum required samples in each serie for the prediction intervals "
                f"settings are: {min_samples}, shortest serie has: {min_size}. "
                "Please reduce the number of windows, horizon or remove those series."
            )
        cv_results = self.cross_validation(
            df=df,
            n_windows=n_windows,
            h=h,
            refit=False,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            max_horizon=max_horizon,
            prediction_intervals=None,
            as_numpy=as_numpy,
        )
        # conformity score for each model
        for model in self.models.keys():
            # compute absolute error for each model
            abs_err = abs(cv_results[model] - cv_results[target_col])
            cv_results = ufp.assign_columns(cv_results, model, abs_err)
        return ufp.drop_columns(cv_results, target_col)
    def _invert_transforms_fitted(self, df: DFType) -> DFType:
        if self.ts.target_transforms is None:
            return df
        if any(isinstance(tfm, _BaseGroupedArrayTargetTransform) for tfm in self.ts.target_transforms):
            model_cols = [c for c in df.columns if c not in (self.ts.id_col, self.ts.time_col)]
            id_counts = ufp.counts_by_id(df, self.ts.id_col)
            sizes = id_counts['counts'].to_numpy()
            indptr = np.append(0, sizes.cumsum())
        for tfm in self.ts.target_transforms[::-1]:
            if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                if self.ts._dropped_series is not None:
                    idxs = np.delete(np.arange(self.ts.ga.n_groups), self.ts._dropped_series)
                    tfm = tfm.take(idxs)
                for col in model_cols:
                    ga = GroupedArray(df[col].to_numpy(), indptr)
                    ga = tfm.inverse_transform_fitted(ga)
                    df = ufp.assign_columns(df, col, ga.data)
            else:
                df = tfm.inverse_transform(df)
        return df
    def _extract_X_y(
        self,
        prep: DFType,
        target_col: str,
    ) -> Tuple[Union[DFType, np.ndarray], np.ndarray]:
        X = prep[self.ts.features_order_]
        targets = [c for c in prep.columns if re.match(rf'^{target_col}\d*$', c)]
        if len(targets) == 1:
            targets = targets[0]
        y = prep[targets].to_numpy()
        return X, y
    def _compute_fitted_values(
        self,
        base: DFType,
        X: Union[DFType, np.ndarray],
        y: np.ndarray,
        id_col: str,
        time_col: str,
        target_col: str,
        max_horizon: Optional[int],
    ) -> DFType:
        base = ufp.copy_if_pandas(base, deep=False)
        sort_idxs = ufp.maybe_compute_sort_indices(base, id_col, time_col)
        if sort_idxs is not None:
            base = ufp.take_rows(base, sort_idxs)
            X = ufp.take_rows(X, sort_idxs)
            y = y[sort_idxs]
        if max_horizon is None:
            fitted_values = ufp.assign_columns(base, target_col, y)
            for name, model in self.models_.items():
                assert not isinstance(model, list)  # mypy
                preds = model.predict(X)
                fitted_values = ufp.assign_columns(fitted_values, name, preds)
            fitted_values = self._invert_transforms_fitted(fitted_values)
        else:
            horizon_fitted_values = []
            for horizon in range(max_horizon):
                horizon_base = ufp.copy_if_pandas(base, deep=True)
                horizon_base = ufp.assign_columns(horizon_base, target_col, y[:, horizon])
                horizon_fitted_values.append(horizon_base)
            for name, horizon_models in self.models_.items():
                for horizon, model in enumerate(horizon_models):
                    preds = model.predict(X)
                    horizon_fitted_values[horizon] = ufp.assign_columns(
                        horizon_fitted_values[horizon], name, preds
                    )
            for horizon, horizon_df in enumerate(horizon_fitted_values):
                keep_mask = ~ufp.is_nan(horizon_df[target_col])
                horizon_df = ufp.filter_with_mask(horizon_df, keep_mask)
                horizon_df = ufp.copy_if_pandas(horizon_df, deep=True)
                horizon_df = self._invert_transforms_fitted(horizon_df)
                horizon_df = ufp.assign_columns(horizon_df, 'h', horizon + 1)
                horizon_fitted_values[horizon] = horizon_df
            fitted_values = ufp.vertical_concat(horizon_fitted_values, match_categories=False)
        if self.ts.target_transforms is not None:
            for tfm in self.ts.target_transforms[::-1]:
                if hasattr(tfm, 'store_fitted'):
                    tfm.store_fitted = False
                if hasattr(tfm, 'fitted_'):
                    tfm.fitted_ = []
        return fitted_values
    def fit(
        self,
        df: DataFrame,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        max_horizon: Optional[int] = None,
        prediction_intervals: Optional[PredictionIntervals] = None,
        fitted: bool = False,
        as_numpy: bool = False,
    ) -> 'MLForecast':
        """Apply the feature engineering and train the models.

        Parameters
        ----------
        df : pandas or polars DataFrame
            Series data in long format.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
            If `None`, will consider all columns (except id_col and time_col) as static.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
        max_horizon : int, optional (default=None)
            Train this many models, where each model will predict a specific horizon.
        prediction_intervals : PredictionIntervals, optional (default=None)
            Configuration to calibrate prediction intervals (Conformal Prediction).
        fitted : bool (default=False)
            Save in-sample predictions.
        as_numpy : bool (default=False)
            Cast features to numpy array.

        Returns
        -------
        self : MLForecast
            Forecast object with series values and trained models.
        """
        if fitted and self.ts.target_transforms is not None:
            for tfm in self.ts.target_transforms:
                if hasattr(tfm, 'store_fitted'):
                    tfm.store_fitted = True
        self._cs_df: Optional[DataFrame] = None
        if prediction_intervals is not None:
            self.prediction_intervals = prediction_intervals
            self._cs_df = self._conformity_scores(
                df=df,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                static_features=static_features,
                dropna=dropna,
                keep_last_n=keep_last_n,
                n_windows=prediction_intervals.n_windows,
                h=prediction_intervals.h,
                as_numpy=as_numpy,
            )
        prep = self.preprocess(
            df=df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            max_horizon=max_horizon,
            return_X_y=not fitted,
            as_numpy=as_numpy,
        )
        if isinstance(prep, tuple):
            X, y = prep
        else:
            base = prep[[id_col, time_col]]
            X, y = self._extract_X_y(prep, target_col)
            if as_numpy:
                X = ufp.to_numpy(X)
            del prep
        self.fit_models(X, y)
        if fitted:
            fitted_values = self._compute_fitted_values(
                base=base,
                X=X,
                y=y,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                max_horizon=max_horizon,
            )
            fitted_values = ufp.drop_index_if_pandas(fitted_values)
            self.fcst_fitted_values_ = fitted_values
        return self
    def forecast_fitted_values(self, level: Optional[List[Union[int, float]]] = None) -> DataFrame:
        """Access in-sample predictions.

        Parameters
        ----------
        level : list of ints or floats, optional (default=None)
            Confidence levels between 0 and 100 for prediction intervals.

        Returns
        -------
        pandas or polars DataFrame
            Dataframe with predictions for the training set
        """
        if not hasattr(self, 'fcst_fitted_values_'):
            raise Exception('Please run the `fit` method using `fitted=True`')
        res = self.fcst_fitted_values_
        if level is not None:
            res = ufp.add_insample_levels(
                res,
                models=list(self.models_.keys()),
                level=level,
                id_col=self.ts.id_col,
                target_col=self.ts.target_col,
            )
        return res
    def make_future_dataframe(self, h: int) -> DataFrame:
        """Create a dataframe with all ids and future times in the forecasting horizon.

        Parameters
        ----------
        h : int
            Number of periods to predict.

        Returns
        -------
        pandas or polars DataFrame
            DataFrame with expected ids and future times
        """
        if not hasattr(self.ts, 'id_col'):
            raise ValueError('You must call fit first')
        return ufp.make_future_dataframe(
            uids=self.ts.uids,
            last_times=self.ts.last_dates,
            freq=self.freq,
            h=h,
            id_col=self.ts.id_col,
            time_col=self.ts.time_col,
        )
    def get_missing_future(self, h: int, X_df: DFType) -> DFType:
        """Get the missing id and time combinations in `X_df`.

        Parameters
        ----------
        h : int
            Number of periods to predict.
        X_df : pandas or polars DataFrame
            Dataframe with the future exogenous features. Should have the id column and the time column.

        Returns
        -------
        pandas or polars DataFrame
            DataFrame with expected ids and future times missing in `X_df`
        """
        expected = self.make_future_dataframe(h=h)
        ids = [self.ts.id_col, self.ts.time_col]
        return ufp.anti_join(expected, X_df[ids], on=ids)
    def predict(
        self,
        h: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        new_df: Optional[DFType] = None,
        level: Optional[List[Union[int, float]]] = None,
        X_df: Optional[DFType] = None,
        ids: Optional[List[str]] = None,
    ) -> DFType:
        """Compute the predictions for the next `h` steps.

        Parameters
        ----------
        h : int
            Number of periods to predict.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
            This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
            The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
            This function will take a pandas Series with the predictions and should return another one with the same structure.
            The series identifier is on the index.
        new_df : pandas or polars DataFrame, optional (default=None)
            Series data of new observations for which forecasts are to be generated.
            This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
            If `new_df` is not None, the method will generate forecasts for the new observations.
        level : list of ints or floats, optional (default=None)
            Confidence levels between 0 and 100 for prediction intervals.
        X_df : pandas or polars DataFrame, optional (default=None)
            Dataframe with the future exogenous features. Should have the id column and the time column.
        ids : list of str, optional (default=None)
            List with subset of ids seen during training for which the forecasts should be computed.

        Returns
        -------
        result : pandas or polars DataFrame
            Predictions for each serie and timestep, with one column per model.
        """
        if not hasattr(self, 'models_'):
            raise ValueError(
                "No fitted models found. You have to call fit or preprocess + fit_models. "
                "If you used cross_validation before please fit again."
            )
        first_model_is_list = isinstance(next(iter(self.models_.values())), list)
        max_horizon = self.ts.max_horizon
        if first_model_is_list and max_horizon is None:
            raise ValueError(
                'Found one model per horizon but `max_horizon` is None. '
                'If you ran preprocess after fit please run fit again.'
            )
        elif not first_model_is_list and max_horizon is not None:
            raise ValueError(
                'Found a single model for all horizons '
                f'but `max_horizon` is {max_horizon}. '
                'If you ran preprocess after fit please run fit again.'
            )

        if new_df is not None:
            if level is not None:
                raise ValueError(
                    "Prediction intervals are not supported in transfer learning."
                )
            new_ts = TimeSeries(
                freq=self.ts.freq,
                lags=self.ts.lags,
                lag_transforms=self.ts.lag_transforms,
                date_features=self.ts.date_features,
                num_threads=self.ts.num_threads,
                target_transforms=self.ts.target_transforms,
                lag_transforms_namer=self.ts.lag_transforms_namer,
            )
            new_ts._fit(
                new_df,
                id_col=self.ts.id_col,
                time_col=self.ts.time_col,
                target_col=self.ts.target_col,
                static_features=self.ts.static_features,
                keep_last_n=self.ts.keep_last_n,
            )
            core_tfms = new_ts._get_core_lag_tfms()
            if core_tfms:
                # populate the stats needed for the updates
                new_ts._compute_transforms(core_tfms, updates_only=False)
            new_ts.max_horizon = self.ts.max_horizon
            new_ts.as_numpy = self.ts.as_numpy
            ts = new_ts
        else:
            ts = self.ts

        forecasts = ts.predict(
            models=self.models_,
            horizon=h,
            before_predict_callback=before_predict_callback,
            after_predict_callback=after_predict_callback,
            X_df=X_df,
            ids=ids,
        )
        if level is not None:
            if self._cs_df is None:
                warn_msg = (
                    'Please rerun the `fit` method passing a proper value '
                    'to prediction intervals to compute them.'
                )
                warnings.warn(warn_msg, UserWarning)
            else:
                if (self.prediction_intervals.h != 1) and (self.prediction_intervals.h < h):
                    raise ValueError(
                        'The `h` argument of PredictionIntervals '
                        'should be equal to one or greater or equal to `h`. '
                        'Please rerun the `fit` method passing a proper value '
                        'to prediction intervals.'
                    )
                if self.prediction_intervals.h == 1 and h > 1:
                    warn_msg = (
                        'Prediction intervals are calculated using 1-step ahead cross-validation, '
                        'with a constant width for all horizons. To vary the error by horizon, '
                        'pass PredictionIntervals(h=h) to the `prediction_intervals` '
                        'argument when refitting the model.'
                    )
                    warnings.warn(warn_msg, UserWarning)
                level_ = sorted(level)
                model_names = self.models.keys()
                conformal_method = _get_conformal_method(self.prediction_intervals.method)
                if ids is not None:
                    ids_mask = ufp.is_in(self._cs_df[self.ts.id_col], ids)
                    cs_df = ufp.filter_with_mask(self._cs_df, ids_mask)
                    n_series = len(ids)
                else:
                    cs_df = self._cs_df
                    n_series = self.ts.ga.n_groups
                forecasts = conformal_method(
                    forecasts,
                    cs_df,
                    model_names=list(model_names),
                    level=level_,
                    cs_h=self.prediction_intervals.h,
                    cs_n_windows=self.prediction_intervals.n_windows,
                    n_series=n_series,
                    horizon=h,
                )
        return forecasts
    def cross_validation(
        self,
        df: DFType,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        refit: Union[bool, int] = True,
        max_horizon: Optional[int] = None,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        prediction_intervals: Optional[PredictionIntervals] = None,
        level: Optional[List[Union[int, float]]] = None,
        input_size: Optional[int] = None,
        fitted: bool = False,
        as_numpy: bool = False,
    ) -> DFType:
        """Perform time series cross validation.
        Creates `n_windows` splits where each window has `h` test periods,
        trains the models, computes the predictions and merges the actuals.

        Parameters
        ----------
        df : pandas or polars DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        h : int
            Forecast horizon.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `h`.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
        max_horizon : int, optional (default=None)
            Train this many models, where each model will predict a specific horizon.
        refit : bool or int (default=True)
            Retrain model for each cross validation window.
            If False, the models are trained at the beginning and then used to predict each window.
            If positive int, the models are retrained every `refit` windows.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
            This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
            The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
            This function will take a pandas Series with the predictions and should return another one with the same structure.
            The series identifier is on the index.
        prediction_intervals : PredictionIntervals, optional (default=None)
            Configuration to calibrate prediction intervals (Conformal Prediction).
        level : list of ints or floats, optional (default=None)
            Confidence levels between 0 and 100 for prediction intervals.
        input_size : int, optional (default=None)
            Maximum training samples per serie in each window. If None, will use an expanding window.
        fitted : bool (default=False)
            Store the in-sample predictions.
        as_numpy : bool (default=False)
            Cast features to numpy array.

        Returns
        -------
        result : pandas or polars DataFrame
            Predictions for each window with the series id, timestamp, last train date, target value and predictions from each model.
        """
        results = []
        cv_models = []
        cv_fitted_values = []
        splits = ufp.backtest_splits(
            df,
            n_windows=n_windows,
            h=h,
            id_col=id_col,
            time_col=time_col,
            freq=self.freq,
            step_size=step_size,
            input_size=input_size,
        )
        for i_window, (cutoffs, train, valid) in enumerate(splits):
            should_fit = i_window == 0 or (refit > 0 and i_window % refit == 0)
            if should_fit:
                self.fit(
                    train,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                    static_features=static_features,
                    dropna=dropna,
                    keep_last_n=keep_last_n,
                    max_horizon=max_horizon,
                    prediction_intervals=prediction_intervals,
                    fitted=fitted,
                    as_numpy=as_numpy,
                )
                cv_models.append(self.models_)
                if fitted:
                    cv_fitted_values.append(ufp.assign_columns(self.fcst_fitted_values_, 'fold', i_window))
            if fitted and not should_fit:
                if self.ts.target_transforms is not None:
                    for tfm in self.ts.target_transforms:
                        if hasattr(tfm, 'store_fitted'):
                            tfm.store_fitted = True
                prep = self.preprocess(
                    train,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                    static_features=static_features,
                    dropna=dropna,
                    keep_last_n=keep_last_n,
                    max_horizon=max_horizon,
                    return_X_y=False,
                )
                assert not isinstance(prep, tuple)
                base = prep[[id_col, time_col]]
                train_X, train_y = self._extract_X_y(prep, target_col)
                if as_numpy:
                    train_X = ufp.to_numpy(train_X)
                del prep
                fitted_values = self._compute_fitted_values(
                    base=base,
                    X=train_X,
                    y=train_y,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                    max_horizon=max_horizon,
                )
                fitted_values = ufp.assign_columns(fitted_values, 'fold', i_window)
                cv_fitted_values.append(fitted_values)
            static = [c for c in self.ts.static_features_.columns if c != id_col]
            dynamic = [
                c for c in valid.columns if c not in static + [id_col, time_col, target_col]
            ]
            if dynamic:
                X_df: Optional[DataFrame] = ufp.drop_columns(valid, static + [target_col])
            else:
                X_df = None
            y_pred = self.predict(
                h=h,
                before_predict_callback=before_predict_callback,
                after_predict_callback=after_predict_callback,
                new_df=train if not should_fit else None,
                level=level,
                X_df=X_df,
            )
            y_pred = ufp.join(y_pred, cutoffs, on=id_col, how='left')
            result = ufp.join(
                valid[[id_col, time_col, target_col]],
                y_pred,
                on=[id_col, time_col],
            )
            sort_idxs = ufp.maybe_compute_sort_indices(result, id_col, time_col)
            if sort_idxs is not None:
                result = ufp.take_rows(result, sort_idxs)
            if result.shape[0] < valid.shape[0]:
                raise ValueError(
                    "Cross validation result produced less results than expected. "
                    "Please verify that the frequency set on the MLForecast constructor matches your series' "
                    "and that there aren't any missing periods."
                )
            results.append(result)
        del self.models_
        self.cv_models_ = cv_models
        self.cv_fitted_values_ = cv_fitted_values
        out = ufp.vertical_concat(results, match_categories=False)
        out = ufp.drop_index_if_pandas(out)
        first_out_cols = [id_col, time_col, 'cutoff', target_col]
        remaining_cols = [c for c in out.columns if c not in first_out_cols]
        return out[first_out_cols + remaining_cols]
    def cross_validation_fitted_values(self):
        if not getattr(self, 'cv_fitted_values_', []):
            raise ValueError('Please run cross_validation with fitted=True first.')
        out = ufp.vertical_concat(self.cv_fitted_values_, match_categories=False)
        first_out_cols = [self.ts.id_col, self.ts.time_col, 'fold', self.ts.target_col]
        remaining_cols = [c for c in out.columns if c not in first_out_cols]
        out = ufp.drop_index_if_pandas(out)
        return out[first_out_cols + remaining_cols]
    def save(self, path: Union[str, Path]) -> None:
        """Save forecast object

        Parameters
        ----------
        path : str or pathlib.Path
            Directory where artifacts will be stored."""
        self.ts.save(f'{path}/ts.pkl')
        with fsspec.open(f'{path}/models.pkl', 'wb') as f:
            cloudpickle.dump(self.models_, f)
        if self._cs_df is not None:
            with fsspec.open(f'{path}/intervals.pkl', 'wb') as f:
                cloudpickle.dump(
                    {'scores': self._cs_df, 'settings': self.prediction_intervals},
                    f,
                )

    @staticmethod
    def load(path: Union[str, Path]) -> 'MLForecast':
        """Load forecast object

        Parameters
        ----------
        path : str or pathlib.Path
            Directory with saved artifacts."""
        ts = TimeSeries.load(f'{path}/ts.pkl')
        with fsspec.open(f'{path}/models.pkl', 'rb') as f:
            models = cloudpickle.load(f)
        try:
            with fsspec.open(f'{path}/intervals.pkl', 'rb') as f:
                intervals = cloudpickle.load(f)
        except FileNotFoundError:
            intervals = None
        fcst = MLForecast(models=models, freq=ts.freq)
        fcst.ts = ts
        fcst.models_ = models
        if intervals is not None:
            fcst.prediction_intervals = intervals['settings']
            fcst._cs_df = intervals['scores']
        return fcst

    def update(self, df: DataFrame) -> None:
        """Update the values of the stored series.

        Parameters
        ----------
        df : pandas or polars DataFrame
            Dataframe with new observations."""
        self.ts.update(df)
Data
This shows an example with just 4 series of the M4 dataset. If you want to run it yourself on all of them, you can refer to this notebook.
import random
import tempfile
import lightgbm as lgb
import matplotlib.pyplot as plt
import numpy as np
import xgboost as xgb
from datasetsforecast.m4 import M4, M4Info
from sklearn.linear_model import LinearRegression
from utilsforecast.plotting import plot_series
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.lgb_cv import LightGBMCV
from mlforecast.target_transforms import Differences, LocalStandardScaler
from mlforecast.utils import generate_daily_series
group = 'Hourly'
await M4.async_download('data', group=group)
df, *_ = M4.load(directory='data', group=group)
df['ds'] = df['ds'].astype('int')
ids = df['unique_id'].unique()
random.seed(0)
sample_ids = random.choices(ids, k=4)
sample_df = df[df['unique_id'].isin(sample_ids)]
sample_df
unique_id | ds | y | |
---|---|---|---|
86796 | H196 | 1 | 11.8 |
86797 | H196 | 2 | 11.4 |
86798 | H196 | 3 | 11.1 |
86799 | H196 | 4 | 10.8 |
86800 | H196 | 5 | 10.6 |
... | ... | ... | ... |
325235 | H413 | 1004 | 99.0 |
325236 | H413 | 1005 | 88.0 |
325237 | H413 | 1006 | 47.0 |
325238 | H413 | 1007 | 41.0 |
325239 | H413 | 1008 | 34.0 |
4032 rows × 3 columns
We now split this data into train and validation sets.
info = M4Info[group]
horizon = info.horizon
valid = sample_df.groupby('unique_id').tail(horizon)
train = sample_df.drop(valid.index)
train.shape, valid.shape
((3840, 3), (192, 3))
show_doc(MLForecast)
MLForecast
MLForecast (models:Union[sklearn.base.BaseEstimator,List[sklearn.base.Bas eEstimator],Dict[str,sklearn.base.BaseEstimator]], freq:Union[int,str], lags:Optional[Iterable[int]]=None, lag_t ransforms:Optional[Dict[int,List[Union[Callable,Tuple[Callabl e,Any]]]]]=None, date_features:Optional[Iterable[Union[str,Callable]]]=None, num_threads:int=1, target_transforms:Optional[List[Union[mlfo recast.target_transforms.BaseTargetTransform,mlforecast.targe t_transforms._BaseGroupedArrayTargetTransform]]]=None, lag_transforms_namer:Optional[Callable]=None)
Forecasting pipeline
Type | Default | Details | |
---|---|---|---|
models | Union | Models that will be trained and used to compute the forecasts. | |
freq | Union | Pandas offset, pandas offset alias, e.g. ‘D’, ‘W-THU’ or integer denoting the frequency of the series. | |
lags | Optional | None | Lags of the target to use as features. |
lag_transforms | Optional | None | Mapping of target lags to their transformations. |
date_features | Optional | None | Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input. |
num_threads | int | 1 | Number of threads to use when computing the features. |
target_transforms | Optional | None | Transformations that will be applied to the target before computing the features and restored after the forecasting step. |
lag_transforms_namer | Optional | None | Function that takes a transformation (either function or class), a lag and extra arguments and produces a name. |
The MLForecast object encapsulates the feature engineering + model training + forecasting.
fcst = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24])],
)
fcst
MLForecast(models=[LGBMRegressor], freq=1, lag_features=['lag24', 'lag48', 'lag72', 'lag96', 'lag120', 'lag144', 'lag168', 'exponentially_weighted_mean_lag48_alpha0.3'], date_features=[], num_threads=1)
Once we have this setup, we can compute the features and fit the models.
show_doc(MLForecast.fit)
MLForecast.fit
MLForecast.fit (df:Union[pandas.core.frame.DataFrame,polars.dataframe.fr ame.DataFrame], id_col:str='unique_id', time_col:str='ds', target_col:str='y', static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None, max_horizon:Optional[int]=None, prediction_intervals:Opti onal[mlforecast.utils.PredictionIntervals]=None, fitted:bool=False, as_numpy:bool=False)
Apply the feature engineering and train the models.
Type | Default | Details | |
---|---|---|---|
df | Union | Series data in long format. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. If None , will consider all columns (except id_col and time_col) as static. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
max_horizon | Optional | None | Train this many models, where each model will predict a specific horizon. |
prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |
fitted | bool | False | Save in-sample predictions. |
as_numpy | bool | False | Cast features to numpy array. |
Returns | MLForecast | Forecast object with series values and trained models. |
fcst = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24])],
)
fcst.fit(train, fitted=True);
show_doc(MLForecast.save)
MLForecast.save
MLForecast.save (path:Union[str,pathlib.Path])
Save forecast object
Type | Details | |
---|---|---|
path | Union | Directory where artifacts will be stored. |
Returns | None |
show_doc(MLForecast.load)
MLForecast.load
MLForecast.load (path:Union[str,pathlib.Path])
Load forecast object
Type | Details | |
---|---|---|
path | Union | Directory with saved artifacts. |
Returns | MLForecast |
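As a quick usage sketch (assuming only what's shown above: the fitted `fcst` object and the `tempfile` import), saving and loading round-trips the whole pipeline:
with tempfile.TemporaryDirectory() as tmpdir:
    fcst.save(tmpdir)
    fcst_loaded = MLForecast.load(tmpdir)
    pd.testing.assert_frame_equal(fcst.predict(horizon), fcst_loaded.predict(horizon))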
show_doc(MLForecast.update)
MLForecast.update
MLForecast.update (df:Union[pandas.core.frame.DataFrame,polars.dataframe .frame.DataFrame])
Update the values of the stored series.
Type | Details | |
---|---|---|
df | Union | Dataframe with new observations. |
Returns | None |
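A minimal usage sketch (a fresh, simple pipeline is used here so the state of `fcst` in the rest of this notebook is untouched; the predicted values serve as stand-in observations):
fcst_upd = MLForecast(models=LinearRegression(), freq=1, lags=[1])
fcst_upd.fit(train)
new_obs = fcst_upd.predict(1).rename(columns={'LinearRegression': 'y'})  # ds=961
fcst_upd.update(new_obs)
fcst_upd.predict(1)  # forecasts now start at ds=962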
show_doc(MLForecast.make_future_dataframe)
MLForecast.make_future_dataframe
MLForecast.make_future_dataframe (h:int)
Create a dataframe with all ids and future times in the forecasting horizon.
Type | Details | |
---|---|---|
h | int | Number of periods to predict. |
Returns | Union | DataFrame with expected ids and future times |
expected_future = fcst.make_future_dataframe(h=1)
expected_future
unique_id | ds | |
---|---|---|
0 | H196 | 961 |
1 | H256 | 961 |
2 | H381 | 961 |
3 | H413 | 961 |
show_doc(MLForecast.get_missing_future)
MLForecast.get_missing_future
MLForecast.get_missing_future (h:int, X_df:Union[pandas.core.frame.DataFrame,pol ars.dataframe.frame.DataFrame])
Get the missing id and time combinations in `X_df`.
Type | Details | |
---|---|---|
h | int | Number of periods to predict. |
X_df | Union | Dataframe with the future exogenous features. Should have the id column and the time column. |
Returns | Union | DataFrame with expected ids and future times missing in X_df |
missing_future = fcst.get_missing_future(h=1, X_df=expected_future.head(2))
pd.testing.assert_frame_equal(
    missing_future,
    expected_future.tail(2).reset_index(drop=True)
)
show_doc(MLForecast.forecast_fitted_values)
MLForecast.forecast_fitted_values
MLForecast.forecast_fitted_values (level:Optional[List[Union[int,float]] ]=None)
Access in-sample predictions.
Type | Default | Details | |
---|---|---|---|
level | Optional | None | Confidence levels between 0 and 100 for prediction intervals. |
Returns | Union | Dataframe with predictions for the training set |
fcst.forecast_fitted_values()
unique_id | ds | y | LGBMRegressor | |
---|---|---|---|---|
0 | H196 | 193 | 12.7 | 12.671271 |
1 | H196 | 194 | 12.3 | 12.271271 |
2 | H196 | 195 | 11.9 | 11.871271 |
3 | H196 | 196 | 11.7 | 11.671271 |
4 | H196 | 197 | 11.4 | 11.471271 |
... | ... | ... | ... | ... |
3067 | H413 | 956 | 59.0 | 68.280574 |
3068 | H413 | 957 | 58.0 | 70.427570 |
3069 | H413 | 958 | 53.0 | 44.767965 |
3070 | H413 | 959 | 38.0 | 48.691257 |
3071 | H413 | 960 | 46.0 | 46.652238 |
3072 rows × 4 columns
fcst.forecast_fitted_values(level=[90])
unique_id | ds | y | LGBMRegressor | LGBMRegressor-lo-90 | LGBMRegressor-hi-90 | |
---|---|---|---|---|---|---|
0 | H196 | 193 | 12.7 | 12.671271 | 12.540634 | 12.801909 |
1 | H196 | 194 | 12.3 | 12.271271 | 12.140634 | 12.401909 |
2 | H196 | 195 | 11.9 | 11.871271 | 11.740634 | 12.001909 |
3 | H196 | 196 | 11.7 | 11.671271 | 11.540634 | 11.801909 |
4 | H196 | 197 | 11.4 | 11.471271 | 11.340634 | 11.601909 |
... | ... | ... | ... | ... | ... | ... |
3067 | H413 | 956 | 59.0 | 68.280574 | 58.846640 | 77.714509 |
3068 | H413 | 957 | 58.0 | 70.427570 | 60.993636 | 79.861504 |
3069 | H413 | 958 | 53.0 | 44.767965 | 35.334031 | 54.201899 |
3070 | H413 | 959 | 38.0 | 48.691257 | 39.257323 | 58.125191 |
3071 | H413 | 960 | 46.0 | 46.652238 | 37.218304 | 56.086172 |
3072 rows × 6 columns
# test that the fitted target in fitted_values matches the original target after inverting the transformations
fcst2 = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24]), LocalStandardScaler()],
)
fcst2.fit(train, fitted=True)
fitted_vals = fcst2.forecast_fitted_values()
train_restored = train.merge(
    fitted_vals.drop(columns='LGBMRegressor'),
    on=['unique_id', 'ds'],
    suffixes=('_expected', '_actual')
)
np.testing.assert_allclose(
    train_restored['y_expected'].values,
    train_restored['y_actual'].values,
)
# test fitted + max_horizon
max_horizon = 7
fcst2.fit(train, fitted=True, max_horizon=max_horizon)
max_horizon_fitted_values = fcst2.forecast_fitted_values()
# h goes from 1 to max_horizon
np.testing.assert_equal(
    np.sort(max_horizon_fitted_values['h'].unique()),
    np.arange(1, max_horizon + 1),
)
# first horizon predictions are equal to the recursive ones
pd.testing.assert_frame_equal(
    fitted_vals.reset_index(drop=True),
    max_horizon_fitted_values[max_horizon_fitted_values['h'] == 1].drop(columns='h'),
)
# restored values match
xx = max_horizon_fitted_values[lambda x: x['unique_id'].eq('H413')].pivot_table(
    index=['unique_id', 'ds'], columns='h', values='y'
).loc['H413']
first_ds = xx.index.min()
last_ds = xx.index.max()
for h in range(1, max_horizon):
    np.testing.assert_allclose(
        xx.loc[first_ds + h :, 1].values,
        xx.loc[: last_ds - h, h + 1].values,
    )
Once we've run this, we're ready to compute our forecasts.
show_doc(MLForecast.predict)
MLForecast.predict
MLForecast.predict (h:int, before_predict_callback:Optional[Callable]=None, after_predict_callback:Optional[Callable]=None, new_d f:Union[pandas.core.frame.DataFrame,polars.dataframe. frame.DataFrame,NoneType]=None, level:Optional[List[Union[int,float]]]=None, X_df:Uni on[pandas.core.frame.DataFrame,polars.dataframe.frame .DataFrame,NoneType]=None, ids:Optional[List[str]]=None)
Compute the predictions for the next `h` steps.
Type | Default | Details | |
---|---|---|---|
h | int | Number of periods to predict. | |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
new_df | Union | None | Series data of new observations for which forecasts are to be generated. This dataframe should have the same structure as the one used to fit the model, including any features and time series data. If new_df is not None, the method will generate forecasts for the new observations. |
level | Optional | None | Confidence levels between 0 and 100 for prediction intervals. |
X_df | Union | None | Dataframe with the future exogenous features. Should have the id column and the time column. |
ids | Optional | None | List with subset of ids seen during training for which the forecasts should be computed. |
Returns | Union | Predictions for each serie and timestep, with one column per model. |
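The two callbacks are not exercised in this notebook; as a minimal sketch of their contract (`inspect_features` is a hypothetical helper, not part of the library), a `before_predict_callback` receives the features that will be passed to the model at each step and must return a dataframe with the same structure:
def inspect_features(features_df):
    # log the shape of the feature matrix used at each forecasting step
    print(features_df.shape)
    return features_df

# fcst.predict(horizon, before_predict_callback=inspect_features)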
predictions = fcst.predict(horizon)
We can now look at the results.
results = valid.merge(predictions, on=['unique_id', 'ds'])
fig = plot_series(forecasts_df=results)
fig.savefig('figs/forecast__predict.png', bbox_inches='tight')
# test new_df argument
pd.testing.assert_frame_equal(
    fcst.predict(horizon, new_df=train),
    predictions
)
Prediction intervals
With `MLForecast`, you can generate prediction intervals using Conformal Prediction. To configure Conformal Prediction, you need to pass an instance of the `PredictionIntervals` class to the `prediction_intervals` argument of the `fit` method. The class takes three parameters: `n_windows`, `h` and `method`.
- `n_windows` represents the number of cross-validation windows used to calibrate the intervals
- `h` is the forecast horizon
- `method` can be `conformal_distribution` or `conformal_error`; the default `conformal_distribution` creates forecast paths based on the cross-validation errors and calculates quantiles using those paths, while `conformal_error` calculates the error quantiles to produce prediction intervals. The strategy will adjust the intervals for each horizon step, resulting in a different width for each step. Please note that a minimum of 2 cross-validation windows must be used.
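For example, to calibrate the intervals with the alternative `conformal_error` strategy instead of the default, you'd pass the method explicitly (shown as a sketch here; the cells below use the default `conformal_distribution`):
fcst.fit(
    train,
    prediction_intervals=PredictionIntervals(n_windows=3, h=48, method='conformal_error')
);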
fcst.fit(
    train,
    prediction_intervals=PredictionIntervals(n_windows=3, h=48)
);
After that, you just have to include your desired confidence levels in the `predict` method using the `level` argument. Levels must lie between 0 and 100.
predictions_w_intervals = fcst.predict(48, level=[50, 80, 95])
predictions_w_intervals.head()
unique_id | ds | LGBMRegressor | LGBMRegressor-lo-95 | LGBMRegressor-lo-80 | LGBMRegressor-lo-50 | LGBMRegressor-hi-50 | LGBMRegressor-hi-80 | LGBMRegressor-hi-95 | |
---|---|---|---|---|---|---|---|---|---|
0 | H196 | 961 | 16.071271 | 15.958042 | 15.971271 | 16.005091 | 16.137452 | 16.171271 | 16.184501 |
1 | H196 | 962 | 15.671271 | 15.553632 | 15.553632 | 15.578632 | 15.763911 | 15.788911 | 15.788911 |
2 | H196 | 963 | 15.271271 | 15.153632 | 15.153632 | 15.162452 | 15.380091 | 15.388911 | 15.388911 |
3 | H196 | 964 | 14.971271 | 14.858042 | 14.871271 | 14.905091 | 15.037452 | 15.071271 | 15.084501 |
4 | H196 | 965 | 14.671271 | 14.553632 | 14.553632 | 14.562452 | 14.780091 | 14.788911 | 14.788911 |
# test that we can forecast horizons lower than h
# with prediction intervals
for method in ['conformal_distribution', 'conformal_errors']:
    fcst.fit(
        train,
        prediction_intervals=PredictionIntervals(n_windows=3, h=48)
    )
    preds_h_lower_h = fcst.predict(1, level=[50, 80, 95])
    preds_h_lower_h = fcst.predict(30, level=[50, 80, 95])
    # test monotonicity of intervals
    test_eq(
        preds_h_lower_h.filter(regex='lo|hi').apply(
            lambda x: x.is_monotonic_increasing,
            axis=1
        ).sum(),
        len(preds_h_lower_h)
    )
test_fail(lambda: fcst.predict(49, level=[68]))
# test we can recover the point predictions
test_eq(
    predictions,
    predictions_w_intervals[predictions.columns]
)
# test we can recover the mean forecast with level 0
np.testing.assert_allclose(
    predictions['LGBMRegressor'].values,
    fcst.predict(48, level=[0])['LGBMRegressor-lo-0'].values,
)
# test monotonicity of intervals
test_eq(
    predictions_w_intervals.filter(regex='lo|hi').apply(
        lambda x: x.is_monotonic_increasing,
        axis=1
    ).sum(),
    len(predictions_w_intervals)
)
Let's explore the generated intervals.
results = valid.merge(predictions_w_intervals, on=['unique_id', 'ds'])
fig = plot_series(forecasts_df=results, level=[50, 80, 95])
fig.savefig('figs/forecast__predict_intervals.png', bbox_inches='tight')
If you want to reduce the computation time and produce intervals with the same width for the whole forecast horizon, just pass `h=1` to the `PredictionIntervals` class. A downside of this strategy is that in some cases the variance of the absolute residuals can be small (even zero), so the intervals may be too narrow.
fcst.fit(
    train,
    prediction_intervals=PredictionIntervals(n_windows=3, h=1)
);
predictions_w_intervals_ws_1 = fcst.predict(48, level=[80, 90, 95])
Let's explore the generated intervals.
results = valid.merge(predictions_w_intervals_ws_1, on=['unique_id', 'ds'])
fig = plot_series(forecasts_df=results, level=[90])
fig.savefig('figs/forecast__predict_intervals_window_size_1.png', bbox_inches='tight')
# test indexed data, datetime ds
fcst_test = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq='D',
    lags=[1],
    num_threads=1,
)
df_test = generate_daily_series(1)
fcst_test.fit(
    df_test,
    prediction_intervals=PredictionIntervals()
)
pred_test = fcst_test.predict(12)
pred_int_test = fcst_test.predict(12, level=[80, 90])
# test same structure
test_eq(
    pred_test,
    pred_int_test[pred_test.columns]
)
# test monotonicity of intervals
test_eq(
    pred_int_test.filter(regex='lo|hi').apply(
        lambda x: x.is_monotonic_increasing,
        axis=1
    ).sum(),
    len(pred_int_test)
)
Forecasting with a pretrained model
MLForecast allows you to use a pretrained model to generate forecasts for a new dataset. Simply provide a pandas dataframe containing the new observations as the value for the `new_df` argument when calling the `predict` method. The dataframe should have the same structure as the one used to fit the model, including any features and time series data. The function will then use the pretrained model to generate forecasts for the new observations. This makes it easy to apply a pretrained model to a new dataset and generate forecasts without having to retrain the model.
ercot_df = pd.read_csv('https://datasets-nixtla.s3.amazonaws.com/ERCOT-clean.csv')
# we need to convert the ds column to integers
# since MLForecast was trained with that structure
ercot_df['ds'] = np.arange(1, len(ercot_df) + 1)
# use the `new_df` argument to pass the ercot dataset
ercot_fcsts = fcst.predict(horizon, new_df=ercot_df)
fig = plot_series(ercot_df, ercot_fcsts, max_insample_length=48 * 2)
fig.get_axes()[0].set_title('ERCOT forecasts trained on M4-Hourly dataset')
fig.savefig('figs/forecast__ercot.png', bbox_inches='tight')
If you want to take a look at the data that will be used to train the models, you can call `Forecast.preprocess`.
show_doc(MLForecast.preprocess)
MLForecast.preprocess
MLForecast.preprocess (df:Union[pandas.core.frame.DataFrame,polars.dataf rame.frame.DataFrame], id_col:str='unique_id', time_col:str='ds', target_col:str='y', static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None, max_horizon:Optional[int]=None, return_X_y:bool=False, as_numpy:bool=False)
Add the features to `data`.
Type | Default | Details | |
---|---|---|---|
df | Union | Series data in long format. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
max_horizon | Optional | None | Train this many models, where each model will predict a specific horizon. |
return_X_y | bool | False | Return a tuple with the features and the target. If False will return a single dataframe. |
as_numpy | bool | False | Cast features to numpy array. Only works for return_X_y=True . |
Returns | Union | df plus added features and target(s). |
prep_df = fcst.preprocess(train)
prep_df
unique_id | ds | y | lag24 | lag48 | lag72 | lag96 | lag120 | lag144 | lag168 | exponentially_weighted_mean_lag48_alpha0.3 | |
---|---|---|---|---|---|---|---|---|---|---|---|
86988 | H196 | 193 | 0.1 | 0.0 | 0.0 | 0.0 | 0.3 | 0.1 | 0.1 | 0.3 | 0.002810 |
86989 | H196 | 194 | 0.1 | -0.1 | 0.1 | 0.0 | 0.3 | 0.1 | 0.1 | 0.3 | 0.031967 |
86990 | H196 | 195 | 0.1 | -0.1 | 0.1 | 0.0 | 0.3 | 0.1 | 0.2 | 0.1 | 0.052377 |
86991 | H196 | 196 | 0.1 | 0.0 | 0.0 | 0.0 | 0.3 | 0.2 | 0.1 | 0.2 | 0.036664 |
86992 | H196 | 197 | 0.0 | 0.0 | 0.0 | 0.1 | 0.2 | 0.2 | 0.1 | 0.2 | 0.025665 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
325187 | H413 | 956 | 0.0 | 10.0 | 1.0 | 6.0 | -53.0 | 44.0 | -21.0 | 21.0 | 7.963225 |
325188 | H413 | 957 | 9.0 | 10.0 | 10.0 | -7.0 | -46.0 | 27.0 | -19.0 | 24.0 | 8.574257 |
325189 | H413 | 958 | 16.0 | 8.0 | 5.0 | -9.0 | -36.0 | 32.0 | -13.0 | 8.0 | 7.501980 |
325190 | H413 | 959 | -3.0 | 17.0 | -7.0 | 2.0 | -31.0 | 22.0 | 5.0 | -2.0 | 3.151386 |
325191 | H413 | 960 | 15.0 | 11.0 | -6.0 | -5.0 | -17.0 | 22.0 | -18.0 | 10.0 | 0.405970 |
3072 rows × 11 columns
If we do this, we then have to call `Forecast.fit_models`, since this only stores the series information.
# transforms namer
def namer(tfm, lag, *args):
    return f'hello_from_{tfm.__class__.__name__.lower()}'

fcst2 = MLForecast(
    models=LinearRegression(),
    freq=1,
    lag_transforms={1: [ExpandingMean()]},
    lag_transforms_namer=namer,
)
prep = fcst2.preprocess(train)
assert 'hello_from_expandingmean' in prep
show_doc(MLForecast.fit_models)
MLForecast.fit_models
MLForecast.fit_models (X:Union[pandas.core.frame.DataFrame,polars.datafra me.frame.DataFrame,numpy.ndarray], y:numpy.ndarray)
Manually train models. Use this if you called `MLForecast.preprocess` beforehand.
Type | Details | |
---|---|---|
X | Union | Features. |
y | ndarray | Target. |
Returns | MLForecast | Forecast object with trained models. |
X, y = prep_df.drop(columns=['unique_id', 'ds', 'y']), prep_df['y']
fcst.fit_models(X, y)
MLForecast(models=[LGBMRegressor], freq=1, lag_features=['lag24', 'lag48', 'lag72', 'lag96', 'lag120', 'lag144', 'lag168', 'exponentially_weighted_mean_lag48_alpha0.3'], date_features=[], num_threads=1)
predictions2 = fcst.predict(horizon)
pd.testing.assert_frame_equal(predictions, predictions2)
# test intervals with multioutput
max_horizon = 24
individual_fcst_intervals = fcst.fit(
    train,
    max_horizon=max_horizon,
    prediction_intervals=PredictionIntervals(h=max_horizon)
)
individual_preds = fcst.predict(max_horizon)
individual_preds_intervals = fcst.predict(max_horizon, level=[90, 80])
# test monotonicity of intervals
test_eq(
    individual_preds_intervals.filter(regex='lo|hi').apply(
        lambda x: x.is_monotonic_increasing,
        axis=1
    ).sum(),
    len(individual_preds_intervals)
)
# test we can recover point predictions with intervals
test_eq(
    individual_preds,
    individual_preds_intervals[individual_preds.columns]
)
# check for max_horizon and model state issues before predicting
fcst = MLForecast(
    models=[LinearRegression()],
    freq=1,
    lags=[12],
)
fcst.fit(df, max_horizon=2)
fcst.preprocess(df, max_horizon=None)
test_fail(lambda: fcst.predict(1), contains='Found one model per horizon')

fcst.fit(df, max_horizon=None)
fcst.preprocess(df, max_horizon=2)
test_fail(lambda: fcst.predict(1), contains='Found a single model for all horizons')
show_doc(MLForecast.cross_validation)
MLForecast.cross_validation
MLForecast.cross_validation (df:Union[pandas.core.frame.DataFrame,polars .dataframe.frame.DataFrame], n_windows:int, h:int, id_col:str='unique_id', time_col:str='ds', target_col:str='y', step_size:Optional[int]=None, static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None, refit:Union[bool,int]=True, max_horizon:Optional[int]=None, before_predi ct_callback:Optional[Callable]=None, after_p redict_callback:Optional[Callable]=None, pre diction_intervals:Optional[mlforecast.utils. PredictionIntervals]=None, level:Optional[List[Union[int,float]]]=None, input_size:Optional[int]=None, fitted:bool=False, as_numpy:bool=False)
Perform time series cross validation. Creates `n_windows` splits where each window has `h` test periods, trains the models, computes the predictions and merges the actuals.
Type | Default | Details | |
---|---|---|---|
df | Union | Series data in long format. | |
n_windows | int | Number of windows to evaluate. | |
h | int | Forecast horizon. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
step_size | Optional | None | Step size between each cross validation window. If None it will be equal to h . |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
refit | Union | True | Retrain model for each cross validation window. If False, the models are trained at the beginning and then used to predict each window. If positive int, the models are retrained every refit windows. |
max_horizon | Optional | None | Train this many models, where each model will predict a specific horizon. |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
prediction_intervals | Optional | None | Configuration to calibrate prediction intervals (Conformal Prediction). |
level | Optional | None | Confidence levels between 0 and 100 for prediction intervals. |
input_size | Optional | None | Maximum training samples per serie in each window. If None, will use an expanding window. |
fitted | bool | False | Store the in-sample predictions. |
as_numpy | bool | False | Cast features to numpy array. |
Returns | Union | Predictions for each window with the series id, timestamp, last train date, target value and predictions from each model. |
If we want to know how good our forecasts will be for a specific model and set of features, we can perform cross validation. Cross validation consists of splitting our data into two parts, where the first part is used for training and the second for validation. Since the data is time dependent, we usually take the last x observations of our data as the validation set.
This process is implemented in `MLForecast.cross_validation`, which takes our data and performs the process described above `n_windows` times, where each window has `h` validation samples. For example, if we have 100 samples and we want to perform 2 backtests of size 14, the splits would be as follows:
- Train: 1 to 72. Validation: 73 to 86.
- Train: 1 to 86. Validation: 87 to 100.
You can control the size between each cross validation window using the `step_size` argument. For example, if we have 100 samples and we want to perform 2 backtests of size 14, moving one step ahead in each fold (`step_size=1`), the splits would be as follows:
- Train: 1 to 85. Validation: 86 to 99.
- Train: 1 to 86. Validation: 87 to 100.
You can also perform cross validation without refitting your models for each window by setting `refit=False`. This allows you to evaluate the performance of your models using multiple window sizes without having to retrain them each time.
fcst = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        1: [RollingMean(window_size=24)],
        24: [RollingMean(window_size=24)],
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24])],
)
cv_results = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    fitted=True,
)
cv_results
unique_id | ds | cutoff | y | LGBMRegressor | |
---|---|---|---|---|---|
0 | H196 | 865 | 864 | 15.5 | 15.373393 |
1 | H196 | 866 | 864 | 15.1 | 14.973393 |
2 | H196 | 867 | 864 | 14.8 | 14.673393 |
3 | H196 | 868 | 864 | 14.4 | 14.373393 |
4 | H196 | 869 | 864 | 14.2 | 14.073393 |
... | ... | ... | ... | ... | ... |
379 | H413 | 956 | 912 | 59.0 | 64.284167 |
380 | H413 | 957 | 912 | 58.0 | 64.830429 |
381 | H413 | 958 | 912 | 53.0 | 40.726851 |
382 | H413 | 959 | 912 | 38.0 | 42.739657 |
383 | H413 | 960 | 912 | 46.0 | 52.802769 |
384 rows × 5 columns
Since we set `fitted=True`, we can also access the predictions for the training set through the `cross_validation_fitted_values` method.
fcst.cross_validation_fitted_values()
| | unique_id | ds | fold | y | LGBMRegressor |
|---|---|---|---|---|---|
| 0 | H196 | 193 | 0 | 12.7 | 12.673393 |
| 1 | H196 | 194 | 0 | 12.3 | 12.273393 |
| 2 | H196 | 195 | 0 | 11.9 | 11.873393 |
| 3 | H196 | 196 | 0 | 11.7 | 11.673393 |
| 4 | H196 | 197 | 0 | 11.4 | 11.473393 |
| ... | ... | ... | ... | ... | ... |
| 5563 | H413 | 908 | 1 | 49.0 | 50.620196 |
| 5564 | H413 | 909 | 1 | 39.0 | 35.972331 |
| 5565 | H413 | 910 | 1 | 29.0 | 29.359678 |
| 5566 | H413 | 911 | 1 | 24.0 | 25.784563 |
| 5567 | H413 | 912 | 1 | 20.0 | 23.168413 |

5568 rows × 5 columns
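These in-sample predictions can be used, for instance, to inspect the residuals of each fold; a small sketch (not in the original):

# sketch: summary of in-sample residuals per fold
fitted = fcst.cross_validation_fitted_values()
fitted.assign(resid=lambda df: df['y'] - df['LGBMRegressor']).groupby('fold')['resid'].describe()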
# test fitted
fcst2 = MLForecast(
    models=lgb.LGBMRegressor(n_estimators=5, random_state=0, verbosity=-1),
    freq=1,
    lags=[24],
    num_threads=1,
)
_ = fcst2.cross_validation(
    train,
    n_windows=2,
    h=2,
    fitted=True,
    refit=False,
)
fitted_cv_results = fcst2.cross_validation_fitted_values()
train_with_cv_fitted_values = train.merge(fitted_cv_results, on=['unique_id', 'ds'], suffixes=('_expected', ''))
np.testing.assert_allclose(
    train_with_cv_fitted_values['y_expected'].values,
    train_with_cv_fitted_values['y'].values,
)
# test max_horizon
_ = fcst2.cross_validation(
    train,
    n_windows=2,
    h=2,
    fitted=True,
    refit=False,
    max_horizon=2,
)
max_horizon_fitted_cv_results = fcst2.cross_validation_fitted_values()
pd.testing.assert_frame_equal(
    fitted_cv_results[lambda df: df['fold'].eq(0)],
    (
        max_horizon_fitted_cv_results
        [lambda df: df['fold'].eq(0) & df['h'].eq(1)]
        .drop(columns='h')
        .reset_index(drop=True)
    ),
)
We can also compute prediction intervals by passing a configuration to prediction_intervals, as well as the desired coverage through level.
cv_results_intervals = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    prediction_intervals=PredictionIntervals(h=horizon),
    level=[80, 90],
)
cv_results_intervals
| | unique_id | ds | cutoff | y | LGBMRegressor | LGBMRegressor-lo-90 | LGBMRegressor-lo-80 | LGBMRegressor-hi-80 | LGBMRegressor-hi-90 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | H196 | 865 | 864 | 15.5 | 15.373393 | 15.311379 | 15.316528 | 15.430258 | 15.435407 |
| 1 | H196 | 866 | 864 | 15.1 | 14.973393 | 14.940556 | 14.940556 | 15.006230 | 15.006230 |
| 2 | H196 | 867 | 864 | 14.8 | 14.673393 | 14.606230 | 14.606230 | 14.740556 | 14.740556 |
| 3 | H196 | 868 | 864 | 14.4 | 14.373393 | 14.306230 | 14.306230 | 14.440556 | 14.440556 |
| 4 | H196 | 869 | 864 | 14.2 | 14.073393 | 14.006230 | 14.006230 | 14.140556 | 14.140556 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 379 | H413 | 956 | 912 | 59.0 | 64.284167 | 29.890099 | 34.371545 | 94.196788 | 98.678234 |
| 380 | H413 | 957 | 912 | 58.0 | 64.830429 | 56.874572 | 57.827689 | 71.833169 | 72.786285 |
| 381 | H413 | 958 | 912 | 53.0 | 40.726851 | 35.296195 | 35.846206 | 45.607495 | 46.157506 |
| 382 | H413 | 959 | 912 | 38.0 | 42.739657 | 35.292153 | 35.807640 | 49.671674 | 50.187161 |
| 383 | H413 | 960 | 912 | 46.0 | 52.802769 | 42.465597 | 43.895670 | 61.709869 | 63.139941 |

384 rows × 9 columns
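As an extra check (not in the original), we can measure the empirical coverage of these bands, i.e. the fraction of actual values that fall inside them:

# sketch: empirical coverage of the 80% and 90% intervals
for lv in (80, 90):
    inside = cv_results_intervals['y'].between(
        cv_results_intervals[f'LGBMRegressor-lo-{lv}'],
        cv_results_intervals[f'LGBMRegressor-hi-{lv}'],
    )
    print(f'{lv}% interval coverage: {inside.mean():.0%}')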
The refit argument allows us to control whether we want to retrain our models in every window. It can be:

- A boolean: True will retrain on every window and False only on the first one.
- A positive integer: the models will be trained on the first window and then retrained every refit windows after that.
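The expected model counts in the test below follow directly from this rule; here is a small sketch (not library code) of how many trainings each refit value produces with n_windows=4:

# sketch: number of trainings for a given refit value
def n_trainings(n_windows, refit):
    if isinstance(refit, bool):
        return n_windows if refit else 1
    return sum(1 for window in range(n_windows) if window % refit == 0)

[n_trainings(4, refit) for refit in (True, False, 2)]  # [4, 1, 2]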
fcst = MLForecast(
    models=LinearRegression(),
    freq=1,
    lags=[1, 24],
)
for refit, expected_models in zip([True, False, 2], [4, 1, 2]):
    fcst.cross_validation(
        train,
        n_windows=4,
        h=horizon,
        refit=refit,
    )
    test_eq(len(fcst.cv_models_), expected_models)
fcst = MLForecast(
    models=lgb.LGBMRegressor(random_state=0, verbosity=-1),
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        1: [RollingMean(window_size=24)],
        24: [RollingMean(window_size=24)],
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24])],
)
cv_results_no_refit = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    refit=False,
)
# test we recover the same "metadata"
test_eq(
    cv_results_no_refit.drop(columns='LGBMRegressor'),
    cv_results.drop(columns='LGBMRegressor'),
)
# test the predictions for the first window are the same
first_cutoff = cv_results['cutoff'].iloc[0]
test_eq(
    cv_results_no_refit.query('cutoff == @first_cutoff'),
    cv_results.query('cutoff == @first_cutoff'),
)
# test the next window has different predictions
test_ne(
    cv_results_no_refit.query('cutoff != @first_cutoff'),
    cv_results.query('cutoff != @first_cutoff'),
)
# input_size
input_size = 300
cv_results_input_size = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    input_size=input_size,
)
series_lengths = np.diff(fcst.ts.ga.indptr)
unique_lengths = np.unique(series_lengths)
assert unique_lengths.size == 1
assert unique_lengths[0] == input_size
# one model per horizon
cv_results2 = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    max_horizon=horizon,
)
# the first entry per id and window is the same
pd.testing.assert_frame_equal(
    cv_results.groupby(['unique_id', 'cutoff']).head(1),
    cv_results2.groupby(['unique_id', 'cutoff']).head(1),
)
# the rest are different
test_fail(lambda: pd.testing.assert_frame_equal(cv_results, cv_results2))
# one model per horizon with prediction intervals
cv_results2_intervals = fcst.cross_validation(
    train,
    n_windows=2,
    h=horizon,
    step_size=horizon,
    max_horizon=horizon,
    prediction_intervals=PredictionIntervals(n_windows=2, h=horizon),
    level=[80, 90],
)
# the first entry per id and window is the same
pd.testing.assert_frame_equal(
    cv_results_intervals.groupby(['unique_id', 'cutoff']).head(1),
    cv_results2_intervals.groupby(['unique_id', 'cutoff']).head(1),
)
# the rest are different
test_fail(lambda: pd.testing.assert_frame_equal(cv_results_intervals, cv_results2_intervals))
# wrong frequency raises error
df_wrong_freq = pd.DataFrame({'ds': pd.to_datetime(['2020-01-02', '2020-02-02', '2020-03-02', '2020-04-02'])})
df_wrong_freq['unique_id'] = 'id1'
df_wrong_freq['y'] = 1
fcst_wrong_freq = MLForecast(
    models=[LinearRegression()],
    freq='MS',
    lags=[1],
)
test_fail(
    lambda: fcst_wrong_freq.cross_validation(df_wrong_freq, n_windows=1, h=1),
    contains='Cross validation result produced less results than expected',
)
fig = plot_series(forecasts_df=cv_results.drop(columns='cutoff'))
fig.savefig('figs/forecast__cross_validation.png', bbox_inches='tight')
fig = plot_series(forecasts_df=cv_results_intervals.drop(columns='cutoff'), level=[90])
fig.savefig('figs/forecast__cross_validation_intervals.png', bbox_inches='tight')
show_doc(MLForecast.from_cv)
Once you've found a set of features and parameters that work for your problem, you can build a forecast object from it using MLForecast.from_cv, which takes a trained LightGBMCV object and builds an MLForecast object that will use the same features and parameters. You can then call fit and predict as you normally would.
cv = LightGBMCV(
    freq=1,
    lags=[24 * (i+1) for i in range(7)],
    lag_transforms={
        48: [ExponentiallyWeightedMean(alpha=0.3)],
    },
    num_threads=1,
    target_transforms=[Differences([24])],
)
hist = cv.fit(
    train,
    n_windows=2,
    h=horizon,
    params={'verbosity': -1},
)
[LightGBM] [Info] Start training from score 0.084340
[10] mape: 0.118569
[20] mape: 0.111506
[30] mape: 0.107314
[40] mape: 0.106089
[50] mape: 0.106630
Early stopping at round 50
Using best iteration: 40
fcst = MLForecast.from_cv(cv)
assert cv.best_iteration_ == fcst.models['LGBMRegressor'].n_estimators
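From here the object behaves like any other MLForecast, so fitting and predicting work as usual; a minimal sketch:

# the object built with from_cv is fit and used like any other MLForecast
fcst.fit(train)
preds = fcst.predict(horizon)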
series = generate_daily_series(100, equal_ends=True, n_static_features=2, static_as_categorical=False)
non_std_series = series.copy()
non_std_series['ds'] = non_std_series.groupby('unique_id', observed=True).cumcount()
non_std_series = non_std_series.rename(columns={'unique_id': 'some_id', 'ds': 'time', 'y': 'value'})
models = [
    lgb.LGBMRegressor(n_jobs=1, random_state=0, verbosity=-1),
    xgb.XGBRegressor(n_jobs=1, random_state=0),
]
flow_params = dict(
    models=models,
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)],
    },
    num_threads=2,
)
fcst = MLForecast(freq=1, **flow_params)
non_std_preds = fcst.fit(non_std_series, id_col='some_id', time_col='time', target_col='value').predict(7)
non_std_preds = non_std_preds.rename(columns={'some_id': 'unique_id'})
fcst = MLForecast(freq='D', **flow_params)
preds = fcst.fit(series).predict(7)
pd.testing.assert_frame_equal(preds.drop(columns='ds'), non_std_preds.drop(columns='time'))
def test_cross_validation(data=non_std_series, add_exogenous=False):
    n_windows = 2
    h = 14
    fcst = MLForecast(lgb.LGBMRegressor(verbosity=-1), freq=1, lags=[7, 14])
    if add_exogenous:
        data = data.assign(ex1=lambda x: np.arange(0, len(x)))
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=DeprecationWarning)
        backtest_results = fcst.cross_validation(
            df=data,
            n_windows=n_windows,
            h=h,
            id_col='some_id',
            time_col='time',
            target_col='value',
            static_features=['some_id', 'static_0', 'static_1'],
        )
    renamer = {'some_id': 'unique_id', 'time': 'ds', 'value': 'y'}
    backtest_results = backtest_results.rename(columns=renamer)
    renamed = data.rename(columns=renamer)
    manual_results = []
    for cutoff, train, valid in ufp.backtest_splits(renamed, n_windows, h, 'unique_id', 'ds', 1):
        fcst.fit(train, static_features=['unique_id', 'static_0', 'static_1'])
        if add_exogenous:
            X_df = valid.drop(columns=['y', 'static_0', 'static_1']).reset_index()
        else:
            X_df = None
        pred = fcst.predict(h, X_df=X_df)
        res = valid[['unique_id', 'ds', 'y']].copy()
        res = res.merge(cutoff, on='unique_id')
        res = res[['unique_id', 'ds', 'cutoff', 'y']].copy()
        manual_results.append(res.merge(pred, on=['unique_id', 'ds'], how='left'))
    manual_results = pd.concat(manual_results)
    pd.testing.assert_frame_equal(backtest_results, manual_results.reset_index(drop=True))

test_cross_validation()
test_cross_validation(add_exogenous=True)
# test short series in cv
series = generate_daily_series(
    n_series=100, min_length=20, max_length=51, equal_ends=True,
)
horizon = 10
n_windows = 4
fcst = MLForecast(models=[LinearRegression()], freq='D', lags=[1])
cv_res = fcst.cross_validation(series, h=horizon, n_windows=n_windows)
series_per_cutoff = cv_res.groupby('cutoff')['unique_id'].nunique()
series_sizes = series['unique_id'].value_counts().sort_index()
for i in range(4):
    test_eq(series_per_cutoff.iloc[i], series_sizes.gt((n_windows - i) * horizon).sum())
#| polars
from itertools import product

import polars as pl
from utilsforecast.processing import match_if_categorical

from mlforecast.utils import generate_prices_for_series
#| polars
horizon = 2
series_pl = generate_daily_series(
    10, n_static_features=2, static_as_categorical=False, equal_ends=True, engine='polars'
)
series_pd = generate_daily_series(
    10, n_static_features=2, static_as_categorical=False, equal_ends=True, engine='pandas'
)
series_pd = series_pd.rename(columns={'static_0': 'product_id'})
prices_pd = generate_prices_for_series(series_pd, horizon)
prices_pd['unique_id'] = prices_pd['unique_id'].astype(series_pd['unique_id'].dtype)
series_pd = series_pd.merge(prices_pd, on=['unique_id', 'ds'])

prices_pl = pl.from_pandas(prices_pd)
uids_series, uids_prices = match_if_categorical(series_pl['unique_id'], prices_pl['unique_id'])
series_pl = series_pl.with_columns(uids_series)
prices_pl = prices_pl.with_columns(uids_prices)
series_pl = series_pl.rename({'static_0': 'product_id'}).join(prices_pl, on=['unique_id', 'ds'])
permutation = np.random.choice(series_pl.shape[0], series_pl.shape[0], replace=False)
series_pl = series_pl[permutation]
series_pd = series_pd.iloc[permutation]
cfg = dict(
    models=[LinearRegression(), lgb.LGBMRegressor(verbosity=-1)],
    freq='1d',
    lags=[1, 2],
    lag_transforms={
        1: [ExpandingMean()],
        2: [ExpandingMean()],
    },
    date_features=['day', 'month', 'week', 'year'],
    target_transforms=[Differences([1, 2]), LocalStandardScaler()],
)
fit_kwargs = dict(
    fitted=True,
    prediction_intervals=PredictionIntervals(h=horizon),
    static_features=['product_id', 'static_1'],
)
predict_kwargs = dict(
    h=2,
    level=[80, 95],
)
horizons = [None, horizon]
as_np = [True, False]
for max_horizon, as_numpy in product(horizons, as_np):
    fcst_pl = MLForecast(**cfg)
    fcst_pl.fit(series_pl, max_horizon=max_horizon, as_numpy=as_numpy, **fit_kwargs)
    fitted_pl = fcst_pl.forecast_fitted_values()
    preds_pl = fcst_pl.predict(X_df=prices_pl, **predict_kwargs)
    preds_pl_subset = fcst_pl.predict(X_df=prices_pl, ids=fcst_pl.ts.uids[[0, 6]], **predict_kwargs)
    cv_pl = fcst_pl.cross_validation(
        series_pl, n_windows=2, h=horizon, fitted=True, static_features=['product_id', 'static_1'], as_numpy=as_numpy
    )
    cv_fitted_pl = fcst_pl.cross_validation_fitted_values()

    fcst_pd = MLForecast(**cfg)
    fcst_pd.fit(series_pd, max_horizon=max_horizon, as_numpy=as_numpy, **fit_kwargs)
    fitted_pd = fcst_pd.forecast_fitted_values()
    preds_pd = fcst_pd.predict(X_df=prices_pd, **predict_kwargs)
    preds_pd_subset = fcst_pd.predict(X_df=prices_pd, ids=fcst_pd.ts.uids[[0, 6]], **predict_kwargs)
    assert preds_pd_subset['unique_id'].unique().tolist() == ['id_0', 'id_6']
    cv_pd = fcst_pd.cross_validation(
        series_pd, n_windows=2, h=horizon, fitted=True, static_features=['product_id', 'static_1'], as_numpy=as_numpy
    )
    cv_fitted_pd = fcst_pd.cross_validation_fitted_values()

    if max_horizon is not None:
        fitted_pl = fitted_pl.with_columns(pl.col('h').cast(pl.Int64))
        for h in range(max_horizon):
            fitted_h = fitted_pl.filter(pl.col('h').eq(h + 1))
            series_offset = (
                series_pl
                .sort('unique_id', 'ds')
                .with_columns(pl.col('y').shift(-h).over('unique_id'))
            )
            series_filtered = (
                fitted_h
                [['unique_id', 'ds']]
                .join(series_offset, on=['unique_id', 'ds'])
                .sort(['unique_id', 'ds'])
            )
            np.testing.assert_allclose(
                series_filtered['y'],
                fitted_h['y'],
            )
    else:
        series_filtered = (
            fitted_pl
            [['unique_id', 'ds']]
            .join(series_pl, on=['unique_id', 'ds'])
            .sort(['unique_id', 'ds'])
        )
        np.testing.assert_allclose(
            series_filtered['y'],
            fitted_pl['y'],
        )
    pd.testing.assert_frame_equal(fitted_pl.to_pandas(), fitted_pd)
    pd.testing.assert_frame_equal(preds_pl.to_pandas(), preds_pd)
    pd.testing.assert_frame_equal(preds_pl_subset.to_pandas(), preds_pd_subset)
    pd.testing.assert_frame_equal(cv_pl.to_pandas(), cv_pd)
    pd.testing.assert_frame_equal(
        cv_fitted_pl.with_columns(pl.col('fold').cast(pl.Int64)).to_pandas(),
        cv_fitted_pd,
    )
# test transformations are correctly inverted when series are dropped
series = generate_daily_series(10, min_length=5, max_length=20)
fcst = MLForecast(
    models=LinearRegression(),
    freq='D',
    lags=[10],
    target_transforms=[Differences([1]), LocalStandardScaler()],
)
fcst.fit(series, fitted=True)
assert fcst.ts._dropped_series.size > 0
fitted_vals = fcst.forecast_fitted_values()
full = fitted_vals.merge(series, on=['unique_id', 'ds'], suffixes=('_fitted', '_orig'))
np.testing.assert_allclose(
    full['y_fitted'].values,
    full['y_orig'].values,
)
# save & load
series = generate_daily_series(10)
fcst = MLForecast(
    models=LinearRegression(),
    freq='D',
    lags=[10],
    target_transforms=[Differences([1]), LocalStandardScaler()],
)
fcst.fit(series)
preds = fcst.predict(10)
with tempfile.TemporaryDirectory() as tmpdir:
    savedir = Path(tmpdir) / 'fcst'
    savedir.mkdir()
    fcst.save(savedir)
    fcst2 = MLForecast.load(savedir)
    preds2 = fcst2.predict(10)
pd.testing.assert_frame_equal(preds, preds2)
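Since paths are handled through fsspec, saving to remote storage should work as well; a hedged sketch (the s3 URL is a placeholder and assumes the corresponding filesystem package, e.g. s3fs, is installed):

# hedged sketch: 's3://my-bucket/fcst' is a placeholder path, requires s3fs
fcst.save('s3://my-bucket/fcst')
fcst_remote = MLForecast.load('s3://my-bucket/fcst')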