%load_ext autoreload
%autoreload 2
Models
Models currently supported by StatsForecast
StatsForecast offers a wide variety of models, grouped into the following categories (a short instantiation sketch follows this list):
Automatic forecasting: automatic forecasting tools search for the best parameters and select the best model for a collection of time series. They are useful for large numbers of univariate series. This category includes the automatic versions of ARIMA, ETS, Theta and CES.
Exponential smoothing: uses a weighted average of all past observations, with weights that decay exponentially over time. Suitable for data with a clear trend and/or seasonality. For data without a clear trend or seasonality, use the SimpleExponential family. Examples: SES, Holt-Winters, SSO.
Benchmark models: classical models used to establish a baseline. Examples: Mean, Naive, Random Walk.
Intermittent or sparse models: suited for time series with very few non-zero observations. Examples: CROSTON, ADIDA, IMAPA.
Multiple seasonalities: suited for signals with more than one clear seasonality. Useful for low-frequency data such as electricity load and logs. Examples: MSTL and TBATS.
Theta models: fit two theta lines to a deseasonalized time series, obtained with different techniques, and combine them to produce the final forecast. Examples: Theta, DynamicTheta.
GARCH models: suited for modeling time series whose volatility is not constant over time. Widely used in finance to model stock prices, exchange rates, interest rates and other financial instruments. The ARCH model is a particular case of GARCH.
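As a quick orientation, the sketch below instantiates one model from several of these categories. It is illustrative only; it assumes these class names are exported by statsforecast.models in your installed version and that the season lengths shown fit your data.
# Illustrative only: one model per category (class names from statsforecast.models).
from statsforecast.models import (
    AutoARIMA,        # automatic forecasting
    HoltWinters,      # exponential smoothing
    Naive,            # benchmark
    CrostonClassic,   # intermittent / sparse demand
    MSTL,             # multiple seasonalities
    Theta,            # theta family
    GARCH,            # volatility modeling
)

models = [
    AutoARIMA(season_length=12),
    HoltWinters(season_length=12),
    Naive(),
    CrostonClassic(),
    MSTL(season_length=[24, 24 * 7]),
    Theta(season_length=12),
    GARCH(1, 1),
]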
import warnings
from math import trunc
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
from numba import njit
from scipy.optimize import minimize
from scipy.special import inv_boxcox

from statsforecast.arima import (
    Arima,
    auto_arima_f,
    fitted_arima,
    forecast_arima,
    forward_arima,
    is_constant,
)
from statsforecast.ces import auto_ces, forecast_ces, forward_ces
from statsforecast.ets import (
    _PHI_LOWER,
    _PHI_UPPER,
    ets_f,
    forecast_ets,
    forward_ets,
)
from statsforecast.mfles import MFLES as _MFLES
from statsforecast.mstl import mstl
from statsforecast.theta import auto_theta, forecast_theta, forward_theta
from statsforecast.garch import garch_model, garch_forecast
from statsforecast.tbats import tbats_selection, tbats_forecast, _compute_sigmah
from statsforecast.utils import (
    _calculate_sigma,
    _calculate_intervals,
    _ensure_float,
    _naive,
    _old_kw_to_pos,
    _quantiles,
    _repeat_val,
    _repeat_val_seas,
    _seasonal_naive,
    CACHE,
    ConformalIntervals,
    NOGIL,
)

from datetime import date, timedelta

import matplotlib.pyplot as plt
import pandas as pd
from fastcore.test import test_eq, test_close, test_fail
from nbdev.showdoc import add_docs, show_doc

from statsforecast.garch import generate_garch_data
from statsforecast.utils import AirPassengers as ap
def _plot_insample_pi(fcst):
    fig, ax = plt.subplots(1, 1, figsize=(20, 7))
    sdate = date(1949, 1, 1)  # start date
    edate = date(1961, 1, 1)  # end date
    dates = pd.date_range(sdate, edate - timedelta(days=1), freq='m')
    df = pd.DataFrame({'dates': dates,
                       'actual': ap,
                       'fitted': fcst['fitted'],
                       'fitted_lo_80': fcst['fitted-lo-80'],
                       'fitted_lo_95': fcst['fitted-lo-95'],
                       'fitted_hi_80': fcst['fitted-hi-80'],
                       'fitted_hi_95': fcst['fitted-hi-95']})
    plt.plot(df.dates, df.actual, color='firebrick', label='Actual value', linewidth=3)
    plt.plot(df.dates, df.fitted, color='navy', label='Fitted values', linewidth=3)
    plt.plot(df.dates, df.fitted_lo_80, color='darkorange', label='fitted-lo-80', linewidth=3)
    plt.plot(df.dates, df.fitted_lo_95, color='deepskyblue', label='fitted-lo-95', linewidth=3)
    plt.plot(df.dates, df.fitted_hi_80, color='darkorange', label='fitted-hi-80', linewidth=3)
    plt.plot(df.dates, df.fitted_hi_95, color='deepskyblue', label='fitted-hi-95', linewidth=3)
    plt.fill_between(df.dates, df.fitted_lo_95, df.fitted_hi_95, color='deepskyblue', alpha=0.2)
    plt.fill_between(df.dates, df.fitted_lo_80, df.fitted_hi_80, color='darkorange', alpha=0.3)
    plt.legend()

def _plot_fcst(fcst):
    fig, ax = plt.subplots(1, 1, figsize=(20, 7))
    plt.plot(np.arange(0, len(ap)), ap)
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['mean'], label='mean')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-95'], color='r', label='lo-95')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-95'], color='r', label='hi-95')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-80'], color='g', label='lo-80')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-80'], color='g', label='hi-80')
    plt.legend()
def _add_fitted_pi(res, se, level):
    level = sorted(level)
    level = np.asarray(level)
    quantiles = _quantiles(level=level)
    lo = res['fitted'].reshape(-1, 1) - quantiles * se.reshape(-1, 1)
    hi = res['fitted'].reshape(-1, 1) + quantiles * se.reshape(-1, 1)
    lo = lo[:, ::-1]
    lo = {f'fitted-lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
    hi = {f'fitted-hi-{l}': hi[:, i] for i, l in enumerate(level)}
    res = {**res, **lo, **hi}
    return res
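A minimal numeric sketch of the interval construction above, with made-up `fitted`, `se` and `level` values and assuming `_quantiles(level)` returns the two-sided standard-normal quantiles norm.ppf(0.5 + level / 200); this is only to show the arithmetic, not the library's exact internals.
# Sketch: symmetric in-sample intervals from a point fit and a standard error.
import numpy as np
from scipy.stats import norm

fitted = np.array([100.0, 110.0, 120.0])   # hypothetical fitted values
se = np.full_like(fitted, 5.0)             # hypothetical standard errors
level = np.array([80, 95])

q = norm.ppf(0.5 + level / 200)            # approx. [1.28, 1.96]
lo = fitted.reshape(-1, 1) - q * se.reshape(-1, 1)
hi = fitted.reshape(-1, 1) + q * se.reshape(-1, 1)
print(lo[:, ::-1])  # columns ordered lo-95, lo-80, mirroring `_add_fitted_pi`
print(hi)           # columns ordered hi-80, hi-95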
def _add_conformal_distribution_intervals(
    fcst: Dict,
    cs: np.ndarray,
    level: List[Union[int, float]],
) -> Dict:
    r"""
    Adds conformal intervals to the `fcst` dict based on conformal scores `cs`.
    `level` should be already sorted. This strategy creates forecast paths
    based on errors and calculates quantiles using those paths.
    """
    alphas = [100 - lv for lv in level]
    cuts = [alpha / 200 for alpha in reversed(alphas)]
    cuts.extend(1 - alpha / 200 for alpha in alphas)
    mean = fcst['mean'].reshape(1, -1)
    scores = np.vstack([mean - cs, mean + cs])
    quantiles = np.quantile(
        scores,
        cuts,
        axis=0,
    )
    quantiles = quantiles.reshape(len(cuts), -1)
    lo_cols = [f"lo-{lv}" for lv in reversed(level)]
    hi_cols = [f"hi-{lv}" for lv in level]
    out_cols = lo_cols + hi_cols
    for i, col in enumerate(out_cols):
        fcst[col] = quantiles[i]
    return fcst
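The strategy above stacks "forecast minus score" and "forecast plus score" paths and then takes empirical quantiles across them. A toy run through the function just defined, with a hypothetical point forecast and synthetic conformity scores, illustrates the resulting keys.
# Toy illustration of the conformal-distribution strategy defined above.
toy_fcst = {'mean': np.array([10.0, 12.0, 14.0])}   # hypothetical point forecasts (h=3)
toy_cs = np.array([[1.0, 2.0, 1.5],                 # hypothetical conformity scores
                   [0.5, 1.0, 2.5]])                # shape (n_windows=2, h=3)
out = _add_conformal_distribution_intervals(toy_fcst, toy_cs, level=[80, 95])
print(list(out))  # ['mean', 'lo-95', 'lo-80', 'hi-80', 'hi-95']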
def _get_conformal_method(method: str):
    available_methods = {
        "conformal_distribution": _add_conformal_distribution_intervals,
        # "conformal_error": _add_conformal_error_intervals,
    }
    if method not in available_methods.keys():
        raise ValueError(
            f"prediction intervals method {method} not supported "
            f'please choose one of {", ".join(available_methods.keys())}'
        )
    return available_methods[method]
class _TS:
    uses_exog = False

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

    def __repr__(self):
        return self.alias

    def _conformity_scores(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        y = _ensure_float(y)
        n_windows = self.prediction_intervals.n_windows  # type: ignore[attr-defined]
        h = self.prediction_intervals.h  # type: ignore[attr-defined]
        n_samples = y.size
        # use as many windows as possible for short series,
        # leaving at least one observation for training
        n_windows = min(n_windows, (n_samples - 1) // h)
        if n_windows < 2:
            raise ValueError(
                f"Prediction intervals settings require at least {2 * h + 1:,} samples, serie has {n_samples:,}."
            )
        test_size = n_windows * h
        cs = np.empty((n_windows, h), dtype=y.dtype)
        for i_window in range(n_windows):
            train_end = n_samples - test_size + i_window * h
            y_train = y[:train_end]
            y_test = y[train_end : train_end + h]
            if X is not None:
                X_train = X[:train_end]
                X_test = X[train_end : train_end + h]
            else:
                X_train = None
                X_test = None
            fcst_window = self.forecast(h=h, y=y_train, X=X_train, X_future=X_test)  # type: ignore[attr-defined]
            cs[i_window] = np.abs(fcst_window["mean"] - y_test)
        return cs

    @property
    def _conformal_method(self):
        return _get_conformal_method(self.prediction_intervals.method)

    def _store_cs(self, y, X):
        if self.prediction_intervals is not None:
            self._cs = self._conformity_scores(y, X)

    def _add_conformal_intervals(self, fcst, y, X, level):
        if self.prediction_intervals is not None and level is not None:
            cs = self._conformity_scores(y, X) if y is not None else self._cs
            res = self._conformal_method(fcst=fcst, cs=cs, level=level)
            return res
        return fcst

    def _add_predict_conformal_intervals(self, fcst, level):
        return self._add_conformal_intervals(fcst=fcst, y=None, X=None, level=level)
# test conformity scores
class ZeroModel(_TS):
    def __init__(self, prediction_intervals: ConformalIntervals = None):
        self.prediction_intervals = prediction_intervals
        self.alias = 'SumAhead'

    def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
        res = {'mean': np.zeros(h)}
        if self.prediction_intervals is not None and level is not None:
            cs = self._conformity_scores(y, X)
            res = self._conformal_method(fcst=res, cs=cs, level=level)
        return res

    def fit(self, y, X):
        return self

    def predict(self, h, X=None, level=None):
        res = {'mean': np.zeros(h)}
        return res

conf_intervals = ConformalIntervals(h=12, n_windows=10)
expected_cs = np.full((conf_intervals.n_windows, conf_intervals.h), np.nan)
cs_info = ap[-conf_intervals.h * conf_intervals.n_windows:]
for i in range(conf_intervals.n_windows):
    expected_cs[i] = cs_info[i * conf_intervals.h:(i + 1) * conf_intervals.h]
current_cs = ZeroModel(conf_intervals)._conformity_scores(ap)
test_eq(expected_cs, current_cs)
zero_model = ZeroModel(conf_intervals)
fcst_conformal = zero_model.forecast(ap, h=12, level=[80, 90])
test_eq(list(fcst_conformal.keys()), ['mean', 'lo-90', 'lo-80', 'hi-80', 'hi-90'])
Automatic forecasting
AutoARIMA
class AutoARIMA(_TS):
r"""AutoARIMA model.
Automatically selects the best ARIMA (AutoRegressive Integrated Moving Average)
model using an information criterion. Default is Akaike Information Criterion (AICc).
Notes
-----
This implementation is a mirror of Hyndman's [forecast::auto.arima](https://github.com/robjhyndman/forecast).
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
Parameters
----------
d : Optional[int]
Order of first-differencing.
D : Optional[int]
Order of seasonal-differencing.
max_p : int
Max autoregressive order p.
max_q : int
Max moving average order q.
max_P : int
Max seasonal autoregressive order P.
max_Q : int
Max seasonal moving average order Q.
max_order : int
Max p+q+P+Q value if not stepwise selection.
max_d : int
Max non-seasonal differences.
max_D : int
Max seasonal differences.
start_p : int
Starting value of p in stepwise procedure.
start_q : int
Starting value of q in stepwise procedure.
start_P : int
Starting value of P in stepwise procedure.
start_Q : int
Starting value of Q in stepwise procedure.
stationary : bool
If True, restricts search to stationary models.
seasonal : bool
If False, restricts search to non-seasonal models.
ic : str
Information criterion to be used in model selection.
stepwise : bool
If True, will do stepwise selection (faster).
nmodels : int
Number of models considered in stepwise search.
trace : bool
If True, the searched ARIMA models are reported.
approximation : Optional[bool]
If True, use conditional sums-of-squares for estimation and refit the final model by maximum likelihood.
method : Optional[str]
Fitting method: maximum likelihood or conditional sums-of-squares.
truncate : Optional[int]
If given, only the last `truncate` observations are used during model selection.
test : str
Unit root test to use. See `ndiffs` for details.
test_kwargs : Optional[str]
Unit root test additional arguments.
seasonal_test : str
Selection method for seasonal differences.
seasonal_test_kwargs : Optional[dict]
Seasonal unit root test arguments.
allowdrift : bool (default True)
If True, models with drift terms are considered.
allowmean : bool (default True)
If True, models with a non-zero mean are considered.
blambda : Optional[float]
Box-Cox transformation parameter.
biasadj : bool
Use adjusted back-transformed mean for Box-Cox transformations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
    uses_exog = True

    def __init__(
        self,
        d: Optional[int] = None,
        D: Optional[int] = None,
        max_p: int = 5,
        max_q: int = 5,
        max_P: int = 2,
        max_Q: int = 2,
        max_order: int = 5,
        max_d: int = 2,
        max_D: int = 1,
        start_p: int = 2,
        start_q: int = 2,
        start_P: int = 1,
        start_Q: int = 1,
        stationary: bool = False,
        seasonal: bool = True,
        ic: str = 'aicc',
        stepwise: bool = True,
        nmodels: int = 94,
        trace: bool = False,
        approximation: Optional[bool] = False,
        method: Optional[str] = None,
        truncate: Optional[bool] = None,
        test: str = 'kpss',
        test_kwargs: Optional[str] = None,
        seasonal_test: str = 'seas',
        seasonal_test_kwargs: Optional[Dict] = None,
        allowdrift: bool = True,
        allowmean: bool = True,
        blambda: Optional[float] = None,
        biasadj: bool = False,
        season_length: int = 1,
        alias: str = 'AutoARIMA',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.d = d
self.D=D
self.max_p=max_p
self.max_q=max_q
self.max_P=max_P
self.max_Q=max_Q
self.max_order=max_order
self.max_d=max_d
self.max_D=max_D
self.start_p=start_p
self.start_q=start_q
self.start_P=start_P
self.start_Q=start_Q
self.stationary=stationary
self.seasonal=seasonal
self.ic=ic
self.stepwise=stepwise
self.nmodels=nmodels
self.trace=trace
self.approximation=approximation
self.method=method
self.truncate=truncate
self.test=test
self.test_kwargs=test_kwargs
self.seasonal_test=seasonal_test
self.seasonal_test_kwargs=seasonal_test_kwargs
self.allowdrift=allowdrift
self.allowmean=allowmean
self.blambda=blambda
self.biasadj=biasadj
self.season_length=season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the AutoARIMA model.
Fit an AutoARIMA to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
AutoARIMA fitted model.
"""
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            self.model_ = auto_arima_f(
                x=y,
                d=self.d,
                D=self.D,
                max_p=self.max_p,
                max_q=self.max_q,
                max_P=self.max_P,
                max_Q=self.max_Q,
                max_order=self.max_order,
                max_d=self.max_d,
                max_D=self.max_D,
                start_p=self.start_p,
                start_q=self.start_q,
                start_P=self.start_P,
                start_Q=self.start_Q,
                stationary=self.stationary,
                seasonal=self.seasonal,
                ic=self.ic,
                stepwise=self.stepwise,
                nmodels=self.nmodels,
                trace=self.trace,
                approximation=self.approximation,
                method=self.method,
                truncate=self.truncate,
                xreg=X,
                test=self.test,
                test_kwargs=self.test_kwargs,
                seasonal_test=self.seasonal_test,
                seasonal_test_kwargs=self.seasonal_test_kwargs,
                allowdrift=self.allowdrift,
                allowmean=self.allowmean,
                blambda=self.blambda,
                biasadj=self.biasadj,
                period=self.season_length,
            )
        self._store_cs(y=y, X=X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted AutoArima.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
        mean = fcst['mean']
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                'mean': mean,
                **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
            }
        return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted AutoArima insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
        mean = fitted_arima(self.model_)
        res = {'fitted': mean}
        if level is not None:
            se = np.sqrt(self.model_['sigma2'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient AutoARIMA predictions.
This method avoids memory burden due from object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = auto_arima_f(
                x=y,
                d=self.d,
                D=self.D,
                max_p=self.max_p,
                max_q=self.max_q,
                max_P=self.max_P,
                max_Q=self.max_Q,
                max_order=self.max_order,
                max_d=self.max_d,
                max_D=self.max_D,
                start_p=self.start_p,
                start_q=self.start_q,
                start_P=self.start_P,
                start_Q=self.start_Q,
                stationary=self.stationary,
                seasonal=self.seasonal,
                ic=self.ic,
                stepwise=self.stepwise,
                nmodels=self.nmodels,
                trace=self.trace,
                approximation=self.approximation,
                method=self.method,
                truncate=self.truncate,
                xreg=X,
                test=self.test,
                test_kwargs=self.test_kwargs,
                seasonal_test=self.seasonal_test,
                seasonal_test_kwargs=self.seasonal_test_kwargs,
                allowdrift=self.allowdrift,
                allowmean=self.allowmean,
                blambda=self.blambda,
                biasadj=self.biasadj,
                period=self.season_length,
            )
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted ARIMA model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = forward_arima(self.model_, y=y, xreg=X, method=self.method)
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
def test_class(
    cls_, x, h, skip_insample=False, level=None, test_forward=False, X=None, X_future=None
):
    cls_ = cls_.fit(x, X=X)
    fcst_cls = cls_.predict(h=h, X=X_future)
    test_eq(len(fcst_cls['mean']), h)
    # test fit + predict equals forecast
    test_eq(
        cls_.forecast(y=x, h=h, X=X, X_future=X_future)['mean'],
        fcst_cls['mean']
    )
    if not skip_insample:
        test_eq(len(cls_.predict_in_sample()['fitted']), len(x))
        assert isinstance(cls_.predict_in_sample()['fitted'], np.ndarray)
        np.testing.assert_array_equal(
            cls_.forecast(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
            cls_.predict_in_sample()['fitted'],
        )
        if test_forward:
            np.testing.assert_array_equal(
                cls_.forward(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
                cls_.predict_in_sample()['fitted'],
            )
    if test_forward:
        try:
            pd.testing.assert_frame_equal(
                pd.DataFrame(cls_.predict(h=h, X=X_future)),
                pd.DataFrame(cls_.forward(y=x, X=X, X_future=X_future, h=h)),
            )
        except AssertionError:
            raise Exception('predict and forward methods are not equal')
    if level is not None:
        fcst_cls = pd.DataFrame(cls_.predict(h=h, X=X_future, level=level))
        fcst_forecast = pd.DataFrame(cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level))
        try:
            pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
        except AssertionError:
            raise Exception('predict and forecast methods are not equal with levels')
        if test_forward:
            try:
                pd.testing.assert_frame_equal(
                    pd.DataFrame(cls_.predict(h=h, X=X_future, level=level)),
                    pd.DataFrame(cls_.forward(y=x, h=h, X=X, X_future=X_future, level=level))
                )
            except AssertionError:
                raise Exception('predict and forward methods are not equal with levels')
        if not skip_insample:
            fcst_cls = pd.DataFrame(cls_.predict_in_sample(level=level))
            fcst_forecast = cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
            fcst_forecast = pd.DataFrame({key: val for key, val in fcst_forecast.items() if 'fitted' in key})
            try:
                pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
            except AssertionError:
                raise Exception(
                    'predict and forecast methods are not equal with '
                    'levels for fitted values '
                )
            if test_forward:
                fcst_forward = cls_.forward(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
                fcst_forward = pd.DataFrame({key: val for key, val in fcst_forward.items() if 'fitted' in key})
                try:
                    pd.testing.assert_frame_equal(fcst_cls, fcst_forward)
                except AssertionError:
                    raise Exception(
                        'predict and forward methods are not equal with '
                        'levels for fitted values '
                    )

def _test_fitted_sparse(model_factory):
    y1 = np.array([2, 5, 0, 1, 3, 0, 1, 1, 0], dtype=np.float64)
    y2 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 1], dtype=np.float64)
    y3 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 0], dtype=np.float64)
    y4 = np.zeros(9, dtype=np.float64)
    for y in [y1, y2, y3, y4]:
        expected_fitted = np.hstack(
            [
                model_factory().forecast(y=y[:i + 1], h=1)['mean']
                for i in range(y.size - 1)
            ]
        )
        np.testing.assert_allclose(
            model_factory().forecast(y=y, h=1, fitted=True)['fitted'],
            np.append(np.nan, expected_fitted),
            atol=1e-6,
        )
arima = AutoARIMA(season_length=12)
test_class(arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_arima = arima.forecast(ap, 13, None, None, (80, 95), True)
_plot_insample_pi(fcst_arima)
# test conformal prediction
arima_c = AutoARIMA(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(arima_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_arima_c = arima_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_arima_c["mean"],
    fcst_arima["mean"],
)
_plot_fcst(fcst_arima_c)
# test alias argument
test_eq(
    repr(AutoARIMA()),
    'AutoARIMA'
)
test_eq(
    repr(AutoARIMA(alias='AutoARIMA_seasonality')),
    'AutoARIMA_seasonality'
)
_plot_fcst(fcst_arima)
show_doc(AutoARIMA, title_level=3)
show_doc(AutoARIMA.fit, title_level=3)
show_doc(AutoARIMA.predict, title_level=3)
show_doc(AutoARIMA.predict_in_sample, title_level=3)
show_doc(AutoARIMA.forecast, title_level=3)
show_doc(AutoARIMA.forward, title_level=3)
from statsforecast.models import AutoARIMA
from statsforecast.utils import AirPassengers as ap
# AutoARIMA usage example
arima = AutoARIMA(season_length=4)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
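A short complementary sketch of the memory-efficient `forecast` path on the same AirPassengers series; the keys listed in the comment follow the `forecast` implementation shown above and may differ slightly across statsforecast versions.
# Sketch: one-shot forecast with in-sample fitted values and 80% intervals.
from statsforecast.models import AutoARIMA
from statsforecast.utils import AirPassengers as ap

model = AutoARIMA(season_length=12)
res = model.forecast(y=ap, h=12, level=[80], fitted=True)
# expected keys: 'mean', 'lo-80', 'hi-80', 'fitted', 'fitted-lo-80', 'fitted-hi-80'
print(sorted(res.keys()))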
AutoETS
class AutoETS(_TS):
r"""Automatic Exponential Smoothing model.
Automatically selects the best ETS (Error, Trend, Seasonality)
model using an information criterion. Default is Akaike Information Criterion (AICc), while particular models are estimated using maximum likelihood.
The state-space equations can be determined based on their $M$ multiplicative, $A$ additive,
$Z$ optimized or $N$ omitted components. The `model` string parameter defines the ETS equations:
E in [$M, A, Z$], T in [$N, A, M, Z$], and S in [$N, A, M, Z$].
For example when model='ANN' (additive error, no trend, and no seasonality), ETS will
explore only a simple exponential smoothing.
If the component is selected as 'Z', it operates as a placeholder to ask the AutoETS model
to figure out the best parameter.
Notes
-----
This implementation is a mirror of Hyndman's [forecast::ets](https://github.com/robjhyndman/forecast).
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
[Hyndman, Rob, et al (2008). "Forecasting with exponential smoothing: the state space approach"](https://robjhyndman.com/expsmooth/).
Parameters
----------
model : str
Controlling state-space-equations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
damped : bool
A parameter that 'dampens' the trend.
phi : float, optional (default=None)
Smoothing parameter for trend damping. Only used when `damped=True`.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals],
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
    def __init__(
        self,
        season_length: int = 1,
        model: str = 'ZZZ',
        damped: Optional[bool] = None,
        phi: Optional[float] = None,
        alias: str = 'AutoETS',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.model = model
        self.damped = damped
        if phi is not None:
            if not isinstance(phi, float):
                raise ValueError('phi must be `None` or float.')
            if not _PHI_LOWER <= phi <= _PHI_UPPER:
                raise ValueError(f'Valid range for phi is [{_PHI_LOWER}, {_PHI_UPPER}]')
        self.phi = phi
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Exponential Smoothing model.
Fit an Exponential Smoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Exponential Smoothing fitted model.
"""
        y = _ensure_float(y)
        self.model_ = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
        self.model_['actual_residuals'] = y - self.model_['fitted']
        self._store_cs(y=y, X=X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted Exponential Smoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        fcst = forecast_ets(self.model_, h=h, level=level)
        res = {'mean': fcst['mean']}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                **res,
                **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
            }
        return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Exponential Smoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            residuals = self.model_['actual_residuals']
            se = _calculate_sigma(residuals, len(residuals) - self.model_['n_params'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient Exponential Smoothing predictions.
This method avoids memory burden due from object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        y = _ensure_float(y)
        mod = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
        fcst = forecast_ets(mod, h=h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                    **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted Exponential Smoothing model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_ets(self.model_, y=y)
        fcst = forecast_ets(mod, h=h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                    **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
autoets = AutoETS(season_length=12)
test_class(autoets, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_ets = autoets.forecast(ap, 13, None, None, (80, 95), True)
_plot_insample_pi(fcst_ets)
_plot_fcst(fcst_ets)
# test alias argument
test_eq(
    repr(AutoETS()),
    'AutoETS'
)
test_eq(
    repr(AutoETS(alias='AutoETS_custom')),
    'AutoETS_custom'
)
autoets = AutoETS(season_length=12, model='AAA')
test_class(autoets, x=ap, h=12, level=[90, 80])
fcst_ets = autoets.forecast(ap, 13, None, None, (80, 95), True)
_plot_insample_pi(fcst_ets)
# test conformal prediction
autoets_c = AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(autoets_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ets_c = autoets_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(fcst_ets_c['mean'],
        fcst_ets['mean'])
_plot_fcst(fcst_ets_c)
# test that forecast and fit + predict produce the same results
models = ['ANNN', 'AANN', 'ANAN', 'AAAN', 'AAND', 'AAAD',  # class 1
          'MNNN', 'MANN', 'MAND', 'MNAN', 'MAAN', 'MAAD',  # class 2
          'MNMN', 'MAMN', 'MAMD']  # class 3
for k in range(0, len(models)):
    mod = models[k][0:3]
    damped_val = models[k][-1]
    if damped_val == 'N':
        damped = False
    else:
        damped = True
    ets = AutoETS(season_length=12, model=mod, damped=damped)
    test_class(ets, x=ap, h=13, level=[90, 80], test_forward=True)
show_doc(AutoETS, title_level=3)
show_doc(AutoETS.fit, title_level=3)
show_doc(AutoETS.predict, title_level=3)
show_doc(AutoETS.predict_in_sample, title_level=3)
show_doc(AutoETS.forecast, title_level=3)
show_doc(AutoETS.forward, title_level=3)
from statsforecast.models import AutoETS
from statsforecast.utils import AirPassengers as ap
# AutoETS' usage example
# Multiplicative trend, optimized error and seasonality
autoets = AutoETS(model='ZMZ', season_length=4)
autoets = autoets.fit(y=ap)
y_hat_dict = autoets.predict(h=4)
y_hat_dict
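A small sketch of pinning the ETS form instead of letting the search choose it, following the model-string convention described in the docstring above ('AAA' = additive error, additive trend, additive seasonality; 'Z' components would be searched). The key list in the comment is what the `forecast` implementation above produces.
# Sketch: fixed ETS form with a damped trend and a 90% interval.
from statsforecast.models import AutoETS
from statsforecast.utils import AirPassengers as ap

ets_fixed = AutoETS(model='AAA', damped=True, season_length=12)
res = ets_fixed.forecast(y=ap, h=12, level=[90])
print(sorted(res.keys()))  # ['hi-90', 'lo-90', 'mean']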
class ETS(AutoETS):
    @classmethod
    def _warn(cls):
        warnings.warn(
            '`ETS` will be deprecated in future versions of `StatsForecast`. Please use `AutoETS` instead.',
            category=FutureWarning,
            stacklevel=2,
        )

    def __init__(
        self,
        season_length: int = 1,
        model: str = 'ZZZ',
        damped: Optional[bool] = None,
        phi: Optional[float] = None,
        alias: str = 'ETS',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        ETS._warn()
        super().__init__(
            season_length=season_length,
            model=model,
            damped=damped,
            phi=phi,
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
ets = ETS(model='ZMZ', season_length=4)
ets = ETS(model='ZMZ', season_length=4)
autoets = AutoETS(model='ZMZ', season_length=4)
test_eq(ets.forecast(y=ap, h=12)['mean'],
        autoets.forecast(y=ap, h=12)['mean'])
# test alias argument
test_eq(
    repr(ETS()),
    'ETS'
)
test_eq(
    repr(ETS(alias='ETS_custom')),
    'ETS_custom'
)
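Since the deprecated `ETS` alias emits a FutureWarning on construction (see `_warn` above), the warning can be surfaced explicitly; this is a sketch using the class defined in this notebook, not an additional library feature.
# Sketch: confirm the deprecation warning raised by the ETS alias.
import warnings

with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter('always')
    ETS(model='ZNN')
    assert any(issubclass(wi.category, FutureWarning) for wi in w)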
AutoCES
class AutoCES(_TS):
r"""Complex Exponential Smoothing model.
Automatically selects the best Complex Exponential Smoothing
model using an information criterion. Default is Akaike Information Criterion (AICc), while particular
models are estimated using maximum likelihood.
The state-space equations can be determined based on their $S$ simple, $P$ partial,
$Z$ optimized or $N$ omitted components. The `model` string parameter defines the
kind of CES model: $N$ for simple CES (without seasonality), $S$ for simple seasonality (lagged CES),
$P$ for partial seasonality (without complex part), $F$ for full seasonality (lagged CES
with real and complex seasonal parts).
If the component is selected as 'Z', it operates as a placeholder to ask the AutoCES model
to figure out the best parameter.
References
----------
[Svetunkov, Ivan & Kourentzes, Nikolaos. (2015). "Complex Exponential Smoothing". 10.13140/RG.2.1.3757.2562. ](https://onlinelibrary.wiley.com/doi/full/10.1002/nav.22074).
Parameters
----------
model : str
Controlling state-space-equations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
    def __init__(
        self,
        season_length: int = 1,
        model: str = 'Z',
        alias: str = 'CES',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.model = model
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Complex Exponential Smoothing model.
Fit the Complex Exponential Smoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Complex Exponential Smoothing fitted model.
"""
        y = _ensure_float(y)
        if is_constant(y):
            model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
            model.fit(y=y, X=X)
            return model
        self.model_ = auto_ces(y, m=self.season_length, model=self.model)
        self.model_['actual_residuals'] = y - self.model_['fitted']
        self._store_cs(y=y, X=X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted Exponential Smoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        fcst = forecast_ces(self.model_, h=h, level=level)
        res = {"mean": fcst["mean"]}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                **res,
                **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
            }
        return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Exponential Smoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            residuals = self.model_['actual_residuals']
            se = _calculate_sigma(residuals, self.model_['n'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient Complex Exponential Smoothing predictions.
This method avoids memory burden due from object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level: List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        y = _ensure_float(y)
        if is_constant(y):
            model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
            return model.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
        mod = auto_ces(y, m=self.season_length, model=self.model)
        fcst = forecast_ces(mod, h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)},
                    **{f'hi-{l}': fcst[f'hi-{l}'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y))
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted Complex Exponential Smoothing to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level: List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_ces(self.model_, y=y)
        fcst = forecast_ces(mod, h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)},
                    **{f'hi-{l}': fcst[f'hi-{l}'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y))
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
autoces = AutoCES(season_length=12)
fcst_ces = autoces.forecast(ap, 13, None, None, (80, 95), True)
_plot_insample_pi(fcst_ces)
# test conformal prediction
autoces_c = AutoCES(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(autoces_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ces_c = autoces_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_ces["mean"],
    fcst_ces_c["mean"]
)
_plot_fcst(fcst_ces)
_plot_fcst(fcst_ces_c)
fit = autoces.fit(ap)
fcst = fit.predict(13, None, (80, 95))
_plot_fcst(fit.predict(13, None, (80, 95)))
values = ['mean', 'lo-80', 'lo-95', 'hi-80', 'hi-95']
for k in range(0, len(values)):
    np.testing.assert_equal(
        fcst_ces[values[k]],
        fcst[values[k]]
    )
pi_insample = fit.predict_in_sample((80, 95))
_plot_insample_pi(pi_insample)
values = ['fitted', 'fitted-lo-80', 'fitted-lo-95', 'fitted-hi-80', 'fitted-hi-95']
for k in range(0, len(values)):
    np.testing.assert_equal(
        fcst_ces[values[k]],
        pi_insample[values[k]]
    )
ces = AutoCES(season_length=12)
test_class(ces, x=ap, h=12, test_forward=True, level=[90, 80])
# test alias argument
test_eq(
    repr(AutoCES()),
    'CES'
)
test_eq(
    repr(AutoCES(alias='AutoCES_custom')),
    'AutoCES_custom'
)
show_doc(AutoCES, title_level=3)
show_doc(AutoCES.fit, title_level=3)
show_doc(AutoCES.predict, title_level=3)
show_doc(AutoCES.predict_in_sample, title_level=3)
show_doc(AutoCES.forecast, title_level=3)
show_doc(AutoCES.forward, title_level=3)
from statsforecast.models import AutoCES
from statsforecast.utils import AirPassengers as ap
# CES' usage example
# Multiplicative trend, optimal error and seasonality
ces = AutoCES(model='Z', season_length=4)
ces = ces.fit(y=ap)
y_hat_dict = ces.predict(h=4)
y_hat_dict
AutoTheta
class AutoTheta(_TS):
r"""AutoTheta model.
Automatically selects the best Theta (Standard Theta Model ('STM'),
Optimized Theta Model ('OTM'), Dynamic Standard Theta Model ('DSTM'),
Dynamic Optimized Theta Model ('DOTM')) model using mse.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
model : str
Controlling Theta Model. By default, searches for the best model.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
    def __init__(
        self,
        season_length: int = 1,
        decomposition_type: str = 'multiplicative',
        model: Optional[str] = None,
        alias: str = 'AutoTheta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.decomposition_type = decomposition_type
        self.model = model
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the AutoTheta model.
Fit an AutoTheta model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
AutoTheta fitted model.
"""
        y = _ensure_float(y)
        self.model_ = auto_theta(
            y=y,
            m=self.season_length,
            model=self.model,
            decomposition_type=self.decomposition_type,
        )
        self.model_['fitted'] = y - self.model_['residuals']
        self._store_cs(y, X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted AutoTheta.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        fcst = forecast_theta(self.model_, h=h, level=level)
        if self.prediction_intervals is not None and level is not None:
            fcst = self._add_predict_conformal_intervals(fcst, level)
        return fcst
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted AutoTheta insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            se = np.std(self.model_['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient AutoTheta predictions.
This method avoids memory burden due from object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        y = _ensure_float(y)
        mod = auto_theta(
            y=y,
            m=self.season_length,
            model=self.model,
            decomposition_type=self.decomposition_type,
        )
        res = forecast_theta(mod, h, level=level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        if fitted:
            res['fitted'] = y - mod['residuals']
        if level is not None and fitted:
            # add prediction intervals for fitted values
            se = np.std(mod['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted AutoTheta to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_theta(self.model_, y=y)
        res = forecast_theta(mod, h, level=level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        if fitted:
            res['fitted'] = y - mod['residuals']
        if level is not None and fitted:
            # add prediction intervals for fitted values
            se = np.std(mod['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
theta = AutoTheta(season_length=12)
test_class(theta, x=ap, h=12, level=[80, 90], test_forward=True)
fcst_theta = theta.forecast(ap, 13, None, None, (80, 95), True)
_plot_insample_pi(fcst_theta)
# test conformal prediction
theta_c = AutoTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(theta_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_theta_c = theta_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_theta_c['mean'],
    fcst_theta['mean'],
)
_plot_fcst(fcst_theta_c)
zero_theta = theta.forward(np.zeros(10), h=12, level=[80, 90], fitted=True)
# test alias argument
test_eq(
    repr(AutoTheta()),
    'AutoTheta'
)
test_eq(
    repr(AutoTheta(alias='AutoTheta_custom')),
    'AutoTheta_custom'
)
show_doc(AutoTheta, title_level=3)
show_doc(AutoTheta.fit, title_level=3)
show_doc(AutoTheta.predict, title_level=3)
show_doc(AutoTheta.predict_in_sample, title_level=3)
show_doc(AutoTheta.forecast, title_level=3)
show_doc(AutoTheta.forward, title_level=3)
from statsforecast.models import AutoTheta
from statsforecast.utils import AirPassengers as ap
# AutoTheta usage example
theta = AutoTheta(season_length=4)
theta = theta.fit(y=ap)
y_hat_dict = theta.predict(h=4)
y_hat_dict
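The docstring above lists the four Theta variants ('STM', 'OTM', 'DSTM', 'DOTM') that the mse-based search chooses from; a short sketch of pinning one of them explicitly follows, using only the arguments documented above.
# Sketch: forcing the Dynamic Optimized Theta Model instead of the automatic search.
from statsforecast.models import AutoTheta
from statsforecast.utils import AirPassengers as ap

dotm = AutoTheta(season_length=12, model='DOTM')
res = dotm.forecast(y=ap, h=12)
print(res['mean'][:4])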
ARIMA family
AutoRegressive Integrated Moving Average (ARIMA)
class ARIMA(_TS):
r"""ARIMA model.
AutoRegressive Integrated Moving Average model.
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
Parameters
----------
order : tuple (default=(0, 0, 0))
A specification of the non-seasonal part of the ARIMA model: the three components (p, d, q) are the AR order, the degree of differencing, and the MA order.
season_length : int (default=1)
Number of observations per unit of time. Ex: 24 Hourly data.
seasonal_order : tuple (default=(0, 0, 0))
A specification of the seasonal part of the ARIMA model.
(P, D, Q) for the AR order, the degree of differencing, the MA order.
include_mean : bool (default=True)
Should the ARIMA model include a mean term?
The default is True for undifferenced series, False for differenced ones (where a mean would not affect the fit nor predictions).
include_drift : bool (default=False)
Should the ARIMA model include a linear drift term?
(i.e., a linear regression with ARIMA errors is fitted.)
include_constant : bool, optional (default=None)
If True, then include_mean is set to True for undifferenced series and include_drift is set to True for differenced series.
Note that if there is more than one difference taken, no constant is included regardless of the value of this argument.
This is deliberate as otherwise quadratic and higher order polynomial trends would be induced.
blambda : float, optional (default=None)
Box-Cox transformation parameter.
biasadj : bool (default=False)
Use adjusted back-transformed mean Box-Cox.
method : str (default='CSS-ML')
Fitting method: maximum likelihood or minimize conditional sum-of-squares.
The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
fixed : dict, optional (default=None)
Dictionary containing fixed coefficients for the arima model. Example: `{'ar1': 0.5, 'ma2': 0.75}`.
For autoregressive terms use the `ar{i}` keys. For its seasonal version use `sar{i}`.
For moving average terms use the `ma{i}` keys. For its seasonal version use `sma{i}`.
For intercept and drift use the `intercept` and `drift` keys.
For exogenous variables use the `ex_{i}` keys.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
    uses_exog = True

    def __init__(
        self,
        order: Tuple[int, int, int] = (0, 0, 0),
        season_length: int = 1,
        seasonal_order: Tuple[int, int, int] = (0, 0, 0),
        include_mean: bool = True,
        include_drift: bool = False,
        include_constant: Optional[bool] = None,
        blambda: Optional[float] = None,
        biasadj: bool = False,
        method: str = 'CSS-ML',
        fixed: Optional[dict] = None,
        alias: str = 'ARIMA',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.order = order
        self.season_length = season_length
        self.seasonal_order = seasonal_order
        self.include_mean = include_mean
        self.include_drift = include_drift
        self.include_constant = include_constant
        self.blambda = blambda
        self.biasadj = biasadj
        self.method = method
        self.fixed = fixed
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""
Fit the model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Fitted model.
"""
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            self.model_ = Arima(
                x=y,
                order=self.order,
                seasonal={'order': self.seasonal_order,
                          'period': self.season_length},
                xreg=X,
                include_mean=self.include_mean,
                include_constant=self.include_constant,
                include_drift=self.include_drift,
                blambda=self.blambda,
                biasadj=self.biasadj,
                method=self.method,
                fixed=self.fixed,
            )
        self._store_cs(y=y, X=X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted model.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
        mean = fcst['mean']
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                "mean": mean,
                **{f"lo-{l}": fcst["lower"][f"{l}%"] for l in reversed(level)},
                **{f"hi-{l}": fcst["upper"][f"{l}%"] for l in level},
            }
        return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
        mean = fitted_arima(self.model_)
        res = {'fitted': mean}
        if level is not None:
            se = np.sqrt(self.model_['sigma2'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory efficient predictions.
This method avoids memory burden due from object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = Arima(
                x=y,
                order=self.order,
                seasonal={'order': self.seasonal_order,
                          'period': self.season_length},
                xreg=X,
                include_mean=self.include_mean,
                include_constant=self.include_constant,
                include_drift=self.include_drift,
                blambda=self.blambda,
                biasadj=self.biasadj,
                method=self.method,
                fixed=self.fixed,
            )
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not returns insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
= _ensure_float(y)
y with np.errstate(invalid='ignore'):
= forward_arima(self.model_, y=y, xreg=X, method=self.method)
mod = forecast_arima(mod, h, xreg=X_future, level=level)
fcst = {'mean': fcst['mean']}
res if fitted:
'fitted'] = fitted_arima(mod)
res[if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
= {
res **res,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}if fitted:
# add prediction intervals for the fitted values
= np.sqrt(mod['sigma2'])
se = _add_fitted_pi(res=res, se=se, level=level)
res return res
= ARIMA(order=(1, 0, 0), season_length=12)
simple_arima =ap, h=12, level=[90, 80], test_forward=True)
test_class(simple_arima, x= simple_arima.forecast(ap, 13, None, None, (80,95), True)
fcst_simple_arima _plot_insample_pi(fcst_simple_arima)
# test conformal prediction
= ARIMA(order=(1, 0, 0), season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
simple_arima_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(simple_arima_c, x= simple_arima_c.forecast(ap, 13, None, None, (80,95), True)
fcst_simple_arima_c
test_eq('mean'],
fcst_simple_arima_c['mean'],
fcst_simple_arima[ )
_plot_fcst(fcst_simple_arima_c)
= ARIMA(order=(2, 0, 0), season_length=12, fixed={'ar1': 0.5, 'ar2': 0.5})
simple_arima =ap, h=12, level=[90, 80], test_forward=True)
test_class(simple_arima, x= simple_arima.forecast(ap, 4, None, None, (80,95), True)
fcst_simple_arima
_plot_insample_pi(fcst_simple_arima)
test_eq('mean'],
fcst_simple_arima[411., 421.5, 416.25, 418.875])
np.array([ )
# test alias argument
test_eq(repr(ARIMA()),
'ARIMA'
)
test_eq(repr(ARIMA(alias='ARIMA_seasonality')),
'ARIMA_seasonality'
)
=3) show_doc(ARIMA, title_level
=3) show_doc(ARIMA.fit, title_level
=3) show_doc(ARIMA.predict, title_level
=3) show_doc(ARIMA.predict_in_sample, title_level
=3) show_doc(ARIMA.forecast, title_level
=3) show_doc(ARIMA.forward, title_level
from statsforecast.models import ARIMA
from statsforecast.utils import AirPassengers as ap

# ARIMA usage example
arima = ARIMA(order=(1, 0, 0), season_length=12)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
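The `forecast` method documented above produces the same predictions without keeping the fitted model around, which matters when fitting many series. A minimal sketch reusing the configuration from the example above; the interval keys follow the `lo-{level}` / `hi-{level}` convention used throughout this notebook.

from statsforecast.models import ARIMA
from statsforecast.utils import AirPassengers as ap

# memory-efficient path: forecast directly from the data, with 80%/95% intervals
# and insample fitted values, without storing the model on the instance
arima = ARIMA(order=(1, 0, 0), season_length=12)
fcst = arima.forecast(y=ap, h=4, level=[80, 95], fitted=True)
fcst['mean'], fcst['lo-95'], fcst['hi-95']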
Autoregressive
class AutoRegressive(ARIMA):
r"""Simple Autoregressive model.
Parameters
----------
lags : int or list
Number of lags to include in the model.
If an int is passed then all lags up to `lags` are considered.
If a list, only the elements of the list are considered as lags.
include_mean : bool (default=True)
Should the AutoRegressive model include a mean term?
The default is True for undifferenced series, False for differenced ones (where a mean would affect neither the fit nor the predictions).
include_drift : bool (default=False)
Should the AutoRegressive model include a linear drift term?
(i.e., a linear regression with AutoRegressive errors is fitted.)
blambda : float, optional (default=None)
Box-Cox transformation parameter.
biasadj : bool (default=False)
Use adjusted back-transformed mean Box-Cox.
method : str (default='CSS-ML')
Fitting method: maximum likelihood or minimize conditional sum-of-squares.
The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
fixed : dict, optional (default=None)
Dictionary containing fixed coefficients for the AutoRegressive model. Example: `{'ar1': 0.5, 'ar5': 0.75}`.
For autoregressive terms use the `ar{i}` keys.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
= True
uses_exog
def __init__(
self,
int, List],
lags: Tuple[bool = True,
include_mean: bool = False,
include_drift: float] = None,
blambda: Optional[bool = False,
biasadj: str = 'CSS-ML',
method: dict] = None,
fixed: Optional[str = 'AutoRegressive',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):if isinstance(lags, int):
= (lags, 0, 0)
order elif isinstance(lags, list):
= (max(lags), 0, 0)
order = {f'ar{i+1}': np.nan if (i+1) in lags else 0 for i in range(max(lags))}
fixed_lags if fixed is not None:
fixed_lags.update(fixed)= fixed_lags
fixed else:
raise ValueError('Please provide an int or a list specifying the lags to use.')
super().__init__(
=order,
order=include_mean,
include_mean=include_drift,
include_drift=blambda,
blambda=biasadj,
biasadj=method,
method=alias,
alias=fixed,
fixed=prediction_intervals,
prediction_intervals )
= AutoRegressive(lags=[12], fixed={'ar12': 0.9999999})
ar =ap, h=12, level=[90, 80], test_forward=True)
test_class(ar, x= ar.forecast(ap, 13, None, None, (80,95), True)
fcst_ar  # we should recover the seasonal naive forecast
test_close('mean'][:-1],
fcst_ar[-12:],
ap[=1e-4
eps
) _plot_insample_pi(fcst_simple_arima)
# test conformal prediction
= AutoRegressive(lags=[12], fixed={'ar12': 0.9999999}, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
ar_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(ar_c, x= ar_c.forecast(ap, 13, None, None, (80,95), True)
fcst_ar_c
test_eq('mean'],
fcst_ar_c['mean'],
fcst_ar[ )
_plot_fcst(fcst_ar_c)
# test alias argument
test_eq(repr(AutoRegressive(lags=[12])),
'AutoRegressive'
)
test_eq(repr(AutoRegressive(lags=[12], alias='AutoRegressive_lag12')),
'AutoRegressive_lag12'
)
=3) show_doc(AutoRegressive, title_level
=3, name='AutoRegressive.fit') show_doc(AutoRegressive.fit, title_level
=3, name='AutoRegressive.predict') show_doc(AutoRegressive.predict, title_level
=3, name='AutoRegressive.predict_in_sample') show_doc(AutoRegressive.predict_in_sample, title_level
=3, name='AutoRegressive.forecast') show_doc(AutoRegressive.forecast, title_level
=3, name='AutoRegressive.forward') show_doc(AutoRegressive.forward, title_level
from statsforecast.models import AutoRegressive
from statsforecast.utils import AirPassengers as ap

# AutoRegressive usage example
ar = AutoRegressive(lags=[12])
ar = ar.fit(y=ap)
y_hat_dict = ar.predict(h=4, level=[80])
y_hat_dict
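As the `__init__` above shows, a list of lags is translated into an ARIMA order of `(max(lags), 0, 0)` with the omitted lags fixed to zero. A small illustrative sketch of that mapping in plain Python, mirroring the constructor logic:

import numpy as np

lags = [1, 12]
order = (max(lags), 0, 0)
# free coefficients (np.nan) for the requested lags, zeros for the rest
fixed = {f'ar{i + 1}': np.nan if (i + 1) in lags else 0 for i in range(max(lags))}
order, fixed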
Exponential Smoothing
Simple Exponential Smoothing
@njit(nogil=NOGIL, cache=CACHE)
def _ses_fcst_mse(x: np.ndarray, alpha: float) -> Tuple[float, float, np.ndarray]:
    r"""Perform simple exponential smoothing on a series.

    Returns the one step ahead forecast as well as the mean squared error of the fit.
    """
    smoothed = x[0]
    n = x.size
    mse = 0.
    fitted = np.full(n, np.nan, dtype=x.dtype)

    for i in range(1, n):
        smoothed = (alpha * x[i - 1] + (1 - alpha) * smoothed).item()
        error = x[i] - smoothed
        mse += error * error
        fitted[i] = smoothed

    mse /= n
    forecast = alpha * x[-1] + (1 - alpha) * smoothed
    return forecast, mse, fitted

def _ses_mse(alpha: float, x: np.ndarray) -> float:
    r"""Compute the mean squared error of a simple exponential smoothing fit."""
    _, mse, _ = _ses_fcst_mse(x, alpha)
    return mse

def _ses_forecast(x: np.ndarray, alpha: float) -> Tuple[float, np.ndarray]:
    r"""One step ahead forecast with simple exponential smoothing."""
    forecast, _, fitted = _ses_fcst_mse(x, alpha)
    return forecast, fitted

def _demand(x: np.ndarray) -> np.ndarray:
    r"""Extract the positive elements of a vector."""
    return x[x > 0]

def _intervals(x: np.ndarray) -> np.ndarray:
    r"""Compute the intervals between the non-zero elements of a vector."""
    nonzero_idxs = np.where(x != 0)[0]
    return np.diff(nonzero_idxs + 1, prepend=0).astype(x.dtype)

def _probability(x: np.ndarray) -> np.ndarray:
    r"""Compute the probability of the elements being non-zero."""
    return (x != 0).astype(x.dtype)

def _optimized_ses_forecast(
    x: np.ndarray,
    bounds: Sequence[Tuple[float, float]] = [(0.1, 0.3)],
) -> Tuple[float, np.ndarray]:
    r"""Search for the optimal alpha and compute the one step ahead SES forecast."""
    alpha = minimize(
        fun=_ses_mse,
        x0=(0,),
        args=(x,),
        bounds=bounds,
        method='L-BFGS-B',
    ).x[0]
    forecast, fitted = _ses_forecast(x, alpha)
    return forecast, fitted

def _chunk_sums(array: np.ndarray, chunk_size: int) -> np.ndarray:
    r"""Split an array into chunks and return the sum of each chunk.

    Incomplete chunks are discarded.
    """
    n_chunks = array.size // chunk_size
    n_elems = n_chunks * chunk_size
    return array[:n_elems].reshape(n_chunks, chunk_size).sum(axis=1)

def _ses(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
    alpha: float,  # smoothing parameter
) -> Dict[str, np.ndarray]:
    fcst, _, fitted_vals = _ses_fcst_mse(y, alpha)
    fcst = {'mean': _repeat_val(val=fcst, h=h)}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SimpleExponentialSmoothing(_TS):
r"""SimpleExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t-1}$
The rate $0 \leq \alpha \leq 1$ at which the weights decrease is called the smoothing parameter. When $\alpha = 1$, SES is equal to the naive method.
References
----------
[Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).
Parameters
----------
alpha : float
Smoothing parameter.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
float,
alpha: str = 'SES',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):self.alpha = alpha
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the SimpleExponentialSmoothing model.
Fit a SimpleExponentialSmoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SimpleExponentialSmoothing fitted model.
"""
= _ensure_float(y)
y = _ses(y=y, alpha=self.alpha, h=1, fitted=True)
mod self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted SimpleExponentialSmoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to " "compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SimpleExponentialSmoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient SimpleExponentialSmoothing predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _ses(y=y, h=h, fitted=fitted, alpha=self.alpha)
res = dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to " "compute them.")
return res
ses = SimpleExponentialSmoothing(alpha=0.1)
test_class(ses, x=ap, h=12)
# more tests
ses = ses.fit(ap)
fcst_ses = ses.predict(12)
test_close(fcst_ses['mean'], np.repeat(460.3028, 12), eps=1e-4)
# to recover these residuals in R you must pass initial="simple"
# to the `ses` function
np.testing.assert_allclose(
    ses.predict_in_sample()['fitted'][[0, 1, -1]],
    np.array([np.nan, 118 - 6., 432 + 31.447525]),
)
# test conformal prediction
= SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
ses_c =ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
test_class(ses_c, x= ses_c.forecast(ap, 13, None, None, (80,95), True)
fcst_ses_c
test_eq('mean'][:12],
fcst_ses_c['mean']
fcst_ses[
) _plot_fcst(fcst_ses_c)
# test alias argument
test_eq(repr(SimpleExponentialSmoothing(alpha=0.1)),
'SES'
)
test_eq(repr(SimpleExponentialSmoothing(alpha=0.1, alias='SES_custom')),
'SES_custom'
)
=3) show_doc(SimpleExponentialSmoothing, title_level
=3) show_doc(SimpleExponentialSmoothing.forecast, title_level
=3) show_doc(SimpleExponentialSmoothing.fit, title_level
=3) show_doc(SimpleExponentialSmoothing.predict, title_level
=3) show_doc(SimpleExponentialSmoothing.predict_in_sample, title_level
from statsforecast.models import SimpleExponentialSmoothing
from statsforecast.utils import AirPassengers as ap

# SimpleExponentialSmoothing usage example
ses = SimpleExponentialSmoothing(alpha=0.5)
ses = ses.fit(y=ap)
y_hat_dict = ses.predict(h=4)
y_hat_dict
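The one-step recursion from the class docstring can be written directly in NumPy. A minimal sketch mirroring `_ses_fcst_mse` above (point forecast only, without the MSE or fitted-value bookkeeping):

import numpy as np
from statsforecast.utils import AirPassengers as ap

def ses_point_forecast(y: np.ndarray, alpha: float) -> float:
    # smooth through the history, then produce the one-step-ahead forecast
    smoothed = y[0]
    for t in range(1, y.size):
        smoothed = alpha * y[t - 1] + (1 - alpha) * smoothed
    return alpha * y[-1] + (1 - alpha) * smoothed

ses_point_forecast(ap, alpha=0.5)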
Optimized Simple Exponential Smoothing
def _ses_optimized(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
):
    fcst_, fitted_vals = _optimized_ses_forecast(y, [(0.01, 0.99)])
    mean = _repeat_val(val=fcst_, h=h)
    fcst = {'mean': mean}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SimpleExponentialSmoothingOptimized(_TS):
r"""SimpleExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t-1}$
The smoothing parameter $\alpha^*$ is optimized by square error minimization.
References
----------
[Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
def __init__(
self,
str = "SESOpt",
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the SimpleExponentialSmoothingOptimized model.
Fit a SimpleExponentialSmoothingOptimized model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SimpleExponentialSmoothingOptimized fitted model.
"""
= _ensure_float(y)
y = _ses_optimized(y=y, h=1, fitted=True)
mod self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted SimpleExponentialSmoothingOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SimpleExponentialSmoothingOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient SimpleExponentialSmoothingOptimized predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _ses_optimized(y=y, h=h, fitted=fitted)
res = dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
= SimpleExponentialSmoothingOptimized()
ses_op =ap, h=12)
test_class(ses_op, x= ses_op.fit(ap)
ses_op = ses_op.predict(12) fcst_ses_op
# test conformal prediction
= SimpleExponentialSmoothingOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
ses_op_c =ap, h=13, level=[90, 80], skip_insample=True)
test_class(ses_op_c, x= ses_op_c.forecast(ap, 13, None, None, (80,95), True)
fcst_ses_op_c
test_eq('mean'][:12],
fcst_ses_op_c['mean']
fcst_ses_op[
) _plot_fcst(fcst_ses_op_c)
# test alias argument
test_eq(repr(SimpleExponentialSmoothingOptimized()),
'SESOpt'
)
test_eq(repr(SimpleExponentialSmoothingOptimized(alias='SESOpt_custom')),
'SESOpt_custom'
)
=3) show_doc(SimpleExponentialSmoothingOptimized, title_level
=3) show_doc(SimpleExponentialSmoothingOptimized.fit, title_level
=3) show_doc(SimpleExponentialSmoothingOptimized.predict, title_level
=3) show_doc(SimpleExponentialSmoothingOptimized.predict_in_sample, title_level
=3) show_doc(SimpleExponentialSmoothingOptimized.forecast, title_level
from statsforecast.models import SimpleExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap

# SimpleExponentialSmoothingOptimized usage example
seso = SimpleExponentialSmoothingOptimized()
seso = seso.fit(y=ap)
y_hat_dict = seso.predict(h=4)
y_hat_dict
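The optimized variant searches for the smoothing parameter by minimizing the insample MSE, as `_optimized_ses_forecast` above does with `scipy.optimize.minimize`. A hedged sketch of that search; the helper name `optimal_alpha` is illustrative, not part of the library:

import numpy as np
from scipy.optimize import minimize
from statsforecast.utils import AirPassengers as ap

def optimal_alpha(y: np.ndarray, bounds=((0.01, 0.99),)) -> float:
    def ses_mse(params):
        # in-sample MSE of the SES recursion for a candidate alpha
        a = float(params[0])
        smoothed, mse = y[0], 0.0
        for t in range(1, y.size):
            smoothed = a * y[t - 1] + (1 - a) * smoothed
            err = y[t] - smoothed
            mse += err * err
        return mse / y.size
    # L-BFGS-B with box constraints, as in _optimized_ses_forecast
    return minimize(ses_mse, x0=(0.5,), bounds=bounds, method='L-BFGS-B').x[0]

optimal_alpha(ap)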
Seasonal Exponential Smoothing
def _seasonal_exponential_smoothing(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
    season_length: int,  # season length
    alpha: float,  # smoothing parameter
) -> Dict[str, np.ndarray]:
    n = y.size
    if n < season_length:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_vals = np.empty(season_length, dtype=y.dtype)
    fitted_vals = np.full_like(y, np.nan)
    for i in range(season_length):
        init_idx = i + n % season_length
        season_vals[i], fitted_vals[init_idx::season_length] = _ses_forecast(y[init_idx::season_length], alpha)
    out = _repeat_val_seas(season_vals=season_vals, h=h)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SeasonalExponentialSmoothing(_TS):
r"""SeasonalExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations and season $s$, the one-step forecast is given by:
$\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t-1,s}$
Notes
-----
This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero,
and a single seasonal smoothing parameter $\alpha$ is shared across seasons.
References
----------
[Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
[Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).
Parameters
----------
alpha : float
Smoothing parameter.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
def __init__(
self,
int,
season_length: float,
alpha: str = 'SeasonalES',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):self.season_length = season_length
self.alpha = alpha
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the SeasonalExponentialSmoothing model.
Fit a SeasonalExponentialSmoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalExponentialSmoothing fitted model.
"""
= _ensure_float(y)
y = _seasonal_exponential_smoothing(
mod =y,
y=self.season_length,
season_length=self.alpha,
alpha=True,
fitted=self.season_length,
h
)self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted SeasonalExponentialSmoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val_seas(self.model_['mean'], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalExponentialSmoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient SeasonalExponentialSmoothing predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _seasonal_exponential_smoothing(
res =y, h=h, fitted=fitted, alpha=self.alpha, season_length=self.season_length
y
)= dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
= SeasonalExponentialSmoothing(season_length=12, alpha=1.)
seas_es =ap, h=12)
test_class(seas_es, x'fitted'][-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))
test_eq(seas_es.predict_in_sample()[= seas_es.fit(ap)
seas_es = seas_es.predict(12) fcst_seas_es
# test conformal prediction
= SeasonalExponentialSmoothing(season_length=12, alpha=1., prediction_intervals=ConformalIntervals(h=13, n_windows=2))
seas_es_c =ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
test_class(seas_es_c, x= seas_es_c.forecast(ap, 13, None, None, (80,95), True)
fcst_seas_es_c
test_eq('mean'][:12],
fcst_seas_es_c['mean']
fcst_seas_es[
) _plot_fcst(fcst_seas_es_c)
# test that we can recover the expected seasonality
test_eq(4:], h=12)['mean'],
seas_es.forecast(ap[=12)['mean']
seas_es.forecast(ap, h )
# test closeness to the seasonal naive forecast
for i in range(1, 13):
test_close(-12:],
ap[i:][=12)['mean'],
seas_es.forecast(ap[i:], h )
6:], seas_es.forecast(ap[6:], h=12)['mean']])) plt.plot(np.concatenate([ap[
# test alias argument
test_eq(repr(SeasonalExponentialSmoothing(season_length=12, alpha=1.)),
'SeasonalES'
)
test_eq(repr(SeasonalExponentialSmoothing(season_length=12, alpha=1., alias='SeasonalES_custom')),
'SeasonalES_custom'
)
=3) show_doc(SeasonalExponentialSmoothing, title_level
=3) show_doc(SeasonalExponentialSmoothing.fit, title_level
=3) show_doc(SeasonalExponentialSmoothing.predict, title_level
=3) show_doc(SeasonalExponentialSmoothing.predict_in_sample, title_level
=3) show_doc(SeasonalExponentialSmoothing.forecast, title_level
from statsforecast.models import SeasonalExponentialSmoothing
from statsforecast.utils import AirPassengers as ap

# SeasonalExponentialSmoothing usage example
model = SeasonalExponentialSmoothing(alpha=0.5, season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
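`_seasonal_exponential_smoothing` above applies the SES recursion independently to each of the `season_length` sub-series (for monthly data, one sub-series per calendar month). A small sketch of that split, assuming monthly data and alpha fixed at 0.5:

import numpy as np
from statsforecast.utils import AirPassengers as ap

season_length, alpha = 12, 0.5
n = ap.size
season_vals = np.empty(season_length)
for i in range(season_length):
    start = i + n % season_length          # align each sub-series to the end of the sample
    sub = ap[start::season_length]         # every 12th observation
    smoothed = sub[0]
    for t in range(1, sub.size):
        smoothed = alpha * sub[t - 1] + (1 - alpha) * smoothed
    season_vals[i] = alpha * sub[-1] + (1 - alpha) * smoothed
season_vals  # one smoothed forecast per season position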
Optimized Seasonal Exponential Smoothing
def _seasonal_ses_optimized(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
    season_length: int,  # season length
):
    n = y.size
    if n < season_length:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_vals = np.empty(season_length, dtype=y.dtype)
    fitted_vals = np.full_like(y, np.nan)
    for i in range(season_length):
        init_idx = i + n % season_length
        season_vals[i], fitted_vals[init_idx::season_length] = _optimized_ses_forecast(y[init_idx::season_length], [(0.01, 0.99)])
    out = _repeat_val_seas(season_vals=season_vals, h=h)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SeasonalExponentialSmoothingOptimized(_TS):
def __init__(
self,
int,
season_length: str = 'SeasESOpt',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):r"""SeasonalExponentialSmoothingOptimized model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations and season $s$, the one-step forecast is given by:
$\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t-1,s}$
The smoothing parameter $\alpha^*$ is optimized by square error minimization.
Notes
-----
This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero,
and a single seasonal smoothing parameter $\alpha$ is shared across seasons.
References
----------
[Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
[Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
self.season_length = season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the SeasonalExponentialSmoothingOptimized model.
Fit a SeasonalExponentialSmoothingOptimized model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalExponentialSmoothingOptimized fitted model.
"""
= _ensure_float(y)
y = _seasonal_ses_optimized(
mod =y,
y=self.season_length,
season_length=True,
fitted=self.season_length,
h
)self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted SeasonalExponentialSmoothingOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val_seas(self.model_['mean'], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalExponentialSmoothingOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient SeasonalExponentialSmoothingOptimized predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _seasonal_ses_optimized(
res =y, h=h, fitted=fitted,
y=self.season_length
season_length
)= dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
= SeasonalExponentialSmoothingOptimized(season_length=12)
seas_es_opt =ap, h=12)
test_class(seas_es_opt, x= seas_es_opt.forecast(ap, h=12) fcst_seas_es_opt
# test conformal prediction
= SeasonalExponentialSmoothingOptimized(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
seas_es_opt_c =ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
test_class(seas_es_opt_c, x= seas_es_opt_c.forecast(ap, 13, None, None, (80,95), True)
fcst_seas_es_opt_c
test_eq('mean'][:12],
fcst_seas_es_opt_c['mean']
fcst_seas_es_opt[
) _plot_fcst(fcst_seas_es_opt_c)
for i in range(1, 13):
test_close(-12:],
ap[i:][=12)['mean'],
seas_es_opt.forecast(ap[i:], h=0.8
eps )
# test alias argument
test_eq(repr(SeasonalExponentialSmoothingOptimized(season_length=12)),
'SeasESOpt'
)
test_eq(repr(SeasonalExponentialSmoothingOptimized(season_length=12, alias='SeasESOpt_custom')),
'SeasESOpt_custom'
)
=3) show_doc(SeasonalExponentialSmoothingOptimized, title_level
=3) show_doc(SeasonalExponentialSmoothingOptimized.forecast, title_level
=3) show_doc(SeasonalExponentialSmoothingOptimized.fit, title_level
=3) show_doc(SeasonalExponentialSmoothingOptimized.predict, title_level
=3) show_doc(SeasonalExponentialSmoothingOptimized.predict_in_sample, title_level
from statsforecast.models import SeasonalExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap

# SeasonalExponentialSmoothingOptimized usage example
model = SeasonalExponentialSmoothingOptimized(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
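Since this model only provides conformal prediction intervals, requesting levels requires passing `prediction_intervals`, as the tests above do. A minimal sketch assuming the same conformal settings used in those tests:

from statsforecast.models import SeasonalExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap, ConformalIntervals

model = SeasonalExponentialSmoothingOptimized(
    season_length=12,
    prediction_intervals=ConformalIntervals(h=12, n_windows=2),
)
fcst = model.forecast(y=ap, h=12, level=[80, 95])
fcst['mean'], fcst['lo-80'], fcst['hi-80']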
Holt's Method
class Holt(AutoETS):
r""" Holt's method.
Also known as double exponential smoothing, Holt's method is an extension of exponential smoothing for series with a trend.
This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAN' or 'MAN').
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with trend"](https://otexts.com/fpp3/holt.html).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 12 Monthly data.
error_type : str
The type of error of the ETS model. Can be additive (A) or multiplicative (M).
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
season_length: str = 'A',
error_type: str = 'Holt',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): self.season_length = season_length
self.error_type = error_type
self.alias = alias
self.prediction_intervals = prediction_intervals
= error_type + 'AN'
model super().__init__(
=alias, prediction_intervals=prediction_intervals
season_length, model, alias )
= Holt(season_length=12, error_type='A')
holt = holt.forecast(ap,12)
fcast_holt
= AutoETS(season_length=12, model='AAN')
ets = ets.forecast(ap,12)
fcast_ets
np.testing.assert_equal(
fcast_holt,
fcast_ets )
= Holt(season_length=12, error_type='A')
holt
holt.fit(ap)= holt.predict(12)
fcast_holt
= AutoETS(season_length=12, model='AAN')
ets = ets.forecast(ap,12)
fcast_ets
np.testing.assert_equal(
fcast_holt,
fcast_ets )
= Holt(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
holt_c = holt_c.forecast(ap, 12, level=[80, 90])
fcast_holt_c
= AutoETS(season_length=12, model='AAN', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
ets_c = ets_c.forecast(ap, 12, level=[80, 90])
fcast_ets_c
np.testing.assert_equal(
fcast_holt_c,
fcast_ets_c, )
# test alias argument
test_eq(repr(Holt()),
'Holt'
)
test_eq(repr(Holt(alias='Holt_custom')),
'Holt_custom'
)
=3) show_doc(Holt, title_level
='Holt.forecast', title_level=3) show_doc(Holt.forecast, name
='Holt.fit', title_level=3) show_doc(Holt.fit, name
='Holt.predict', title_level=3) show_doc(Holt.predict, name
='Holt.predict_in_sample', title_level=3) show_doc(Holt.predict_in_sample, name
='Holt.forward', title_level=3) show_doc(Holt.forward, name
from statsforecast.models import Holt
from statsforecast.utils import AirPassengers as ap

# Holt's usage example
model = Holt(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
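Because `Holt` is just the corresponding `AutoETS` model ('AAN' or 'MAN'), it inherits native prediction intervals. A minimal sketch requesting them through the memory-efficient `forecast` method:

from statsforecast.models import Holt
from statsforecast.utils import AirPassengers as ap

# forecast with native 80%/95% prediction intervals from the underlying ETS model
model = Holt(season_length=12, error_type='A')
fcst = model.forecast(y=ap, h=4, level=[80, 95])
fcst['mean'], fcst['lo-80'], fcst['hi-95']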
Holt-Winters' Method
class HoltWinters(AutoETS):
r""" Holt-Winters' method.
Also known as triple exponential smoothing, Holt-Winters' method is an extension of exponential smoothing for series that contain both trend and seasonality.
This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAA' or 'MAM').
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with seasonality"](https://otexts.com/fpp3/holt-winters.html).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 12 Monthly data.
error_type : str
The type of error of the ETS model. Can be additive (A) or multiplicative (M).
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1, # season length
season_length: str = 'A', # error type
error_type: str = 'HoltWinters',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): self.season_length = season_length
self.error_type = error_type
self.alias = alias
= error_type + 'A' + error_type
model super().__init__(
=alias, prediction_intervals=prediction_intervals
season_length, model, alias )
= HoltWinters(season_length=12, error_type='A')
hw = hw.forecast(ap,12)
fcast_hw
= AutoETS(season_length=12, model='AAA')
ets = ets.forecast(ap,12)
fcast_ets
np.testing.assert_equal(
fcast_hw,
fcast_ets )
= HoltWinters(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
hw_c = hw_c.forecast(ap, 12, level=[80, 90])
fcast_hw_c
= AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
ets_c = ets_c.forecast(ap, 12, level=[80, 90])
fcast_ets_c
np.testing.assert_equal(
fcast_hw_c,
fcast_ets_c, )
= HoltWinters(season_length=12, error_type='A')
hw
hw.fit(ap)= hw.predict(12)
fcast_hw
= AutoETS(season_length=12, model='AAA')
ets = ets.forecast(ap,12)
fcast_ets
np.testing.assert_equal(
fcast_hw,
fcast_ets )
# test alias argument
test_eq(repr(HoltWinters()),
'HoltWinters'
)
test_eq(repr(HoltWinters(alias='HoltWinters_custom')),
'HoltWinters_custom'
)
=3) show_doc(HoltWinters, title_level
='HoltWinters.forecast', title_level=3) show_doc(HoltWinters.forecast, name
='HoltWinters.fit', title_level=3) show_doc(HoltWinters.fit, name
='HoltWinters.predict', title_level=3) show_doc(HoltWinters.predict, name
= 'HoltWinters.predict_in_sample', title_level=3) show_doc(HoltWinters.predict_in_sample, name
='HoltWinters.forward', title_level=3) show_doc(HoltWinters.forward, name
from statsforecast.models import HoltWinters
from statsforecast.utils import AirPassengers as ap

# Holt-Winters' usage example
model = HoltWinters(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Baseline Models
Historic Average
def _historic_average(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
) -> Dict[str, np.ndarray]:
    fcst = {'mean': _repeat_val(val=y.mean(), h=h)}
    if fitted:
        fitted_vals = _repeat_val(val=y.mean(), h=len(y))
        fcst['fitted'] = fitted_vals
    return fcst
class HistoricAverage(_TS):
def __init__(self, alias: str = 'HistoricAverage', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""HistoricAverage model.
Also known as the mean method. Uses a simple average of all past observations.
Assuming there are $t$ observations, the one-step forecast is given by:
$$\hat{y}_{t+1} = \frac{1}{t} \sum_{j=1}^t y_j$$
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the HistoricAverage model.
Fit a HistoricAverage model to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self
HistoricAverage fitted model.
r"""
= _ensure_float(y)
y = _historic_average(y, h=1, fitted=True)
mod = dict(mod)
mod = y - mod['fitted']
residuals 'sigma'] = _calculate_sigma(residuals, len(residuals) - 1)
mod['n'] = len(y)
mod[self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted HistoricAverage.
Parameters
----------
h : int
Forecast horizon.
X : numpy.array
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res
if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
= self.model_['sigma']
sigma = sigma * np.sqrt(1 + (1 / self.model_['n']))
sigmah = _calculate_intervals(res, level, h, sigmah)
pred_int = {**res, **pred_int}
res
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted HistoricAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= self.model_['sigma']
sigma = sigma * np.sqrt(1 + (1 / self.model_['n']))
sigmah = _add_fitted_pi(res, se=sigmah, level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
): r"""Memory Efficient HistoricAverage predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : list[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _historic_average(y=y, h=h, fitted=fitted or (level is not None))
out = {'mean': out['mean']}
res
if fitted:
'fitted'] = out['fitted']
res[if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
= y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = sigma * np.sqrt(1 + (1 / len(y)))
sigmah = _calculate_intervals(out, level, h, sigmah)
pred_int = {**res, **pred_int}
res if fitted:
= y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = sigma * np.sqrt(1 + (1 / len(y)))
sigmah = _add_fitted_pi(res=res, se=sigmah, level=level)
res
return res
= HistoricAverage()
ha =ap, h=12, level=[80, 90])
test_class(ha, x  # more tests
ha.fit(ap)= ha.predict(12)
fcst_ha 'mean'], np.repeat(ap.mean(), 12), eps=1e-5)
test_close(fcst_ha[
np.testing.assert_almost_equal('fitted'][:4],
ha.predict_in_sample()[#np.array([np.nan, 112., 115., 120.6666667]),
280.2986,280.2986,280.2986,280.2986]),
np.array([=4
decimal )
= HistoricAverage()
ha = ha.forecast(ap,12,None,None,(80,95), True)
fcst_ha
np.testing.assert_almost_equal('lo-80'],
fcst_ha[126.0227,12),
np.repeat(=4
decimal
) _plot_insample_pi(fcst_ha)
# test conformal prediction
= HistoricAverage(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
ha_c =ap, h=13, level=[90, 80], test_forward=False)
test_class(ha_c, x= ha_c.forecast(ap, 13, None, None, (80,95), True)
fcst_ha_c
test_eq('mean'][:12],
fcst_ha_c['mean'],
fcst_ha[
) _plot_fcst(fcst_ha_c)
# test alias argument
test_eq(repr(HistoricAverage()),
'HistoricAverage'
)
test_eq(repr(HistoricAverage(alias='HistoricAverage_custom')),
'HistoricAverage_custom'
)
=3) show_doc(HistoricAverage, title_level
=3) show_doc(HistoricAverage.forecast, title_level
=3) show_doc(HistoricAverage.fit, title_level
=3) show_doc(HistoricAverage.predict, title_level
=3) show_doc(HistoricAverage.predict_in_sample, title_level
from statsforecast.models import HistoricAverage
from statsforecast.utils import AirPassengers as ap

# HistoricAverage usage example
model = HistoricAverage()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
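The native intervals above use the standard error $\hat{\sigma}_h = \hat{\sigma}\sqrt{1 + 1/n}$, with the residual variance computed on $n-1$ degrees of freedom. A hand-computed sketch, assuming Gaussian quantiles (which is consistent with the `lo-80` value tested above, roughly 126.02):

import numpy as np
from scipy.stats import norm
from statsforecast.utils import AirPassengers as ap

n = ap.size
mean = ap.mean()
sigma = np.sqrt(np.sum((ap - mean) ** 2) / (n - 1))   # residual sigma, n - 1 dof
sigmah = sigma * np.sqrt(1 + 1 / n)                   # widen for estimation error
z = norm.ppf(0.5 + 80 / 200)                          # two-sided 80% quantile
mean - z * sigmah, mean + z * sigmah                  # approximately lo-80 / hi-80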
Naive
class Naive(_TS):
def __init__(self, alias: str = 'Naive', prediction_intervals: Optional[ConformalIntervals] = None):
r"""Naive model.
All forecasts have the value of the last observation:
$\hat{y}_{t+1} = y_t$ for all $t$
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the Naive model.
Fit a Naive model to a time series (numpy.array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self:
Naive fitted model.
"""
= _ensure_float(y)
y = _naive(y, h=1, fitted=True)
mod = dict(mod)
mod = y - mod['fitted']
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma 'sigma'] = sigma
mod[self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
int, # forecast horizon
= None, # exogenous regressors
int]] = None # confidence levels
level: Optional[List[
):r"""Predict with fitted Naive.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(self.model_['mean'][0], h=h)
mean = {'mean': mean}
res
if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
= np.arange(1,h+1)
steps = self.model_['sigma']
sigma = sigma * np.sqrt(steps)
sigmah = _calculate_intervals(res, level, h, sigmah)
pred_int = {**res, **pred_int}
res return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Naive insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient Naive predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _naive(y=y, h=h, fitted=fitted or (level is not None))
out = {'mean': out['mean']}
res if fitted:
'fitted'] = out['fitted']
res[if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
= np.arange(1, h + 1)
steps = y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = sigma * np.sqrt(steps)
sigmah = _calculate_intervals(out, level, h, sigmah)
pred_int = {**res, **pred_int}
res if fitted:
= y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
def forward(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r""" Apply fitted model to an new/updated series.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
res return res
# test prediction intervals - forecast
= Naive()
naive 12)
naive.forecast(ap, 12, None, None, (80,95), True) naive.forecast(ap,
# test prediction intervals - fit and predict
naive.fit(ap)12)
naive.predict(12, None, (80,95)) naive.predict(
= Naive()
naive =ap, h=12, level=[90, 80])
test_class(naive, x
naive.fit(ap)= naive.predict(12)
fcst_naive 'mean'], np.repeat(ap[-1], 12), eps=1e-5) test_close(fcst_naive[
= Naive()
naive = naive.forecast(ap,12,None,None,(80,95), True)
fcst_naive
np.testing.assert_almost_equal('lo-80'],
fcst_naive[388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
np.array([=4
decimal  # only approximately equal because Hyndman's forecasts are rounded to four decimal places
) _plot_insample_pi(fcst_naive)
# unit test: forward == forecast
= Naive()
naive = naive.forward(ap,12,None,None,(80,95), True)
fcst_naive
np.testing.assert_almost_equal('lo-80'],
fcst_naive[388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
np.array([=4
decimal  # only approximately equal because Hyndman's forecasts are rounded to four decimal places )
# test conformal prediction
= Naive(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
naive_c =ap, h=13, level=[90, 80], test_forward=False)
test_class(naive_c, x= naive_c.forecast(ap, 13, None, None, (80,95), True)
fcst_naive_c
test_eq('mean'][:12],
fcst_naive_c['mean']
fcst_naive[
) _plot_fcst(fcst_naive_c)
# test alias argument
test_eq(repr(Naive()),
'Naive'
)
test_eq(repr(Naive(alias='Naive_custom')),
'Naive_custom'
)
=3) show_doc(Naive, title_level
=3) show_doc(Naive.forecast, title_level
=3) show_doc(Naive.fit, title_level
=3) show_doc(Naive.predict, title_level
=3) show_doc(Naive.predict_in_sample, title_level
from statsforecast.models import Naive
from statsforecast.utils import AirPassengers as ap

# Naive usage example
model = Naive()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
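For the naive model the interval width grows with the horizon as $\hat{\sigma}_h = \hat{\sigma}\sqrt{h}$, as the `predict` and `forecast` methods above show. A short sketch of that scaling using the one-step-ahead residuals $y_t - y_{t-1}$:

import numpy as np
from statsforecast.utils import AirPassengers as ap

diffs = np.diff(ap)                                   # residuals of the naive fit
sigma = np.sqrt(np.sum(diffs ** 2) / (ap.size - 1))   # n - 1 degrees of freedom
steps = np.arange(1, 13)
sigma * np.sqrt(steps)                                # standard error at each step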
Random Walk with Drift
def _random_walk_with_drift(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # return fitted values
) -> Dict[str, np.ndarray]:
    slope = (y[-1] - y[0]) / (y.size - 1)
    mean = slope * (1 + np.arange(h, dtype=y.dtype)) + y[-1]
    fcst = {'mean': mean,
            'slope': np.array([slope], dtype=y.dtype),
            'last_y': np.array([y[-1]], dtype=y.dtype)}
    if fitted:
        fitted_vals = np.full_like(y, np.nan)
        fitted_vals[1:] = slope + y[:-1]
        fcst['fitted'] = fitted_vals
    return fcst
class RandomWalkWithDrift(_TS):
def __init__(self, alias: str = 'RWD', prediction_intervals: Optional[ConformalIntervals] = None):
r"""RandomWalkWithDrift model.
A variation of the naive method that allows the forecasts to change over time.
The amount of change, called the drift, is the average change seen in the historical data.
$$\hat{y}_{t+1} = y_t+\frac{1}{t-1}\sum_{j=1}^t (y_j-y_{j-1}) = y_t+ \frac{y_t-y_1}{t-1}$$
From the previous equation, we can see that this is equivalent to extrapolating a line between
the first and the last observation.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the RandomWalkWithDrift model.
Fit a RandomWalkWithDrift model to a time series (numpy array) `y`.
Parameters
----------
y: numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
RandomWalkWithDrift fitted model.
r"""
= _ensure_float(y)
y = _random_walk_with_drift(y, h=1, fitted=True)
mod = dict(mod)
mod = y - mod['fitted']
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma 'sigma'] = sigma
mod['n'] = len(y)
mod[self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None
level: Optional[List[
):r"""Predict with fitted RandomWalkWithDrift.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= np.arange(h, dtype=self.model_['last_y'].dtype)
hrange = self.model_['slope'] * (1 + hrange) + self.model_['last_y']
mean = {'mean': mean}
res
if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
= np.arange(1, h + 1)
steps = self.model_['sigma']
sigma = sigma * np.sqrt(steps * (1 + steps / (self.model_['n'] - 1)))
sigmah = _calculate_intervals(res, level, h, sigmah)
pred_int = {**res, **pred_int}
res return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted RandomWalkWithDrift insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient RandomWalkWithDrift predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts: dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _random_walk_with_drift(y=y, h=h, fitted=fitted or (level is not None))
out = {'mean': out['mean']}
res
if fitted:
'fitted'] = out['fitted']
res[
if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
= np.arange(1, h + 1)
steps = y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = sigma * np.sqrt(steps * (1 + steps / (len(y) - 1)))
sigmah = _calculate_intervals(out, level, h, sigmah)
pred_int = {**res, **pred_int}
res if fitted:
= y - out["fitted"]
residuals = _calculate_sigma(residuals, len(residuals) - 1)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
# test prediction intervals - forecast
= RandomWalkWithDrift()
rwd 12)
rwd.forecast(ap, 12, None, None, (80,95), True) rwd.forecast(ap,
# test prediction intervals - fit and predict
= RandomWalkWithDrift()
rwd
rwd.fit(ap)12)
rwd.predict(12, None, (80,95)) rwd.predict(
= RandomWalkWithDrift()
rwd =ap, h=12, level=[90, 80])
test_class(rwd, x= rwd.fit(ap)
rwd = rwd.predict(12)
fcst_rwd 'mean'][:2], np.array([434.2378, 436.4755]), eps=1e-4)
test_close(fcst_rwd[
np.testing.assert_almost_equal('fitted'][:3],
rwd.predict_in_sample()[118 - 3.7622378, 132 - 11.7622378]),
np.array([np.nan, =6
decimal )
# test conformal prediction
= RandomWalkWithDrift(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
rwd_c =ap, h=13, level=[90, 80], test_forward=False)
test_class(rwd_c, x= rwd_c.forecast(ap, 13, None, None, (80,95), True)
fcst_rwd_c
test_eq('mean'][:12],
fcst_rwd_c['mean']
fcst_rwd[
) _plot_fcst(fcst_rwd_c)
= RandomWalkWithDrift()
rwd = rwd.forecast(ap,12,None,None,(80,95), True)
fcst_rwd
np.testing.assert_almost_equal('lo-80'],
fcst_rwd[390.9799, 375.0862, 363.2664, 353.5325, 345.1178, 337.6304, 330.8384, 324.5916, 318.7857, 313.3453, 308.2136, 303.3469]),
np.array([=1
decimal
) _plot_insample_pi(fcst_rwd)
#测试别名参数
test_eq(repr(RandomWalkWithDrift()),
'RWD'
)
test_eq(repr(RandomWalkWithDrift(alias='RWD_custom')),
'RWD_custom'
)
=3) show_doc(RandomWalkWithDrift, title_level
=3) show_doc(RandomWalkWithDrift.forecast, title_level
=3) show_doc(RandomWalkWithDrift.fit, title_level
=3) show_doc(RandomWalkWithDrift.predict, title_level
=3) show_doc(RandomWalkWithDrift.predict_in_sample, title_level
from statsforecast.models import RandomWalkWithDrift
from statsforecast.utils import AirPassengers as ap
# RandomWalkWithDrift 的使用示例
= RandomWalkWithDrift()
model = model.fit(y=ap)
model = model.predict(h=4)
y_hat_dict y_hat_dict
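For intuition, here is a minimal NumPy sketch of the drift forecast (not the library implementation): the point forecast extends the straight line that joins the first and last observations. The horizon `h` below is just an illustration value.
import numpy as np
from statsforecast.utils import AirPassengers as ap

h = 4  # illustration horizon
slope = (ap[-1] - ap[0]) / (ap.size - 1)           # average historical change per step
ap[-1] + slope * np.arange(1, h + 1)               # drift point forecasts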
Seasonal Naive
class SeasonalNaive(_TS):
def __init__(self, season_length: int, alias: str = 'SeasonalNaive', prediction_intervals: Optional[ConformalIntervals] = None):
r"""Seasonal naive model.
A method similar to the naive, but uses the last known observation of the same period (e.g. the same month of the previous year) in order to capture seasonal variations.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html#seasonal-na%C3%AFve-method).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.season_length = season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SeasonalNaive model.

        Fit a SeasonalNaive to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        X : array-like
            Optional exogenous of shape (t, n_x).

        Returns
        -------
        self :
            SeasonalNaive fitted model.
        """
        y = _ensure_float(y)
        mod = _seasonal_naive(
            y=y,
            season_length=self.season_length,
            h=self.season_length,
            fitted=True,
        )
        mod = dict(mod)
        residuals = y - mod['fitted']
        mod['sigma'] = _calculate_sigma(residuals, len(y) - self.season_length)
        self.model_ = mod
        self._store_cs(y=y, X=X)
        return self
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SeasonalNaive.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val_seas(season_vals=self.model_['mean'], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            k = np.floor((h - 1) / self.season_length)
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(k + 1)
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted SeasonalNaive insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
r"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= sorted(level)
level = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SeasonalNaive predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ).
        h : int
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x).
        X_future : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        out = _seasonal_naive(
            y=y, h=h, fitted=fitted or (level is not None),
            season_length=self.season_length
        )
        res = {'mean': out['mean']}
        if fitted:
            res['fitted'] = out['fitted']
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                k = np.floor((h - 1) / self.season_length)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(y) - self.season_length)
                sigmah = sigma * np.sqrt(k + 1)
                pred_int = _calculate_intervals(out, level, h, sigmah)
                res = {**res, **pred_int}
            if fitted:
                k = np.floor((h - 1) / self.season_length)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(y) - self.season_length)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
# Test prediction intervals - forecast
seas_naive = SeasonalNaive(season_length=12)
seas_naive.forecast(ap, 12)
seas_naive.forecast(ap, 12, None, None, (80, 95), True)
# Test prediction intervals - fit & predict
seas_naive = SeasonalNaive(season_length=12)
seas_naive.fit(ap)
seas_naive.predict(12)
seas_naive.predict(12, None, (80, 95))
seas_naive = SeasonalNaive(season_length=12)
test_class(seas_naive, x=ap, h=12, level=[90, 80])
seas_naive = seas_naive.fit(ap)
fcst_seas_naive = seas_naive.predict(12)
test_eq(seas_naive.predict_in_sample()['fitted'][-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))
seas_naive = SeasonalNaive(season_length=12)
fcst_seas_naive = seas_naive.forecast(ap, 12, None, None, (80, 95), True)
np.testing.assert_almost_equal(
    fcst_seas_naive['lo-80'],
    np.array([370.4595, 344.4595, 372.4595, 414.4595, 425.4595, 488.4595,
              575.4595, 559.4595, 461.4595, 414.4595, 343.4595, 385.4595]),
    decimal=4
)
_plot_insample_pi(fcst_seas_naive)
# Test conformal prediction
seas_naive_c = SeasonalNaive(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_naive_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_seas_naive_c = seas_naive_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_seas_naive_c['mean'][:12],
    fcst_seas_naive['mean']
)
_plot_fcst(fcst_seas_naive_c)
# Test alias argument
test_eq(repr(SeasonalNaive(12)), 'SeasonalNaive')
test_eq(repr(SeasonalNaive(12, alias='SeasonalNaive_custom')), 'SeasonalNaive_custom')
show_doc(SeasonalNaive, title_level=3)
show_doc(SeasonalNaive.forecast, title_level=3)
show_doc(SeasonalNaive.fit, title_level=3)
show_doc(SeasonalNaive.predict, title_level=3)
show_doc(SeasonalNaive.predict_in_sample, title_level=3)
from statsforecast.models import SeasonalNaive
from statsforecast.utils import AirPassengers as ap
# Usage example of SeasonalNaive
model = SeasonalNaive(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
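For intuition, a minimal sketch of the seasonal naive rule on the same data (this mirrors the idea, not the library internals): each future period repeats the last observed value of the same period. `h` and `season_length` are illustration values.
import numpy as np
from statsforecast.utils import AirPassengers as ap

h, season_length = 4, 12
last_season = ap[-season_length:]                   # last full year of observations
np.tile(last_season, 1 + h // season_length)[:h]    # repeat it to cover the horizon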
Window Average
def _window_average(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
    window_size: int,  # window size
) -> Dict[str, np.ndarray]:
    if fitted:
        raise NotImplementedError('return fitted')
    if y.size < window_size:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    wavg = y[-window_size:].mean()
    mean = _repeat_val(val=wavg, h=h)
    return {'mean': mean}
class WindowAverage(_TS):
def __init__(
self,
int,
window_size: str = 'WindowAverage',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):r"""WindowAverage model.
Uses the average of the last $k$ observations, with $k$ the length of the window.
Wider windows will capture global trends, while narrow windows will reveal local trends.
The length of the window selected should take into account the importance of past
observations and how fast the series changes.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
window_size : int
Size of truncated series on which average is estimated.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.window_size = window_size
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the WindowAverage model.
Fit a WindowAverage to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
WindowAverage fitted model.
"""
= _ensure_float(y)
y = _window_average(y=y, h=1, window_size=self.window_size, fitted=False)
mod self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted WindowAverage.
Parameters
----------
h : int
Forecast horizon.
X : numpy.array
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted WindowAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
raise NotImplementedError
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient WindowAverage predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _window_average(y=y, h=h, fitted=fitted, window_size=self.window_size)
res = dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
w_avg = WindowAverage(window_size=24)
test_class(w_avg, x=ap, h=12, skip_insample=True)
w_avg = w_avg.fit(ap)
fcst_w_avg = w_avg.predict(12)
test_close(fcst_w_avg['mean'], np.repeat(ap[-24:].mean(), 12))
# Test conformal prediction
w_avg_c = WindowAverage(window_size=24, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_w_avg_c = w_avg_c.forecast(ap, 13, None, None, (80, 95), False)
test_eq(
    fcst_w_avg_c['mean'][:12],
    fcst_w_avg['mean']
)
_plot_fcst(fcst_w_avg_c)
# Test alias argument
test_eq(repr(WindowAverage(1)), 'WindowAverage')
test_eq(repr(WindowAverage(1, alias='WindowAverage_custom')), 'WindowAverage_custom')
show_doc(WindowAverage, title_level=3)
show_doc(WindowAverage.forecast, title_level=3)
show_doc(WindowAverage.fit, title_level=3)
show_doc(WindowAverage.predict, title_level=3)
from statsforecast.models import WindowAverage
from statsforecast.utils import AirPassengers as ap
# Usage example of WindowAverage
model = WindowAverage(window_size=12 * 4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
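As a quick cross-check of the idea (not the library internals), the point forecast is simply the mean of the trailing window repeated over the horizon. A minimal sketch, assuming `window_size=48` as in the usage example above:
import numpy as np
from statsforecast.utils import AirPassengers as ap

h, window_size = 4, 12 * 4
np.full(h, ap[-window_size:].mean())   # flat forecast at the trailing-window mean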
Seasonal Window Average
def _seasonal_window_average(
    y: np.ndarray,
    h: int,
    fitted: bool,
    season_length: int,
    window_size: int,
) -> Dict[str, np.ndarray]:
    if fitted:
        raise NotImplementedError('return fitted')
    min_samples = season_length * window_size
    if y.size < min_samples:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_avgs = y[-min_samples:].reshape(window_size, season_length).mean(axis=0)
    out = _repeat_val_seas(season_vals=season_avgs, h=h)
    return {'mean': out}
class SeasonalWindowAverage(_TS):
    def __init__(
        self,
        season_length: int,
        window_size: int,
        alias: str = 'SeasWA',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        r"""SeasonalWindowAverage model.
An average of the last $k$ observations of the same period, with $k$ the length of the window.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
window_size : int
Size of truncated series on which average is estimated.
season_length : int
Number of observations per cycle.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.season_length = season_length
self.window_size = window_size
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the SeasonalWindowAverage model.
Fit a SeasonalWindowAverage to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalWindowAverage fitted model.
"""
= _ensure_float(y)
y = _seasonal_window_average(
mod =y,
y=self.season_length,
h=False,
fitted=self.season_length,
season_length=self.window_size,
window_size
)self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted SeasonalWindowAverage.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val_seas(season_vals=self.model_['mean'], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalWindowAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
raise NotImplementedError
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient SeasonalWindowAverage predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _seasonal_window_average(
res =y, h=h, fitted=fitted,
y=self.season_length,
season_length=self.window_size
window_size
)= dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
seas_w_avg = SeasonalWindowAverage(season_length=12, window_size=1)
test_class(seas_w_avg, x=ap, h=12, skip_insample=True)
seas_w_avg = seas_w_avg.fit(ap)
fcst_seas_w_avg = seas_w_avg.predict(12)
# Test conformal prediction
seas_w_avg_c = SeasonalWindowAverage(season_length=12, window_size=1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_w_avg_c = seas_w_avg_c.forecast(ap, 13, None, None, (80, 95), False)
test_eq(
    fcst_seas_w_avg_c['mean'][:12],
    fcst_seas_w_avg['mean']
)
fcst_seas_w_avg_c['mean'][:12]
_plot_fcst(fcst_seas_w_avg_c)
# Test alias argument
test_eq(repr(SeasonalWindowAverage(12, 1)), 'SeasWA')
test_eq(repr(SeasonalWindowAverage(12, 1, alias='SeasWA_custom')), 'SeasWA_custom')
show_doc(SeasonalWindowAverage, title_level=3)
show_doc(SeasonalWindowAverage.forecast, title_level=3)
show_doc(SeasonalWindowAverage.fit, title_level=3)
show_doc(SeasonalWindowAverage.predict, title_level=3)
from statsforecast.models import SeasonalWindowAverage
from statsforecast.utils import AirPassengers as ap
# Usage example of SeasonalWindowAverage
model = SeasonalWindowAverage(season_length=12, window_size=4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
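A minimal sketch of the same idea, assuming `season_length=12` and `window_size=4` as above: average the last four observations of each month and tile that monthly profile over the horizon (an illustration, not the library implementation).
import numpy as np
from statsforecast.utils import AirPassengers as ap

h, season_length, window_size = 4, 12, 4
tail = ap[-season_length * window_size:]
season_avgs = tail.reshape(window_size, season_length).mean(axis=0)  # per-month averages
np.tile(season_avgs, 1 + h // season_length)[:h]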
Sparse or Intermittent
ADIDA
def _chunk_forecast(y, aggregation_level):
    lost_remainder_data = len(y) % aggregation_level
    y_cut = y[lost_remainder_data:]
    aggregation_sums = _chunk_sums(y_cut, aggregation_level)
    sums_forecast, _ = _optimized_ses_forecast(aggregation_sums)
    return sums_forecast
@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_demand(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
    out = np.empty_like(y)
    out[0] = np.nan
    fitted_idx = 0
    for i in range(1, y.size):
        if y[i - 1] > 0:
            fitted_idx += 1
            out[i] = fitted[fitted_idx]
        elif fitted_idx > 0:
            # if this entry is zero, the model didn't change
            out[i] = out[i - 1]
        else:
            # if we haven't seen any demand, use naive
            out[i] = y[i - 1]
    return out
@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_intervals(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
    out = np.empty_like(y)
    out[0] = np.nan
    fitted_idx = 0
    for i in range(1, y.size):
        if y[i - 1] != 0:
            fitted_idx += 1
            if fitted[fitted_idx] == 0:
                # to avoid division by zero
                out[i] = 1
            else:
                out[i] = fitted[fitted_idx]
        elif fitted_idx > 0:
            # if this entry is zero, the model didn't change
            out[i] = out[i - 1]
        else:
            # if we haven't seen any intervals, use 1 to avoid division by zero
            out[i] = 1
    return out
def _adida(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
):
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean()
    aggregation_level = round(mean_interval)
    sums_forecast = _chunk_forecast(y, aggregation_level)
    forecast = sums_forecast / aggregation_level
    res = {'mean': _repeat_val(val=forecast, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for ADIDA is very expensive")
        fitted_aggregation_levels = np.round(
            y_intervals.cumsum() / np.arange(1, y_intervals.size + 1)
        )
        fitted_aggregation_levels = _expand_fitted_intervals(
            np.append(np.nan, fitted_aggregation_levels), y
        )[1:].astype(np.int32)
        sums_fitted = np.empty(y.size - 1, dtype=y.dtype)
        for i, agg_lvl in enumerate(fitted_aggregation_levels):
            sums_fitted[i] = _chunk_forecast(y[:i + 1], agg_lvl)
        res['fitted'] = np.append(np.nan, sums_fitted / fitted_aggregation_levels)
    return res
class ADIDA(_TS):
def __init__(self, alias: str = 'ADIDA', prediction_intervals: Optional[ConformalIntervals] = None):
r"""ADIDA model.
Aggregate-Disaggregate Intermittent Demand Approach: uses temporal aggregation to reduce the
number of zero observations. Once the data has been aggregated, it uses the optimized SES to
generate the forecasts at the new level. It then breaks down the forecast to the original
level using equal weights.
ADIDA specializes in sparse or intermittent series, i.e. series with very few non-zero observations.
They are notoriously hard to forecast, and so, different methods have been developed
specifically for them.
References
----------
[Nikolopoulos, K., Syntetos, A. A., Boylan, J. E., Petropoulos, F., & Assimakopoulos, V. (2011). An aggregate–disaggregate intermittent demand approach (ADIDA) to forecasting: an empirical proposition and analysis. Journal of the Operational Research Society, 62(3), 544-554.](https://researchportal.bath.ac.uk/en/publications/an-aggregate-disaggregate-intermittent-demand-approach-adida-to-f).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the ADIDA model.
Fit an ADIDA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
ADIDA fitted model.
"""
= _ensure_float(y)
y self.model_ = _adida(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted ADIDA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted ADIDA insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= _adida(y=self._y, h=1, fitted=True)['fitted']
fitted = {'fitted': fitted}
res if level is not None:
= _calculate_sigma(self._y - fitted, self._y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient ADIDA predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _adida(y=y, h=h, fitted=fitted)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
deg_ts = np.zeros(10)
adida = ADIDA()
test_class(adida, x=ap, h=12, skip_insample=False)
test_class(adida, x=deg_ts, h=12, skip_insample=False)
fcst_adida = adida.forecast(ap, 12)
_test_fitted_sparse(ADIDA)
# Test conformal prediction
adida_c = ADIDA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(adida_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_adida_c = adida_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_adida_c['mean'][:12],
    fcst_adida['mean'],
)
_plot_insample_pi(fcst_adida_c)
_plot_fcst(fcst_adida_c)
# Test alias argument
test_eq(repr(ADIDA()), 'ADIDA')
test_eq(repr(ADIDA(alias='ADIDA_custom')), 'ADIDA_custom')
show_doc(ADIDA, title_level=3)
show_doc(ADIDA.forecast, title_level=3)
show_doc(ADIDA.fit, title_level=3)
show_doc(ADIDA.predict, title_level=3)
from statsforecast.models import ADIDA
from statsforecast.utils import AirPassengers as ap
# Usage example of ADIDA
model = ADIDA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
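To make the aggregate-disaggregate idea concrete, here is a rough sketch on a toy intermittent series. It substitutes a plain SES with a fixed `alpha` for the library's optimized SES and uses a simplified inter-demand-interval convention, so the numbers will not match `ADIDA` exactly.
import numpy as np

def ses_point_forecast(x, alpha=0.1):
    # plain SES level; the library optimizes alpha instead of fixing it
    level = x[0]
    for obs in x[1:]:
        level = alpha * obs + (1 - alpha) * level
    return level

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)  # toy intermittent series
intervals = np.diff(np.flatnonzero(y != 0), prepend=-1)           # rough inter-demand intervals
agg = int(round(intervals.mean()))                                # aggregation level
bucket_sums = y[len(y) % agg:].reshape(-1, agg).sum(axis=1)       # temporally aggregated series
ses_point_forecast(bucket_sums) / agg                             # forecast broken back down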
Croston Classic
def _croston_classic(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
):
    y = _ensure_float(y)
    # demand
    yd = _demand(y)
    if not yd.size:  # no demand
        return _naive(y=y, h=h, fitted=fitted)
    ydp, ydf = _ses_forecast(yd, 0.1)
    # intervals
    yi = _intervals(y)
    yip, yif = _ses_forecast(yi, 0.1)
    if yip != 0.0:
        mean = ydp / yip
    else:
        mean = ydp
    out = {'mean': _repeat_val(val=mean, h=h)}
    if fitted:
        ydf = _expand_fitted_demand(np.append(ydf, ydp), y)
        yif = _expand_fitted_intervals(np.append(yif, yip), y)
        out['fitted'] = ydf / yif
    return out
class CrostonClassic(_TS):
def __init__(self, alias: str = 'CrostonClassic', prediction_intervals: Optional[ConformalIntervals] = None):
r"""CrostonClassic model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
where $\hat{z}_t$ and $\hat{p}_t$ are forecasted using SES. The smoothing parameter
of both components is set equal to 0.1.
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50)
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the CrostonClassic model.
Fit a CrostonClassic to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonClassic fitted model.
"""
= _ensure_float(y)
y self.model_ = _croston_classic(y=y, h=1, fitted=True)
self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted CrostonClassic.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonClassic insample predictions.
Parameters
----------
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient CrostonClassic predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _croston_classic(y=y, h=h, fitted=fitted)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
croston = CrostonClassic(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston = croston.forecast(ap, 12)
_test_fitted_sparse(CrostonClassic)
# Test conformal prediction
croston_c = CrostonClassic(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_c, x=ap, h=13, level=[90.0, 80.0], skip_insample=False)
fcst_croston_c = croston_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_croston_c['mean'][:12],
    fcst_croston['mean'],
)
_plot_insample_pi(fcst_croston_c)
_plot_fcst(fcst_croston_c)
# Test alias argument
test_eq(repr(CrostonClassic()), 'CrostonClassic')
test_eq(repr(CrostonClassic(alias='CrostonClassic_custom')), 'CrostonClassic_custom')
show_doc(CrostonClassic, title_level=3)
show_doc(CrostonClassic.forecast, title_level=3)
show_doc(CrostonClassic.fit, title_level=3)
show_doc(CrostonClassic.predict, title_level=3)
from statsforecast.models import CrostonClassic
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonClassic
model = CrostonClassic()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
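The decomposition in the docstring can be sketched in a few lines on a toy intermittent series. The SES helper with `alpha=0.1` mirrors the smoothing described above; the interval convention is a simplification, so this is an illustration rather than the library's exact computation.
import numpy as np

def ses_point_forecast(x, alpha=0.1):
    level = x[0]
    for obs in x[1:]:
        level = alpha * obs + (1 - alpha) * level
    return level

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)  # toy intermittent series
idx = np.flatnonzero(y != 0)
z = y[idx]                                     # non-zero demand sizes
p = np.diff(idx, prepend=-1).astype(float)     # inter-demand intervals (approximate convention)
ses_point_forecast(z) / ses_point_forecast(p)  # Croston point forecast z_hat / p_hat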
Croston Optimized
def _croston_optimized(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
):
    y = _ensure_float(y)
    # demand
    yd = _demand(y)
    if not yd.size:
        return _naive(y=y, h=h, fitted=fitted)
    ydp, _ = _optimized_ses_forecast(yd)
    # intervals
    yi = _intervals(y)
    yip, _ = _optimized_ses_forecast(yi)
    if yip != 0.0:
        mean = ydp / yip
    else:
        mean = ydp
    out = {'mean': _repeat_val(val=mean, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for CrostonOptimized is very expensive")
        ydf = np.empty(yd.size + 1, dtype=y.dtype)
        ydf[0] = np.nan
        for i in range(yd.size):
            ydf[i + 1] = _optimized_ses_forecast(yd[:i + 1])[0]
        yif = np.empty(yi.size + 1, dtype=y.dtype)
        yif[0] = np.nan
        for i in range(yi.size):
            yiff = _optimized_ses_forecast(yi[:i + 1])[0]
            if yiff == 0:
                yiff = 1.0
            yif[i + 1] = yiff
        ydf = _expand_fitted_demand(ydf, y)
        yif = _expand_fitted_intervals(yif, y)
        out['fitted'] = ydf / yif
    return out
class CrostonOptimized(_TS):
def __init__(self, alias: str = 'CrostonOptimized', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""CrostonOptimized model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
A variation of the classic Croston's method where the smoothing parameter is optimally
selected from the range $[0.1,0.3]$. Both the non-zero demand $z_t$ and the inter-demand
intervals $p_t$ are smoothed separately, so their smoothing parameters can be different.
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the CrostonOptimized model.
Fit a CrostonOptimized to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonOptimized fitted model.
"""
= _ensure_float(y)
y self.model_ = _croston_optimized(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted CrostonOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= _croston_optimized(y=self._y, h=1, fitted=True)['fitted']
fitted = {'fitted': fitted}
res if level is not None:
= _calculate_sigma(self._y - fitted, self._y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient CrostonOptimized predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _croston_optimized(y=y, h=h, fitted=fitted)
res = dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
croston_op = CrostonOptimized(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_op, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_op, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_op = croston_op.forecast(ap, 12)
_test_fitted_sparse(CrostonOptimized)
# Test conformal prediction
croston_op_c = CrostonOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_op_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_op_c = croston_op_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_croston_op_c['mean'][:12],
    fcst_croston_op['mean'],
)
_plot_insample_pi(fcst_croston_op_c)
_plot_fcst(fcst_croston_op_c)
# Test alias argument
test_eq(repr(CrostonOptimized()), 'CrostonOptimized')
test_eq(repr(CrostonOptimized(alias='CrostonOptimized_custom')), 'CrostonOptimized_custom')
show_doc(CrostonOptimized, title_level=3)
show_doc(CrostonOptimized.forecast, title_level=3)
show_doc(CrostonOptimized.fit, title_level=3)
show_doc(CrostonOptimized.predict, title_level=3)
from statsforecast.models import CrostonOptimized
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonOptimized
model = CrostonOptimized()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
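As a rough stand-in for the optimized SES, the sketch below grid-searches `alpha` over $[0.1, 0.3]$ (the range quoted in the docstring) by one-step-ahead squared error, separately for demand sizes and intervals. The grid, the toy series, and the interval convention are illustration choices, not the library's optimizer.
import numpy as np

def ses_sse_and_level(x, alpha):
    # one-step-ahead squared error and final level of a plain SES
    level, sse = x[0], 0.0
    for obs in x[1:]:
        sse += (obs - level) ** 2
        level = alpha * obs + (1 - alpha) * level
    return sse, level

def grid_optimized_ses(x, alphas=np.linspace(0.1, 0.3, 21)):
    return min(ses_sse_and_level(x, a) for a in alphas)[1]

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)
idx = np.flatnonzero(y != 0)
z, p = y[idx], np.diff(idx, prepend=-1).astype(float)
grid_optimized_ses(z) / grid_optimized_ses(p)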
CrostonSBA
def _croston_sba(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
) -> Dict[str, np.ndarray]:
    out = _croston_classic(y=y, h=h, fitted=fitted)
    out['mean'] *= 0.95
    if fitted:
        out['fitted'] *= 0.95
    return out
class CrostonSBA(_TS):
def __init__(self, alias: str = 'CrostonSBA', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""CrostonSBA model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
A variation of the classic Croston's method that uses a debiasing factor, so that the
forecast is given by:
$$\hat{y}_t = 0.95 \frac{\hat{z}_t}{\hat{p}_t}$$
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the CrostonSBA model.
Fit a CrostonSBA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonSBA fitted model.
"""
= _ensure_float(y)
y self.model_ = _croston_sba(y=y, h=1, fitted=True)
self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted CrostonSBA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonSBA insample predictions.
Parameters
----------
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient CrostonSBA predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _croston_sba(y=y, h=h, fitted=fitted)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
croston_sba = CrostonSBA(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_sba, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_sba, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_sba = croston_sba.forecast(ap, 12)
_test_fitted_sparse(CrostonSBA)
# Test conformal prediction
croston_sba_c = CrostonSBA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_sba_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_sba_c = croston_sba_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_croston_sba_c['mean'][:12],
    fcst_croston_sba['mean'],
)
_plot_insample_pi(fcst_croston_sba_c)
_plot_fcst(fcst_croston_sba_c)
# Test alias argument
test_eq(repr(CrostonSBA()), 'CrostonSBA')
test_eq(repr(CrostonSBA(alias='CrostonSBA_custom')), 'CrostonSBA_custom')
show_doc(CrostonSBA, title_level=3)
show_doc(CrostonSBA.forecast, title_level=3)
show_doc(CrostonSBA.fit, title_level=3)
show_doc(CrostonSBA.predict, title_level=3)
from statsforecast.models import CrostonSBA
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonSBA
model = CrostonSBA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
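The only change relative to the classic Croston sketch is the 0.95 debiasing factor from the docstring. A minimal illustration on the same toy series, again with a fixed-`alpha` SES as a stand-in for the library's smoothing:
import numpy as np

def ses_point_forecast(x, alpha=0.1):
    level = x[0]
    for obs in x[1:]:
        level = alpha * obs + (1 - alpha) * level
    return level

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)
idx = np.flatnonzero(y != 0)
z, p = y[idx], np.diff(idx, prepend=-1).astype(float)
0.95 * ses_point_forecast(z) / ses_point_forecast(p)   # Croston forecast with the SBA debiasing factor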
IMAPA
def _imapa(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
):
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean().item()
    max_aggregation_level = round(mean_interval)
    forecasts = np.empty(max_aggregation_level, dtype=y.dtype)
    for aggregation_level in range(1, max_aggregation_level + 1):
        lost_remainder_data = len(y) % aggregation_level
        y_cut = y[lost_remainder_data:]
        aggregation_sums = _chunk_sums(y_cut, aggregation_level)
        forecast, _ = _optimized_ses_forecast(aggregation_sums)
        forecasts[aggregation_level - 1] = forecast / aggregation_level
    forecast = forecasts.mean()
    res = {'mean': _repeat_val(val=forecast, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for IMAPA is very expensive.")
        fitted_vals = np.empty_like(y)
        fitted_vals[0] = np.nan
        for i in range(y.size - 1):
            fitted_vals[i + 1] = _imapa(y[:i + 1], h=1, fitted=False)['mean'].item()
        res['fitted'] = fitted_vals
    return res
class IMAPA(_TS):
def __init__(self, alias: str = 'IMAPA', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""IMAPA model.
Intermittent Multiple Aggregation Prediction Algorithm: Similar to ADIDA, but instead of
using a single aggregation level, it considers multiple in order to capture different
dynamics of the data. Uses the optimized SES to generate the forecasts at the new levels
and then combines them using a simple average.
References
----------
[Syntetos, A. A., & Boylan, J. E. (2021). Intermittent demand forecasting: Context, methods and applications. John Wiley & Sons.](https://www.ifors.org/intermittent-demand-forecasting-context-methods-and-applications/).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the IMAPA model.
Fit an IMAPA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
IMAPA fitted model.
"""
= _ensure_float(y)
y self.model_ = _imapa(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted IMAPA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(val=self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted IMAPA insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= _imapa(y=self._y, h=1, fitted=True)['fitted']
fitted = {'fitted': fitted}
res if level is not None:
= _calculate_sigma(self._y - fitted, self._y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient IMAPA predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _imapa(y=y, h=h, fitted=fitted)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
imapa = IMAPA(prediction_intervals=ConformalIntervals(2, 1))
test_class(imapa, x=ap, h=12, skip_insample=False, level=[80])
test_class(imapa, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_imapa = imapa.forecast(ap, 12)
_test_fitted_sparse(IMAPA)
# Test conformal prediction
imapa_c = IMAPA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(imapa_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_imapa_c = imapa_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_imapa_c['mean'][:12],
    fcst_imapa['mean'],
)
_plot_insample_pi(fcst_imapa_c)
_plot_fcst(fcst_imapa_c)
# Test alias argument
test_eq(repr(IMAPA()), 'IMAPA')
test_eq(repr(IMAPA(alias='IMAPA_custom')), 'IMAPA_custom')
show_doc(IMAPA, title_level=3)
show_doc(IMAPA.forecast, title_level=3)
show_doc(IMAPA.fit, title_level=3)
show_doc(IMAPA.predict, title_level=3)
from statsforecast.models import IMAPA
from statsforecast.utils import AirPassengers as ap
# Usage example of IMAPA
model = IMAPA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
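A minimal sketch of the multiple-aggregation idea on a toy intermittent series: forecast at every aggregation level from 1 up to the rounded mean inter-demand interval, disaggregate, and average. A fixed-`alpha` SES stands in for the library's optimized SES, and the interval convention is simplified, so this is only an illustration.
import numpy as np

def ses_point_forecast(x, alpha=0.1):
    level = x[0]
    for obs in x[1:]:
        level = alpha * obs + (1 - alpha) * level
    return level

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)
intervals = np.diff(np.flatnonzero(y != 0), prepend=-1)
max_level = int(round(intervals.mean()))
per_level = []
for agg in range(1, max_level + 1):
    sums = y[len(y) % agg:].reshape(-1, agg).sum(axis=1)   # aggregate into buckets of size `agg`
    per_level.append(ses_point_forecast(sums) / agg)       # forecast and disaggregate
np.mean(per_level)                                         # combine levels with a simple average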
TSB
def _tsb(
    y: np.ndarray,  # time series
    h: int,  # forecast horizon
    fitted: bool,  # fitted values
    alpha_d: float,
    alpha_p: float,
) -> Dict[str, np.ndarray]:
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)
    yd = _demand(y)
    yp = _probability(y)
    ypf, ypft = _ses_forecast(yp, alpha_p)
    ydf, ydft = _ses_forecast(yd, alpha_d)
    res = {'mean': _repeat_val(val=ypf * ydf, h=h)}
    if fitted:
        ydft = _expand_fitted_demand(np.append(ydft, ydf), y)
        res['fitted'] = ypft * ydft
    return res
class TSB(_TS):
def __init__(
self,
float,
alpha_d: float,
alpha_p: str = 'TSB',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):r"""TSB model.
Teunter-Syntetos-Babai: A modification of Croston's method that replaces the inter-demand
intervals with the demand probability $d_t$, which is defined as follows.
$$
d_t = \begin{cases}
1 & \text{if demand occurs at time t} \\
0 & \text{otherwise.}
\end{cases}
$$
Hence, the forecast is given by
$$\hat{y}_t= \hat{d}_t\hat{z_t}$$
Both $d_t$ and $z_t$ are forecasted using SES. The smoothing parameters of each may differ,
like in the optimized Croston's method.
References
----------
[Teunter, R. H., Syntetos, A. A., & Babai, M. Z. (2011). Intermittent demand: Linking forecasting to inventory obsolescence. European Journal of Operational Research, 214(3), 606-615.](https://www.sciencedirect.com/science/article/abs/pii/S0377221711004437)
Parameters
----------
alpha_d : float
Smoothing parameter for demand.
alpha_p : float
Smoothing parameter for probability.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
self.alpha_d = alpha_d
self.alpha_p = alpha_p
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the TSB model.
Fit a TSB to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
TSB fitted model.
"""
= _ensure_float(y)
y self.model_ = _tsb(
=y,
y=1,
h=True,
fitted=self.alpha_d,
alpha_d=self.alpha_p
alpha_p
)self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted TSB.
Parameters
----------
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _repeat_val(self.model_['mean'][0], h=h)
mean = {'mean': mean}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted TSB insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient TSB predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = _tsb(
res =y, h=h,
y=fitted,
fitted=self.alpha_d,
alpha_d=self.alpha_p
alpha_p
)= dict(res)
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
tsb = TSB(alpha_d=0.9, alpha_p=0.1, prediction_intervals=ConformalIntervals(2, 1))
test_class(tsb, x=ap, h=12, skip_insample=False, level=[80])
test_class(tsb, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_tsb = tsb.forecast(ap, 12)
_test_fitted_sparse(lambda: TSB(alpha_d=0.9, alpha_p=0.1))
# Test conformal prediction
tsb_c = TSB(alpha_d=0.9, alpha_p=0.1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(tsb_c, x=ap, h=13, level=[90, 80], skip_insample=True)
fcst_tsb_c = tsb_c.forecast(ap, 13, None, None, (80, 95), True)
test_eq(
    fcst_tsb_c['mean'][:12],
    fcst_tsb['mean'],
)
_plot_insample_pi(fcst_tsb_c)
_plot_fcst(fcst_tsb_c)
# Test alias argument
test_eq(repr(TSB(0.9, 0.1)), 'TSB')
test_eq(repr(TSB(0.9, 0.1, alias='TSB_custom')), 'TSB_custom')
show_doc(TSB, title_level=3)
show_doc(TSB.forecast, title_level=3)
show_doc(TSB.fit, title_level=3)
show_doc(TSB.predict, title_level=3)
from statsforecast.models import TSB
from statsforecast.utils import AirPassengers as ap
# Usage example of TSB
model = TSB(alpha_d=0.5, alpha_p=0.5)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
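The TSB forecast can be sketched directly from the formula above: smooth the demand-occurrence indicator with `alpha_p` and the non-zero demand sizes with `alpha_d`, then multiply the two one-step forecasts. A toy illustration, not the library code:
import numpy as np

def ses_point_forecast(x, alpha):
    level = x[0]
    for obs in x[1:]:
        level = alpha * obs + (1 - alpha) * level
    return level

y = np.array([0, 0, 3, 0, 0, 0, 2, 0, 4, 0, 0, 1], dtype=float)
alpha_d, alpha_p = 0.5, 0.5                     # illustration values, as in the cell above
d = (y != 0).astype(float)                      # demand-occurrence indicator
z = y[y != 0]                                   # non-zero demand sizes
ses_point_forecast(d, alpha_p) * ses_point_forecast(z, alpha_d)   # d_hat * z_hat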
Multiple Seasonalities
MSTL
def _predict_mstl_components(mstl_ob, h, season_length):
= mstl_ob.filter(regex='seasonal*').columns
seasoncolumns = len(seasoncolumns)
nseasons = np.full((h, nseasons), np.nan)
seascomp = [season_length] if isinstance(season_length, int) else season_length
seasonal_periods for i in range(nseasons):
= seasonal_periods[i]
mp = seasoncolumns[i]
colname = np.tile(mstl_ob[colname].values[-mp:], trunc(1 + (h-1)/mp))[:h]
seascomp[:, i] return seascomp
def _predict_mstl_seas(mstl_ob, h, season_length):
= _predict_mstl_components(mstl_ob, h, season_length)
seascomp return seascomp.sum(axis=1)
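`_predict_mstl_components` extends each estimated seasonal column over the horizon by tiling its last full cycle. A small standalone sketch of that tiling step, with illustrative values only:
import numpy as np
from math import trunc

last_cycle = np.array([1.0, -2.0, 3.0])  # last `mp` values of one seasonal column
h, mp = 7, last_cycle.size
# repeat the cycle enough times to cover h steps, then truncate to the horizon
extended = np.tile(last_cycle, trunc(1 + (h - 1) / mp))[:h]
# extended -> array([ 1., -2.,  3.,  1., -2.,  3.,  1.])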
class MSTL(_TS):
r"""MSTL model.
The MSTL (Multiple Seasonal-Trend decomposition using LOESS) decomposes the time series
in multiple seasonalities using LOESS. Then forecasts the trend using
a custom non-seasonal model and each seasonality using a SeasonalNaive model.
References
----------
[Bandara, Kasun & Hyndman, Rob & Bergmeir, Christoph. (2021). "MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple Seasonal Patterns".](https://arxiv.org/abs/2107.13462).
Parameters
----------
season_length : Union[int, List[int]]
Number of observations per unit of time. For multiple seasonalities use a list.
trend_forecaster : model, default=AutoETS(model='ZZN')
StatsForecast model used to forecast the trend component.
stl_kwargs : dict
Extra arguments to pass to [`statsmodels.tsa.seasonal.STL`](https://www.statsmodels.org/dev/generated/statsmodels.tsa.seasonal.STL.html#statsmodels.tsa.seasonal.STL).
The `period` and `seasonal` arguments are reserved.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int, List[int]],
season_length: Union[= AutoETS(model='ZZN'),
trend_forecaster = None,
stl_kwargs: Optional[Dict] str = 'MSTL',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): # Check whether the ETS model includes a seasonal component
if repr(trend_forecaster) == 'AutoETS':
if trend_forecaster.model[2] != 'N':
raise Exception(
'Trend forecaster should not adjust '
'seasonal models.'
)# Check that the trend forecaster uses season_length=1
if hasattr(trend_forecaster, 'season_length'):
if trend_forecaster.season_length != 1:
raise Exception(
'Trend forecaster should not adjust '
'seasonal models. Please pass `season_length=1` '
'to your trend forecaster'
)if isinstance(season_length, int):
= [season_length]
season_length else:
= sorted(season_length)
season_length self.season_length = season_length
self.trend_forecaster = trend_forecaster
self.prediction_intervals = prediction_intervals
self.alias = alias
if self.trend_forecaster.prediction_intervals is None and (self.prediction_intervals is not None):
self.trend_forecaster.prediction_intervals = prediction_intervals
self.stl_kwargs = dict() if stl_kwargs is None else stl_kwargs
def fit(
self,
y: np.ndarray,= None,
X: Optional[np.ndarray]
):r"""Fit the MSTL model.
Fit MSTL to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X: array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
MSTL fitted model.
"""
= _ensure_float(y)
y self.model_ = mstl(
=y,
x=self.season_length,
period=self.stl_kwargs,
stl_kwargs
)= self.model_[['trend', 'remainder']].sum(axis=1).values
x_sa self.trend_forecaster = self.trend_forecaster.new().fit(y=x_sa, X=X)
self._store_cs(y=x_sa, X=X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[
):r"""Predict with fitted MSTL.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
str, Any] = {'h': h, 'X': X}
kwargs: Dict[if self.trend_forecaster.prediction_intervals is None:
'level'] = level
kwargs[= self.trend_forecaster.predict(**kwargs)
res = _predict_mstl_seas(self.model_, h=h, season_length=self.season_length)
seas = {key: val + seas for key, val in res.items()}
res if level is None or self.trend_forecaster.prediction_intervals is None:
return res
= sorted(level)
level if self.trend_forecaster.prediction_intervals is not None:
= self.trend_forecaster._add_predict_conformal_intervals(res, level)
res else:
raise Exception(
"You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
)return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted MSTL insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= self.trend_forecaster.predict_in_sample(level=level)
res = self.model_.filter(regex='seasonal*').sum(axis=1).values
seas = {key: val + seas for key, val in res.items()}
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Memory Efficient MSTL predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = mstl(
model_ =y,
x=self.season_length,
period=self.stl_kwargs,
stl_kwargs
)= model_[['trend', 'remainder']].sum(axis=1).values
x_sa = {
kwargs 'y': x_sa,
'h': h,
'X': X,
'X_future': X_future,
'fitted': fitted
}if fitted or self.trend_forecaster.prediction_intervals is None:
'level'] = level
kwargs[= self.trend_forecaster.forecast(**kwargs)
res if level is not None:
= sorted(level)
level if self.trend_forecaster.prediction_intervals is not None:
= self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)
res elif f'lo-{level[0]}' not in res:
raise Exception(
"You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
) # Reseasonalize the results
= _predict_mstl_seas(model_, h=h, season_length=self.season_length)
seas_h = model_.filter(regex='seasonal*').sum(axis=1).values
seas_insample = {
res + (seas_insample if 'fitted' in key else seas_h) \
key: val for key, val in res.items()
}return res
def forward(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Apply fitted MSTL model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self.trend_forecaster, 'model_'):
raise Exception('You have to use the `fit` method first')
= _ensure_float(y)
y = mstl(
model_ =y,
x=self.season_length,
period=self.stl_kwargs,
stl_kwargs
)= model_[['trend', 'remainder']].sum(axis=1).values
x_sa = {
kwargs 'y': x_sa,
'h': h,
'X': X,
'X_future': X_future,
'fitted': fitted
}if fitted or self.trend_forecaster.prediction_intervals is None:
'level'] = level
kwargs[= self.trend_forecaster.forward(**kwargs)
res if level is not None:
= sorted(level)
level if self.trend_forecaster.prediction_intervals is not None:
= self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)
res #重新季节性调整结果
= _predict_mstl_seas(model_, h=h, season_length=self.season_length)
seas_h = model_.filter(regex='seasonal*').sum(axis=1).values
seas_insample = {
res + (seas_insample if 'fitted' in key else seas_h) \
key: val for key, val in res.items()
}return res
= [
trend_forecasters
AutoARIMA(),
AutoCES(), ='ZZN'),
AutoETS(model
Naive(),
CrostonClassic(),
]= [False, True, False, False, True]
skip_insamples = [False, True, True, False, False]
test_forwards for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
for stl_kwargs in [None, dict(trend=25)]:
= MSTL(
mstl_model =[12, 14],
season_length=trend_forecaster,
trend_forecaster=stl_kwargs,
stl_kwargs
)=ap, h=12,
test_class(mstl_model, x=skip_insample,
skip_insample=None,
level=test_forward) test_forward
# Intervals with and without conformal prediction
# Trend forecaster supports levels: use native intervals
= MSTL(season_length=12, trend_forecaster=ARIMA(order=(0, 1, 0)))
mstl_native = pd.DataFrame(mstl_native.fit(y=ap).predict(h=24, level=[80, 95]))
res_native_fp = pd.DataFrame(mstl_native.forecast(y=ap, h=24, level=[80, 95]))
res_native_fc
pd.testing.assert_frame_equal(res_native_fp, res_native_fc)
# Trend forecaster supports levels: use the conformal method.
= MSTL(
mstl_conformal =12,
season_length=ARIMA(
trend_forecaster=(0, 1, 0),
order=ConformalIntervals(h=24),
prediction_intervals
),
)= pd.DataFrame(mstl_conformal.fit(y=ap).predict(h=24, level=[80, 95]))
res_conformal_fp = pd.DataFrame(mstl_conformal.forecast(y=ap, h=24, level=[80, 95]))
res_conformal_fc
pd.testing.assert_frame_equal(res_conformal_fp, res_conformal_fc)lambda: pd.testing.assert_frame_equal(test_native_fp, test_conformal_fp))
test_fail(
# trend fcst doesn't support level
= MSTL(season_length=12, trend_forecaster=CrostonClassic())
mstl_bad lambda: mstl_bad.fit(y=ap).predict(h=24, level=[80, 95]), contains='prediction_intervals')
test_fail(lambda: mstl_bad.forecast(y=ap, h=24, level=[80, 95]), contains='prediction_intervals') test_fail(
# Conformal prediction
# Define the prediction intervals in the trend forecaster
= [
trend_forecasters =ConformalIntervals(h=13, n_windows=2)),
AutoARIMA(prediction_intervals=ConformalIntervals(h=13, n_windows=2)),
AutoCES(prediction_intervals
]= [False, True]
skip_insamples = [False, True]
test_forwards for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
for stl_kwargs in [None, dict(trend=25)]:
= MSTL(
mstl_model =[12, 14],
season_length=trend_forecaster,
trend_forecaster=stl_kwargs,
stl_kwargs
)=ap, h=13,
test_class(mstl_model, x=skip_insample,
skip_insample=[80, 90] if not skip_insample else None,
level=test_forward)
test_forward13, None, None, (80,95), False)) _plot_fcst(mstl_model.forecast(ap,
# Conformal prediction
# Define prediction_intervals in MSTL
= [
trend_forecasters
AutoCES()
]for stl_kwargs in [None, dict(trend=25)]:
= MSTL(
mstl_model =[12, 14],
season_length=trend_forecaster,
trend_forecaster=stl_kwargs,
stl_kwargs=ConformalIntervals(h=13, n_windows=2)
prediction_intervals
)=ap, h=13,
test_class(mstl_model, x=False,
skip_insample=[80, 90] if not skip_insample else None,
level=True)
test_forward13, None, None, (80,95), False)) _plot_fcst(mstl_model.forecast(ap,
# Seasonal trend forecasters should fail
test_fail(
MSTL,='should not adjust seasonal',
contains=([3, 12], AutoETS(model='ZZZ'))
args
)
test_fail(
MSTL,='should not adjust seasonal',
contains=([3, 12], AutoARIMA(season_length=12))
args )
# Test the alias argument
test_eq(repr(MSTL(season_length=7)),
'MSTL'
)
test_eq(repr(MSTL(season_length=7, alias='MSTL_custom')),
'MSTL_custom'
)
=3) show_doc(MSTL, title_level
=3) show_doc(MSTL.fit, title_level
=3) show_doc(MSTL.predict, title_level
=3) show_doc(MSTL.predict_in_sample, title_level
=3) show_doc(MSTL.forecast, title_level
=3) show_doc(MSTL.forward, title_level
from statsforecast.models import MSTL
from statsforecast.utils import AirPassengers as ap
# MSTL usage example
mstl_model = MSTL(season_length=[3, 12], trend_forecaster=AutoARIMA(prediction_intervals=ConformalIntervals(h=4, n_windows=2)))
mstl_model = mstl_model.fit(y=ap)
y_hat_dict = mstl_model.predict(h=4, level=[80])
y_hat_dict
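When the trend forecaster provides native prediction intervals (e.g. ARIMA), MSTL can return levels without conformal intervals, as exercised in the tests above. A minimal sketch:
# Sketch: MSTL with a trend forecaster that has native intervals
mstl_native = MSTL(season_length=12, trend_forecaster=ARIMA(order=(0, 1, 0)))
mstl_native = mstl_native.fit(y=ap)
mstl_native.predict(h=12, level=[80, 95])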
TBATS
class TBATS(_TS):
r"""Trigonometric Box-Cox transform, ARMA errors, Trend and Seasonal components (TBATS) model.
TBATS is an innovations state space model framework used for forecasting time series with multiple seasonalities. It uses a Box-Cox transformation, ARMA errors, and a trigonometric representation of the seasonal patterns based on Fourier series.
The name TBATS is an acronym for the key features of the model: Trigonometric, Box-Cox transform, ARMA errors, Trend, and Seasonal components.
References
----------
- [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)
- [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)
Parameters
----------
season_length : int or list of int.
Number of observations per unit of time. Ex: 24 Hourly data.
use_boxcox : bool (default=True)
Whether or not to use a Box-Cox transformation.
bc_lower_bound : float (default=0.0)
Lower bound for the Box-Cox transformation.
bc_upper_bound : float (default=1.0)
Upper bound for the Box-Cox transformation.
use_trend : bool (default=True)
Whether or not to use a trend component.
use_damped_trend : bool (default=False)
Whether or not to dampen the trend component.
use_arma_errors : bool (default=False)
Whether or not to use ARMA errors.
alias : str
Custom name of the model.
"""
@_old_kw_to_pos(['seasonal_periods'], [1])
def __init__(
self,
int, List[int]],
season_length: Union[bool] = True,
use_boxcox: Optional[float = 0.0,
bc_lower_bound: float = 1.0,
bc_upper_bound: bool] = True,
use_trend: Optional[bool] = False,
use_damped_trend: Optional[bool = False,
use_arma_errors: str = 'TBATS',
alias: *,
=None, # noqa:ARG002
seasonal_periods
):if isinstance(season_length, int):
= [season_length]
season_length self.season_length = list(season_length)
self.use_boxcox = use_boxcox
self.bc_lower_bound = bc_lower_bound
self.bc_upper_bound = bc_upper_bound
self.use_trend = use_trend
self.use_damped_trend = use_damped_trend
self.use_arma_errors = use_arma_errors
self.alias = alias
def fit(
self,
y: np.ndarray,= None
X: Optional[np.ndarray]
):r"""Fit TBATS model.
Fit TBATS model to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : numpy.array, optional (default=None)
Ignored
Returns
-------
self :
TBATS model.
"""
= _ensure_float(y)
y self.model_ = tbats_selection(
=y,
y=self.season_length,
seasonal_periods=self.use_boxcox,
use_boxcox=self.bc_lower_bound,
bc_lower_bound=self.bc_upper_bound,
bc_upper_bound=self.use_trend,
use_trend=self.use_damped_trend,
use_damped_trend=self.use_arma_errors
use_arma_errors
)return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None
level: Optional[List[
):r"""Predict with fitted TBATS model.
Parameters
----------
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= tbats_forecast(self.model_, h)
fcst = {'mean': fcst['mean']}
res if level is not None:
= sorted(level)
level = _compute_sigmah(self.model_, h)
sigmah = _calculate_intervals(res, level, h, sigmah)
pred_int = {**res, **pred_int}
res if self.model_['BoxCox_lambda'] is not None:
= {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
res_trans for k, v in res_trans.items():
= np.where(np.isnan(v), res[k], v)
res_trans[k] else:
= res
res_trans return res_trans
def predict_in_sample(self, level: Optional[Tuple[int]] = None):
r"""Access fitted TBATS model predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted'].ravel()}
res if level is not None:
= _calculate_sigma(self.model_['errors'], self.model_['errors'].shape[1])
se = _add_fitted_pi(res, se, level)
fitted_pred_int = {**res, **fitted_pred_int}
res if self.model_['BoxCox_lambda'] is not None:
= {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
res_trans for k, v in res_trans.items():
= np.where(np.isnan(v), res[k], v)
res_trans[k] else:
= res
res_trans return res_trans
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool=False
fitted:
): r"""Memory Efficient TBATS model.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = tbats_selection(
mod =y,
y=self.season_length,
seasonal_periods=self.use_boxcox,
use_boxcox=self.bc_lower_bound,
bc_lower_bound=self.bc_upper_bound,
bc_upper_bound=self.use_trend,
use_trend=self.use_damped_trend,
use_damped_trend=self.use_arma_errors
use_arma_errors
)= tbats_forecast(mod, h)
fcst = {'mean': fcst['mean']}
res if fitted:
'fitted'] = mod['fitted'].ravel()
res[if level is not None:
= sorted(level)
level = _compute_sigmah(mod, h)
sigmah = _calculate_intervals(res, level, h, sigmah)
pred_int = {**res, **pred_int}
res if fitted:
= _calculate_sigma(mod['errors'], mod['errors'].shape[1])
se = _add_fitted_pi(res, se, level)
fitted_pred_int = {**res, **fitted_pred_int}
res if mod['BoxCox_lambda'] is not None:
= {k : inv_boxcox(v, mod['BoxCox_lambda']) for k, v in res.items()}
res_trans for k, v in res_trans.items():
= np.where(np.isnan(v), res[k], v)
res_trans[k] else:
= res
res_trans return res_trans
= TBATS(season_length=12)
tbats =ap, h=12, level=[90, 80]) test_class(tbats, x
=3) show_doc(TBATS, title_level
=3) show_doc(TBATS.fit, title_level
=3) show_doc(TBATS.predict, title_level
=3) show_doc(TBATS.predict_in_sample, title_level
=3) show_doc(TBATS.forecast, title_level
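For parity with the other sections, a minimal TBATS usage sketch following the same pattern as the examples elsewhere in this notebook:
from statsforecast.models import TBATS
from statsforecast.utils import AirPassengers as ap

# TBATS usage example (sketch)
model = TBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict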
AutoTBATS
class AutoTBATS(TBATS):
r"""AutoTBATS model.
Automatically selects the best TBATS model from all feasible combinations of the parameters use_boxcox, use_trend, use_damped_trend, and use_arma_errors.
Selection is made using the AIC.
Default value for use_arma_errors is True since this enables the evaluation of models with and without ARMA errors.
References
----------
- [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)
- [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)
Parameters
----------
season_length : int or list of int.
Number of observations per unit of time. Ex: 24 Hourly data.
use_boxcox : bool (default=None)
Whether or not to use a Box-Cox transformation. By default tries both.
bc_lower_bound : float (default=0.0)
Lower bound for the Box-Cox transformation.
bc_upper_bound : float (default=1.0)
Upper bound for the Box-Cox transformation.
use_trend : bool (default=None)
Whether or not to use a trend component. By default tries both.
use_damped_trend : bool (default=None)
Whether or not to dampen the trend component. By default tries both.
use_arma_errors : bool (default=True)
Whether or not to use ARMA errors. The default is True, which evaluates models both with and without ARMA errors.
alias : str
Custom name of the model.
"""
@_old_kw_to_pos(['seasonal_periods'], [1])
def __init__(
self,
int, List[int]],
season_length: Union[bool] = None,
use_boxcox: Optional[float = 0.0,
bc_lower_bound: float = 1.0,
bc_upper_bound: bool] = None,
use_trend: Optional[bool] = None,
use_damped_trend: Optional[bool = True,
use_arma_errors: str = 'AutoTBATS',
alias: *,
=None # noqa:ARG002
seasonal_periods
):super().__init__(
=season_length,
season_length=use_boxcox,
use_boxcox=bc_lower_bound,
bc_lower_bound=bc_upper_bound,
bc_upper_bound=use_trend,
use_trend=use_damped_trend,
use_damped_trend=use_arma_errors,
use_arma_errors=alias
alias )
= AutoTBATS(season_length=12)
tbats =ap, h=12, level=[90, 80])
test_class(tbats, x= tbats.forecast(ap, 13, None, None, (80,95), True)
fcst_tbats _plot_fcst(fcst_tbats)
_plot_insample_pi(fcst_tbats)
=3) show_doc(AutoTBATS, title_level
=3) show_doc(AutoTBATS.fit, title_level
=3) show_doc(AutoTBATS.predict, title_level
=3) show_doc(AutoTBATS.predict_in_sample, title_level
=3) show_doc(AutoTBATS.forecast, title_level
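A minimal AutoTBATS usage sketch, following the same pattern as the other model examples in this notebook:
from statsforecast.models import AutoTBATS
from statsforecast.utils import AirPassengers as ap

# AutoTBATS usage example (sketch)
model = AutoTBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict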
Theta Family
Standard Theta Method
class Theta(AutoTheta):
r""" Standard Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
season_length: str = 'multiplicative',
decomposition_type: str = 'Theta',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): super().__init__(
=season_length,
season_length='STM',
model=decomposition_type,
decomposition_type=alias,
alias=prediction_intervals,
prediction_intervals )
= Theta(season_length=12)
stm = stm.forecast(ap,12)
fcast_stm
= AutoTheta(season_length=12, model='STM')
theta = theta.forecast(ap,12)
fcast_theta
np.testing.assert_equal(
fcast_theta,
fcast_stm )
# Test conformal prediction
= Theta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
stm_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(stm_c, x= stm_c.forecast(ap, 13, None, None, (80,95), True)
fcst_stm_c
test_eq('mean'][:12],
fcst_stm_c['mean'],
fcast_stm[ )
= Theta(season_length=12)
stm
stm.fit(ap)= stm.predict(12)
fcast_stm = stm.forward(y=ap, h=12)
forward_stm
= AutoTheta(season_length=12, model='STM')
theta = theta.forecast(ap,12)
fcast_theta
theta.fit(ap)= theta.forward(y=ap, h=12)
forward_autotheta
np.testing.assert_equal(
fcast_theta,
fcast_stm,
)
np.testing.assert_equal(
forward_stm,
forward_autotheta )
# Test the alias argument
test_eq(repr(Theta()),
'Theta'
)
test_eq(repr(Theta(alias='Theta_custom')),
'Theta_custom'
)
=3) show_doc(Theta, title_level
='Theta.forecast', title_level=3) show_doc(Theta.forecast, name
='Theta.fit', title_level=3) show_doc(Theta.fit, name
='Theta.predict', title_level=3) show_doc(Theta.predict, name
='Theta.predict_in_sample', title_level=3) show_doc(Theta.predict_in_sample, name
='Theta.forward', title_level=3) show_doc(Theta.forward, name
from statsforecast.models import Theta
from statsforecast.utils import AirPassengers as ap
# Theta usage example
model = Theta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Optimized Theta Method
class OptimizedTheta(AutoTheta):
r""" Optimized Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model. Default `OptimizedTheta`.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
season_length: str = 'multiplicative',
decomposition_type: str = 'OptimizedTheta',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): super().__init__(
=season_length,
season_length='OTM',
model=decomposition_type,
decomposition_type=alias,
alias=prediction_intervals,
prediction_intervals )
= OptimizedTheta(season_length=12)
otm = otm.forecast(ap,12)
fcast_otm
otm.fit(ap)= otm.forward(y=ap, h=12)
forward_otm
= AutoTheta(season_length=12, model='OTM')
theta = theta.forecast(ap,12)
fcast_theta
theta.fit(ap)= theta.forward(y=ap, h=12)
forward_autotheta
np.testing.assert_equal(
fcast_theta,
fcast_otm
)
np.testing.assert_equal(
forward_autotheta,
forward_otm )
# Test conformal prediction
= OptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
otm_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(otm_c, x= otm_c.forecast(ap, 13, None, None, (80,95), True)
fcst_otm_c
test_eq('mean'][:12],
fcst_otm_c['mean'],
fcast_otm[ )
# hide
= OptimizedTheta(season_length=12)
otm
otm.fit(ap)= otm.predict(12)
fcast_otm
= AutoTheta(season_length=12, model='OTM')
theta = theta.forecast(ap,12)
fcast_theta
np.testing.assert_equal(
fcast_theta,
fcast_otm )
# Test the alias argument
test_eq(repr(OptimizedTheta()),
'OptimizedTheta'
)
test_eq(repr(OptimizedTheta(alias='OptimizedTheta_custom')),
'OptimizedTheta_custom'
)
=3) show_doc(OptimizedTheta, title_level
='OptimizedTheta.forecast', title_level=3) show_doc(OptimizedTheta.forecast, name
='OptimizedTheta.fit', title_level=3) show_doc(OptimizedTheta.fit, name
='OptimizedTheta.predict', title_level=3) show_doc(OptimizedTheta.predict, name
='OptimizedTheta.predict_in_sample', title_level=3) show_doc(OptimizedTheta.predict_in_sample, name
='OptimizedTheta.forward', title_level=3) show_doc(OptimizedTheta.forward, name
from statsforecast.models import OptimizedTheta
from statsforecast.utils import AirPassengers as ap
# OptimizedTheta usage example
model = OptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Dynamic Standard Theta Method
class DynamicTheta(AutoTheta):
r""" Dynamic Standard Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
season_length: str = 'multiplicative',
decomposition_type: str = 'DynamicTheta',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): super().__init__(
=season_length,
season_length='DSTM',
model=decomposition_type,
decomposition_type=alias,
alias=prediction_intervals,
prediction_intervals )
= DynamicTheta(season_length=12)
dstm = dstm.forecast(ap,12)
fcast_dstm
dstm.fit(ap)= dstm.forward(y=ap, h=12)
forward_dstm
= AutoTheta(season_length=12, model='DSTM')
theta = theta.forecast(ap,12)
fcast_theta
theta.fit(ap)= theta.forward(y=ap, h=12)
forward_autotheta
np.testing.assert_equal(
fcast_theta,
fcast_dstm
)
np.testing.assert_equal(
forward_autotheta,
forward_dstm )
= DynamicTheta(season_length=12)
dstm
dstm.fit(ap)= dstm.predict(12)
fcast_dstm
= AutoTheta(season_length=12, model='DSTM')
theta = theta.forecast(ap,12)
fcast_theta
np.testing.assert_equal(
fcast_theta,
fcast_dstm )
# Test conformal prediction
= DynamicTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
dstm_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(dstm_c, x= dstm_c.forecast(ap, 13, None, None, (80,95), True)
fcst_dstm_c
test_eq('mean'][:12],
fcst_dstm_c['mean'],
fcast_dstm[ )
# Test the alias argument
test_eq(repr(DynamicTheta()),
'DynamicTheta'
)
test_eq(repr(DynamicTheta(alias='DynamicTheta_custom')),
'DynamicTheta_custom'
)
=3) show_doc(DynamicTheta, title_level
='DynamicTheta.forecast', title_level=3) show_doc(DynamicTheta.forecast, name
='DynamicTheta.fit', title_level=3) show_doc(DynamicTheta.fit, name
='DynamicTheta.predict', title_level=3) show_doc(DynamicTheta.predict, name
='DynamicTheta.predict_in_sample', title_level=3) show_doc(DynamicTheta.predict_in_sample, name
='DynamicTheta.forward', title_level=3) show_doc(DynamicTheta.forward, name
from statsforecast.models import DynamicTheta
from statsforecast.utils import AirPassengers as ap
# DynamicTheta usage example
model = DynamicTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Dynamic Optimized Theta Method
class DynamicOptimizedTheta(AutoTheta):
r""" Dynamic Optimized Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
season_length: str = 'multiplicative',
decomposition_type: str = 'DynamicOptimizedTheta',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
): super().__init__(
=season_length,
season_length='DOTM',
model=decomposition_type,
decomposition_type=alias,
alias=prediction_intervals,
prediction_intervals )
= DynamicOptimizedTheta(season_length=12)
dotm = dotm.forecast(ap,12)
fcast_dotm
dotm.fit(ap)= dotm.forward(y=ap, h=12)
forward_dotm
= AutoTheta(season_length=12, model='DOTM')
theta = theta.forecast(ap,12)
fcast_theta
theta.fit(ap)= theta.forward(y=ap, h=12)
forward_autotheta
np.testing.assert_equal(
fcast_theta,
fcast_dotm
)
np.testing.assert_equal(
forward_autotheta,
forward_dotm )
= DynamicOptimizedTheta(season_length=12)
dotm
dotm.fit(ap)= dotm.predict(12)
fcast_dotm
= AutoTheta(season_length=12, model='DOTM')
theta = theta.forecast(ap,12)
fcast_theta
np.testing.assert_equal(
fcast_theta,
fcast_dotm )
# Test conformal prediction
= DynamicOptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
dotm_c =ap, h=13, level=[90, 80], test_forward=True)
test_class(dotm_c, x= dotm_c.forecast(ap, 13, None, None, (80,95), True)
fcst_dotm_c
test_eq('mean'][:12],
fcst_dotm_c['mean'],
fcast_dotm[ )
# Test the alias argument
test_eq(repr(DynamicOptimizedTheta()),
'DynamicOptimizedTheta'
)
test_eq(repr(DynamicOptimizedTheta(alias='DynamicOptimizedTheta_custom')),
'DynamicOptimizedTheta_custom'
)
=3) show_doc(DynamicOptimizedTheta, title_level
='DynamicOptimizedTheta.forecast', title_level=3) show_doc(DynamicOptimizedTheta.forecast, name
='DynamicOptimizedTheta.fit', title_level=3) show_doc(DynamicOptimizedTheta.fit, name
='DynamicOptimizedTheta.predict', title_level=3) show_doc(DynamicOptimizedTheta.predict, name
='DynamicOptimizedTheta.predict_in_sample', title_level=3) show_doc(DynamicOptimizedTheta.predict_in_sample, name
='DynamicOptimizedTheta.forward', title_level=3) show_doc(DynamicOptimizedTheta.forward, name
from statsforecast.models import DynamicOptimizedTheta
from statsforecast.utils import AirPassengers as ap
# DynamicOptimizedTheta usage example
model = DynamicOptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
ARCH Family
GARCH Model
class GARCH(_TS):
r"""Generalized Autoregressive Conditional Heteroskedasticity (GARCH) model.
A method for modeling time series that exhibit non-constant volatility over time.
The GARCH model assumes that at time $t$, $y_t$ is given by:
$$y_t = v_t \sigma_t$$
with
$$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2 + \sum_{j=1}^q b_j \sigma_{t-j}^2$$.
Here $v_t$ is a sequence of iid random variables with zero mean and unit variance.
The coefficients $w$, $a_i$, $i=1,...,p$, and $b_j$, $j=1,...,q$ must satisfy the following conditions:
1. $w > 0$ and $a_i, b_j \geq 0$ for all $i$ and $j$.
2. $\sum_{k=1}^{max(p,q)} a_k + b_k < 1$. Here it is assumed that $a_i=0$ for $i>p$ and $b_j=0$ for $j>q$.
The ARCH model is a particular case of the GARCH model when $q=0$.
References
----------
[Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf)
[Bollerslev, T. (1986). Generalized autoregressive conditional heteroskedasticity. Journal of econometrics, 31(3), 307-327.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=7da8bfa5295375c1141d797e80065a599153c19d)
[James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)
Parameters
----------
p : int
Number of lagged versions of the series.
q: int
Number of lagged versions of the volatility.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
p: int = 1,
q: str = 'GARCH',
alias: = None,
prediction_intervals: Optional[ConformalIntervals]
):self.p = p
self.q = q
if q !=0:
self.alias = alias+'('+str(p)+','+str(q)+')'
else:
self.alias = alias+'('+str(p)+')'
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,= None
X: Optional[np.ndarray]
):r"""Fit GARCH model.
Fit GARCH model to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
GARCH model.
"""
= _ensure_float(y)
y self.model_ = garch_model(y, p=self.p, q=self.q)
self.model_['actual_residuals'] = y - self.model_['fitted']
self._store_cs(y, X)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None
level: Optional[List[
):r"""Predict with fitted GARCH model.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= garch_forecast(self.model_, h)
fcst = {'mean': fcst['mean'], 'sigma2': fcst['sigma2']}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
= _quantiles(level)
quantiles = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
lo = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
hi = lo[:, ::-1]
lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
lo = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
hi = {**res, **lo, **hi}
res return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted GARCH model predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= self.model_['actual_residuals']
residuals = _calculate_sigma(residuals, len(residuals) - 1)
se = _add_fitted_pi(res=res, se=se, level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h: = None,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] int]] = None,
level: Optional[List[bool=False
fitted:
):r"""Memory Efficient GARCH model.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= _ensure_float(y)
y = garch_model(y, p=self.p, q=self.q)
mod = garch_forecast(mod, h)
fcst = ['mean', 'sigma2']
keys if fitted:
'fitted')
keys.append(= {key: fcst[key] for key in keys}
res if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
= _quantiles(level)
quantiles = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
lo = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
hi = lo[:, ::-1]
lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
lo = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
hi = {**res, **lo, **hi}
res if fitted:
= _calculate_sigma(y - mod['fitted'], len(y) - 1)
se = _add_fitted_pi(res=res, se=se, level=level)
res return res
# Generate GARCH(2,2) data
n = 1000
w = 0.5
alpha = np.array([0.1, 0.2])
beta = np.array([0.4, 0.2])

y = generate_garch_data(n, w, alpha, beta)

plt.figure(figsize=(10, 4))
plt.plot(y)
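As a sanity check on the recursion $\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2 + \sum_{j=1}^q b_j \sigma_{t-j}^2$, the conditional variance implied by the simulated parameters can be reconstructed directly. The sketch below initializes the recursion at the unconditional variance, which is one common convention and not necessarily what `generate_garch_data` uses internally:
# Sketch: conditional variance implied by w, alpha, beta for the simulated GARCH(2,2) series
p = q = 2
uncond_var = w / (1 - alpha.sum() - beta.sum())  # unconditional variance of the process
sigma2 = np.full(n, uncond_var)
for t in range(max(p, q), n):
    # a_1*y_{t-1}^2 + a_2*y_{t-2}^2 + b_1*sigma_{t-1}^2 + b_2*sigma_{t-2}^2
    sigma2[t] = w + alpha @ (y[t-p:t][::-1] ** 2) + beta @ sigma2[t-q:t][::-1]

plt.figure(figsize=(10, 4))
plt.plot(sigma2)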
= GARCH(2,2)
garch =y, h=12, skip_insample=False, level=[90,80])
test_class(garch, x= garch.forecast(ap, 12) fcst_garch
# Test conformal prediction
= GARCH(2,2,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
garch_c =ap, h=13, level=[90, 80], test_forward=False)
test_class(garch_c, x= garch_c.forecast(ap, 13, None, None, (80,95), True)
fcst_garch_c
test_eq('mean'][:12],
fcst_garch_c['mean']
fcst_garch[
) _plot_fcst(fcst_garch_c)
=100
h= garch.forecast(y, h=h, level=[80,95], fitted=True) fcst
= plt.subplots(1, 1, figsize = (20,7))
fig, ax #plt.plot(np.arange(0, len(y)), y)
len(y), len(y) + h), fcst['mean'], label='mean')
plt.plot(np.arange(len(y), len(y) + h), fcst['sigma2'], color = 'c', label='sigma2')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-95'], color = 'r', label='lo-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-95'], color = 'r', label='hi-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-80'], color = 'g', label='lo-80')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-80'], color = 'g', label='hi-80')
plt.plot(np.arange( plt.legend()
= plt.subplots(1, 1, figsize = (20,7))
fig, ax 0, len(y)), y)
plt.plot(np.arange(0, len(y)), fcst['fitted'], label='fitted')
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-95'], color = 'r', label='fitted-lo-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-95'], color = 'r', label='fitted-hi-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-80'], color = 'g', label='fitted-lo-80')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-80'], color = 'g', label='fitted-hi-80')
plt.plot(np.arange(len(y)-50, len(y))
plt.xlim( plt.legend()
=3) show_doc(GARCH, title_level
=3) show_doc(GARCH.fit, title_level
=3) show_doc(GARCH.predict, title_level
=3) show_doc(GARCH.predict_in_sample, title_level
=3) show_doc(GARCH.forecast, title_level
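A minimal GARCH usage sketch on the simulated series `y` from above, following the same pattern as the other examples. Without `prediction_intervals`, the requested levels come from the native quantile-based intervals in `predict`:
from statsforecast.models import GARCH

# GARCH usage example (sketch)
model = GARCH(p=2, q=2)
model = model.fit(y=y)
y_hat_dict = model.predict(h=4, level=[80])
y_hat_dict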
ARCH Model
class ARCH(GARCH):
r"""Autoregressive Conditional Heteroskedasticity (ARCH) model.
A particular case of the GARCH(p,q) model where $q=0$.
It assumes that at time $t$, $y_t$ is given by:
$$y_t = \epsilon_t \sigma_t$$
with
$$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2$$.
Here $\epsilon_t$ is a sequence of iid random variables with zero mean and unit variance.
The coefficients $w$ and $a_i$, $i=1,...,p$ must be nonnegative and $\sum_{k=1}^p a_k < 1$.
References
----------
[Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf)
[James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)
Parameters
----------
p : int
Number of lagged versions of the series.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
int = 1,
p: str = 'ARCH',
alias: = None
prediction_intervals: Optional[ConformalIntervals]
):self.p = p
self.alias = alias
super().__init__(p, q=0, alias=alias)
= ARCH(1)
arch =y, h=12, skip_insample=False, level=[90,80])
test_class(arch, x= arch.forecast(ap, 12) fcst_arch
# Test conformal prediction
= ARCH(1,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
arch_c =ap, h=13, level=[90, 80], test_forward=False)
test_class(arch_c, x= arch_c.forecast(ap, 13, None, None, (80,95), True)
fcst_arch_c
test_eq('mean'][:12],
fcst_arch_c['mean']
fcst_arch[
) _plot_fcst(fcst_arch_c)
= GARCH(p=1, q=0)
garch = garch.forecast(y, h=12, level=[90,80], fitted=True)
fcast_garch
= ARCH(p=1)
arch = arch.forecast(y, h=12, level=[90,80], fitted=True)
fcast_arch
np.testing.assert_equal(
fcast_garch,
fcast_arch )
=3) show_doc(ARCH, title_level
='ARCH.fit', title_level=3) show_doc(ARCH.fit, name
='ARCH.predict', title_level=3) show_doc(ARCH.predict, name
='ARCH.predict_in_sample', title_level=3) show_doc(ARCH.predict_in_sample, name
='ARCH.forecast', title_level=3) show_doc(ARCH.forecast, name
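A minimal ARCH usage sketch on the same simulated series `y`:
from statsforecast.models import ARCH

# ARCH usage example (sketch)
model = ARCH(p=1)
model = model.fit(y=y)
y_hat_dict = model.predict(h=4)
y_hat_dict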
Machine Learning Models
Scikit-learn Models
class SklearnModel(_TS):
r"""scikit-learn model wrapper
Parameters
----------
model : sklearn.base.BaseEstimator
scikit-learn estimator
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str, optional (default=None)
Custom name of the model. If `None` will use the model's class.
"""
= True
uses_exog
def __init__(
self,
model,= None,
prediction_intervals: Optional[ConformalIntervals] str] = None,
alias: Optional[
):self.model = model
self.prediction_intervals = prediction_intervals
self.alias = alias if alias is not None else model.__class__.__name__
def fit(
self,
y: np.ndarray,
X: np.ndarray,-> 'SklearnModel':
) r"""Fit the model.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Exogenous of shape (t, n_x).
Returns
-------
self : SklearnModel
Fitted SklearnModel object.
r"""
from sklearn.base import clone
self.model_ = {'model': clone(self.model)}
self.model_['model'].fit(X, y)
self._store_cs(y=y, X=X)
self.model_['fitted'] = self.model_['model'].predict(X)
= y - self.model_['fitted']
residuals self.model_['sigma'] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
int,
h:
X: np.ndarray,int]] = None,
level: Optional[List[-> Dict[str, Any]:
) r"""Predict with fitted SklearnModel.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
= {'mean': self.model_['model'].predict(X)}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted SklearnModel insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= sorted(level)
level = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,int,
h:
X: np.ndarray,
X_future: np.ndarray,int]] = None,
level: Optional[List[bool = False,
fitted: -> Dict[str, Any]:
) r"""Memory Efficient SklearnModel predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
from sklearn.base import clone
= clone(self.model)
model
model.fit(X, y)= {'mean': model.predict(X_future)}
res if fitted:
'fitted'] = model.predict(X)
res[if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
= y - res['fitted']
residuals = _calculate_sigma(residuals, y.size)
sigma = _add_fitted_pi(res=res, se=sigma, level=level)
res return res
def forward(
self,
y: np.ndarray,int,
h:
X: np.ndarray,
X_future: np.ndarray,int]] = None,
level: Optional[List[bool = False,
fitted:
):r"""Apply fitted SklearnModel to a new/updated time series.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, "model_"):
raise Exception("You have to use the `fit` method first")
= {'mean': self.model_['model'].predict(X_future)}
res if fitted:
'fitted'] = self.model_['model'].predict(X)
res[if level is not None:
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
= _calculate_sigma(y - res['fitted'], y.size)
se = _add_fitted_pi(res=res, se=se, level=level)
res return res
from sklearn.linear_model import Ridge
= 12
h = SklearnModel(Ridge(), prediction_intervals=ConformalIntervals(h=h))
skm = np.arange(ap.size).reshape(-1, 1)
X = ap.size + np.arange(h).reshape(-1, 1)
X_future =ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=True)
test_class(skm, x= skm.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
fcst_skm _plot_insample_pi(fcst_skm)
# Test the alias argument
test_eq(repr(SklearnModel(Ridge())),
'Ridge'
)
test_eq(repr(SklearnModel(Ridge(), alias='my_ridge')),
'my_ridge'
)
show_doc(SklearnModel)
show_doc(SklearnModel.fit)
show_doc(SklearnModel.predict)
show_doc(SklearnModel.predict_in_sample)
show_doc(SklearnModel.forecast)
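A minimal SklearnModel usage sketch, assuming the class is exported from `statsforecast.models` like the other models here. Unlike the univariate models, it requires exogenous features for both fitting and prediction; a plain time index is used below purely for illustration:
from sklearn.linear_model import Ridge
from statsforecast.models import SklearnModel
from statsforecast.utils import AirPassengers as ap

# SklearnModel usage example (sketch)
X = np.arange(ap.size).reshape(-1, 1)             # insample exogenous features
X_future = ap.size + np.arange(4).reshape(-1, 1)  # features for the forecast horizon
model = SklearnModel(Ridge())
model = model.fit(y=ap, X=X)
y_hat_dict = model.predict(h=4, X=X_future)
y_hat_dict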
MFLES
class MFLES(_TS):
r"""MFLES model.
A method to forecast time series based on Gradient Boosted Time Series Decomposition
which treats traditional decomposition as the base estimator in the boosting
process. Unlike normal gradient boosting, slight learning rates are applied at the
component level (trend/seasonality/exogenous).
The method derives its name from some of the underlying estimators that can
enter into the boosting procedure, specifically: a simple Median, Fourier
functions for seasonality, a simple/piecewise Linear trend, and Exponential
Smoothing.
Parameters
----------
season_length : int or list of int, optional (default=None)
Number of observations per unit of time. Ex: 24 Hourly data.
fourier_order : int, optional (default=None)
How many fourier sin/cos pairs to create, the larger the number the more complex of a seasonal pattern can be fitted.
A lower number leads to smoother results.
This is auto-set based on season_length.
max_rounds : int (default=50)
The max number of boosting rounds. The boosting will auto-stop but depending on other parameters such as rs_lr you may want more rounds.
Generally more rounds means a smoother fit.
ma : int, optional (default=None)
The moving average order to use, this is auto-set based on internal logic.
Passing 4 would fit a 4 period moving average on the residual component.
alpha : float (default=1.0)
The alpha which is used in fitting the underlying LASSO when using piecewise functions.
decay : float (default=-1.0)
Effects the slopes of the piecewise-linear basis function.
changepoints : boolean (default=True)
Whether to fit for changepoints if all other logic allows for it. If False, MFLES will not ever fit a piecewise trend.
n_changepoints : int or float (default=0.25)
Number (if int) or proportion (if float) of changepoint knots to place. The default of 0.25 will place 0.25 * (series length) number of knots.
seasonal_lr : float (default=0.9)
A shrinkage parameter (0 < seasonal_lr <= 1) which penalizes the seasonal fit.
A value of 0.9 will flatly multiply the seasonal fit by 0.9 each boosting round, this can be used to allow more signal to the exogenous component.
trend_lr : float (default=0.9)
A shrinkage parameter (0 < trend_lr <= 1) which penalizes the linear trend fit
A value of 0.9 will flatly multiply the linear fit by 0.9 each boosting round, this can be used to allow more signal to the seasonality or exogenous components.
exogenous_lr : float (default=1.0)
The shrinkage parameter (0 < exogenous_lr <= 1) which controls how much of the exogenous signal is carried to the next round.
residuals_lr : float (default=1.0)
A shrinkage parameter (0 < residuals_lr <= 1) which penalizes the residual smoothing.
A value of 0.9 will flatly multiply the residual fit by 0.9 each boosting round, this can be used to allow more signal to the seasonality or linear components.
cov_threshold : float (default=0.7)
The deseasonalized cov is used to auto-set some logic, lowering the cov_threshold will result in simpler and less complex residual smoothing.
If you pass something like 1000 then there will be no safeguards applied.
moving_medians : bool (default=False)
The default behavior is to fit an initial median to the time series. If True, then it will fit a median per seasonal period.
min_alpha : float (default=0.05)
The minimum alpha in the SES ensemble.
max_alpha : float (default=1.0)
The maximum alpha used in the SES ensemble.
trend_penalty : bool (default=True)
Whether to apply a simple penalty to the linear trend component, very useful for dealing with the potentially dangerous piecewise trend.
multiplicative : bool, optional (default=None)
Auto-set based on internal logic. If True, it will simply take the log of the time series.
smoother : bool (default=False)
If True, then a simple exponential ensemble will be used rather than auto settings.
robust : bool, optional (default=None)
If True then MFLES will fit using more reserved methods, i.e. not using piecewise trend or moving average residual smoother.
Auto-set based on internal logic.
verbose : bool (default=False)
Print debugging information.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str (default='MFLES')
Custom name of the model.
"""
= True
uses_exog
def __init__(
self,
int, List[int]]] = None,
season_length: Optional[Union[int] = None,
fourier_order: Optional[int = 50,
max_rounds: int] = None,
ma: Optional[float = 1.0,
alpha: float = -1.0,
decay: bool = True,
changepoints: float, int] = 0.25,
n_changepoints: Union[float = 0.9,
seasonal_lr: float = 0.9,
trend_lr: float = 1.0,
exogenous_lr: float = 1.0,
residuals_lr: float = 0.7,
cov_threshold: bool = False,
moving_medians: float = 0.05,
min_alpha: float = 1.0,
max_alpha: bool = True,
trend_penalty: bool] = None,
multiplicative: Optional[bool = False,
smoother: bool] = None,
robust: Optional[bool = False,
verbose: = None,
prediction_intervals: Optional[ConformalIntervals] str = 'MFLES',
alias:
):try:
import sklearn # noqa:F401
except ImportError:
raise ImportError("MFLES requires scikit-learn.") from None
self.season_length = season_length
self.fourier_order = fourier_order
self.max_rounds = max_rounds
self.ma = ma
self.alpha = alpha
self.decay = decay
self.changepoints = changepoints
self.n_changepoints = n_changepoints
self.seasonal_lr = seasonal_lr
self.trend_lr = trend_lr
self.exogenous_lr = exogenous_lr
self.residuals_lr = residuals_lr
self.cov_threshold = cov_threshold
self.moving_medians = moving_medians
self.min_alpha = min_alpha
self.max_alpha = max_alpha
self.trend_penalty = trend_penalty
self.multiplicative = multiplicative
self.smoother = smoother
self.robust = robust
self.verbose = verbose
self.prediction_intervals = prediction_intervals
self.alias = alias
def _fit(self, y: np.ndarray, X: Optional[np.ndarray]) -> Dict[str, Any]:
= _MFLES(verbose=self.verbose, robust=self.robust)
model = model.fit(
fitted =y,
y=X,
X=self.season_length,
seasonal_period=self.fourier_order,
fourier_order=self.ma,
ma=self.alpha,
alpha=self.decay,
decay=self.n_changepoints,
n_changepoints=self.seasonal_lr,
seasonal_lr=self.trend_lr,
linear_lr=self.exogenous_lr,
exogenous_lr=self.residuals_lr,
rs_lr=self.cov_threshold,
cov_threshold=self.moving_medians,
moving_medians=self.max_rounds,
max_rounds=self.min_alpha,
min_alpha=self.max_alpha,
max_alpha=self.trend_penalty,
trend_penalty=self.multiplicative,
multiplicative=self.changepoints,
changepoints=self.smoother,
smoother
)return {'model': model, 'fitted': fitted}
def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'MFLES':
r"""Fit the model
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like, optional (default=None)
Exogenous of shape (t, n_x).
Returns
-------
self : MFLES
Fitted MFLES object.
"""
= _ensure_float(y)
y self.model_ = self._fit(y=y, X=X)
self._store_cs(y=y, X=X)
= y - self.model_['fitted']
residuals self.model_['sigma'] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
int,
h: = None,
X: Optional[np.ndarray] int]] = None,
level: Optional[List[-> Dict[str, Any]:
) r"""Predict with fitted MFLES.
Parameters
----------
h : int
Forecast horizon.
X : array-like, optional (default=None)
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
= {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted SklearnModel insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {'fitted': self.model_['fitted']}
res if level is not None:
= sorted(level)
level = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
res return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
) -> Dict[str, Any]:
) r"""Memory Efficient MFLES predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
model = self._fit(y=y, X=X)
res = {"mean": model['model'].predict(forecast_horizon=h, X=X_future)}
if fitted:
    res["fitted"] = model['fitted']
if level is not None:
    level = sorted(level)
    if self.prediction_intervals is not None:
        res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
    else:
        raise Exception("You must pass `prediction_intervals` to compute them.")
    if fitted:
        residuals = y - res["fitted"]
        sigma = _calculate_sigma(residuals, y.size)
        res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
show_doc(MFLES)
show_doc(MFLES.fit)
show_doc(MFLES.predict)
show_doc(MFLES.predict_in_sample)
show_doc(MFLES.forecast)
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)

mfles = MFLES()
test_class(mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)

mfles = MFLES(prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_mfles = mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_mfles)
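For quick reference, here is a minimal usage sketch of MFLES on the AirPassengers series; setting season_length=12 is an illustrative choice for monthly data, not a default.
from statsforecast.models import MFLES
from statsforecast.utils import AirPassengers as ap
# Usage example of MFLES (illustrative sketch)
model = MFLES(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict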
AutoMFLES
class AutoMFLES(_TS):
r"""AutoMFLES
Parameters
----------
test_size : int
Forecast horizon used during cross validation.
season_length : int or list of int, optional (default=None)
Number of observations per unit of time. Ex: 24 Hourly data.
n_windows : int (default=2)
Number of windows used for cross validation.
config : dict, optional (default=None)
Mapping from parameter name (from the init arguments of MFLES) to a list of values to try.
If `None`, will use defaults.
step_size : int, optional (default=None)
Step size between each cross validation window. If `None`, it will be set to `test_size`.
metric : str (default='smape')
Metric used to select the best model. Possible options are: 'smape', 'mape', 'mse' and 'mae'.
verbose : bool (default=False)
Print debugging information.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str (default='AutoMFLES')
Custom name of the model.
"""
def __init__(
self,
test_size: int,
season_length: Optional[Union[int, List[int]]] = None,
n_windows: int = 2,
config: Optional[Dict[str, Any]] = None,
step_size: Optional[int] = None,
metric: str = 'smape',
verbose: bool = False,
prediction_intervals: Optional[ConformalIntervals] = None,
alias: str = 'AutoMFLES',
):
try:
    import sklearn  # noqa: F401
except ImportError:
    raise ImportError("MFLES requires scikit-learn.") from None
self.season_length = season_length
self.n_windows = n_windows
self.test_size = test_size
self.config = config
self.step_size = step_size if step_size is not None else test_size
self.metric = metric
self.verbose = verbose
self.prediction_intervals = prediction_intervals
self.alias = alias
def _fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> Dict[str, Any]:
model = _MFLES(verbose=self.verbose)
optim_params = model.optimize(
    y=y,
    X=X,
    test_size=self.test_size,
    n_steps=self.n_windows,
    step_size=self.step_size,
    seasonal_period=self.season_length,
    metric=self.metric,
    params=self.config,
)
# the seasonal_period may've been found during the optimization
seasonal_period = optim_params.pop('seasonal_period', self.season_length)
fitted = model.fit(
    y=y,
    X=X,
    seasonal_period=seasonal_period,
    **optim_params,
)
return {'model': model, 'fitted': fitted}
def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'AutoMFLES':
r"""Fit the model
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like, optional (default=None)
Exogenous of shape (t, n_x).
Returns
-------
self : AutoMFLES
Fitted AutoMFLES object.
"""
y = _ensure_float(y)
self.model_ = self._fit(y=y, X=X)
self._store_cs(y=y, X=X)
residuals = y - self.model_["fitted"]
self.model_["sigma"] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
) -> Dict[str, Any]:
) r"""Predict with fitted AutoMFLES.
Parameters
----------
h : int
Forecast horizon.
X : array-like, optional (default=None)
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
= {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
res if level is None:
return res
= sorted(level)
level if self.prediction_intervals is not None:
= self._add_predict_conformal_intervals(res, level)
res else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted AutoMFLES insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
= {"fitted": self.model_["fitted"]}
res if level is not None:
= sorted(level)
level = _add_fitted_pi(res=res, se=self.model_["sigma"], level=level)
res return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
) -> Dict[str, Any]:
) r"""Memory Efficient AutoMFLES predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
model = self._fit(y=y, X=X)
res = {"mean": model["model"].predict(forecast_horizon=h, X=X_future)}
if fitted:
    res["fitted"] = model["fitted"]
if level is not None:
    level = sorted(level)
    if self.prediction_intervals is not None:
        res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
    else:
        raise Exception("You must pass `prediction_intervals` to compute them.")
    if fitted:
        residuals = y - res["fitted"]
        sigma = _calculate_sigma(residuals, y.size)
        res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)

auto_mfles = AutoMFLES(test_size=h, season_length=12)
test_class(auto_mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)

auto_mfles = AutoMFLES(test_size=h, season_length=12, prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(auto_mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_auto_mfles = auto_mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_auto_mfles)
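A minimal usage sketch of AutoMFLES follows. The custom config grid is only an illustration of the parameter-search mapping described above (keys taken from the MFLES init arguments); it is not the library's default search space.
from statsforecast.models import AutoMFLES
from statsforecast.utils import AirPassengers as ap
# Usage example of AutoMFLES (illustrative sketch)
# config maps MFLES init arguments to candidate values tried during cross validation;
# this particular grid is an arbitrary illustration.
config = {'smoother': [True, False], 'ma': [None, 3]}
model = AutoMFLES(test_size=12, season_length=12, n_windows=2, config=config)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict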
Fallback models
Constant model
class ConstantModel(_TS):
def __init__(self, constant: float, alias: str = 'ConstantModel'):
r"""Constant Model.
Returns Constant values.
Parameters
----------
constant: float
Custom value to return as forecast.
alias: str
Custom name of the model.
"""
self.constant = constant
self.alias = alias
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the Constant model.
Fit a Constant Model to a time series (numpy.array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self:
Constant fitted model.
"""
y = _ensure_float(y)
self.n_y = len(y)
self._dtype = y.dtype
return self
def predict(
self,
h: int,  # forecast horizon
X: Optional[np.ndarray] = None,  # exogenous regressors
level: Optional[List[int]] = None  # confidence levels
):
r"""Predict with fitted ConstantModel.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = np.full(h, self.constant, dtype=self._dtype)
res = {'mean': mean}

if level is not None:
    for lv in sorted(level):
        res[f'lo-{lv}'] = mean
        res[f'hi-{lv}'] = mean

return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Constant Model insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
fitted = np.full(self.n_y, self.constant, dtype=self._dtype)
res = {'fitted': fitted}
if level is not None:
    for lv in sorted(level):
        res[f'fitted-lo-{lv}'] = fitted
        res[f'fitted-hi-{lv}'] = fitted

return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient Constant Model predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mean = np.full(h, self.constant, dtype=y.dtype)
res = {'mean': mean}

if fitted:
    fitted_vals = np.full_like(y, self.constant)
    res['fitted'] = fitted_vals

if level is not None:
    for lv in sorted(level):
        res[f'lo-{lv}'] = mean
        res[f'hi-{lv}'] = mean
        if fitted:
            res[f'fitted-lo-{lv}'] = fitted_vals
            res[f'fitted-hi-{lv}'] = fitted_vals
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply Constant model predictions to a new/updated time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
res = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
return res
constant_model = ConstantModel(constant=1)
test_class(constant_model, x=ap, h=12, level=[90, 80])
constant_model.forecast(ap, 12, level=[90, 80])
constant_model.forward(ap, 12, level=[90, 80])
# test the alias argument
test_eq(repr(ConstantModel(1)),
'ConstantModel'
)
test_eq(repr(ConstantModel(1, alias='ConstantModel_custom')),
'ConstantModel_custom'
)
show_doc(ConstantModel, title_level=3)
show_doc(ConstantModel.forecast, title_level=3)
show_doc(ConstantModel.fit, title_level=3)
show_doc(ConstantModel.predict, title_level=3)
show_doc(ConstantModel.predict_in_sample, title_level=3)
show_doc(ConstantModel.forward, title_level=3)
from statsforecast.models import ConstantModel
from statsforecast.utils import AirPassengers as ap
# Usage example of ConstantModel
model = ConstantModel(1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Zero model
class ZeroModel(ConstantModel):
def __init__(self, alias: str = 'ZeroModel'):
r"""返回零值预测。
返回零值。
参数
----------
alias: str
模型的自定义名称。
"""
super().__init__(constant=0, alias=alias)
zero_model = ZeroModel()
test_class(zero_model, x=ap, h=12, level=[90, 80])
zero_model.forecast(ap, 12, level=[90, 80])
zero_model.forward(ap, 12, level=[90, 80])
# test the alias argument
test_eq(repr(ZeroModel()),
'ZeroModel'
)
test_eq(repr(ZeroModel(alias='ZeroModel_custom')),
'ZeroModel_custom'
)
show_doc(ZeroModel, title_level=3)
show_doc(ZeroModel.forecast, title_level=3, name='ZeroModel.forecast')
show_doc(ZeroModel.fit, title_level=3, name='ZeroModel.fit')
show_doc(ZeroModel.predict, title_level=3, name='ZeroModel.predict')
show_doc(ZeroModel.predict_in_sample, title_level=3, name='ZeroModel.predict_in_sample')
show_doc(ZeroModel.forward, title_level=3, name='ZeroModel.forward')
from statsforecast.models import ZeroModel
from statsforecast.utils import AirPassengers as ap
# Usage example of ZeroModel
model = ZeroModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
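These constant-valued models are mainly useful as fallbacks when a primary model fails on a series. The sketch below shows that pattern, assuming the StatsForecast class's fallback_model argument and a long-format DataFrame with unique_id, ds and y columns; exact call signatures may differ across statsforecast versions.
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, ZeroModel
from statsforecast.utils import AirPassengers as ap

# Sketch: ZeroModel as a fallback used whenever the primary model errors out
df = pd.DataFrame({
    'unique_id': 'AirPassengers',
    'ds': pd.date_range('1949-01-01', periods=ap.size, freq='MS'),
    'y': ap,
})
sf = StatsForecast(models=[AutoARIMA(season_length=12)], freq='MS', fallback_model=ZeroModel())
fcst_df = sf.forecast(df=df, h=12)
fcst_df.head()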
NaN model
class NaNModel(ConstantModel):
def __init__(self, alias: str = 'NaNModel'):
r"""NaN模型。
返回NaN值。
参数
----------
alias: str
模型的自定义名称。
"""
super().__init__(constant=np.nan, alias=alias)
nanmodel = NaNModel()
nanmodel.forecast(ap, 12, level=[90, 80])
# test the alias argument
test_eq(repr(NaNModel()),
'NaNModel'
)
test_eq(repr(NaNModel(alias='NaN_custom')),
'NaN_custom'
)
show_doc(NaNModel, title_level=3)
show_doc(NaNModel.forecast, title_level=3, name='NaNModel.forecast')
show_doc(NaNModel.fit, title_level=3, name='NaNModel.fit')
show_doc(NaNModel.predict, title_level=3, name='NaNModel.predict')
show_doc(NaNModel.predict_in_sample, title_level=3, name='NaNModel.predict_in_sample')
from statsforecast.models import NaNModel
from statsforecast.utils import AirPassengers as ap
# Usage example of NaNModel
model = NaNModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
References
General information
Automatic forecasting
Exponential smoothing
Simple methods
Sparse and intermittent demand
Multiple seasonalities
Theta family
GARCH models
TBATS models
Give us a ⭐ on Github