Models

%load_ext autoreload
%autoreload 2

Models currently supported by StatsForecast

StatsForecast offers a wide variety of models, grouped into the following categories:

import warnings
from math import trunc
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
from numba import njit
from scipy.optimize import minimize
from scipy.special import inv_boxcox

from statsforecast.arima import (
    Arima,
    auto_arima_f,
    fitted_arima,
    forecast_arima, 
    forward_arima,
    is_constant,
)
from statsforecast.ces import (
    auto_ces, forecast_ces,
    forward_ces
)
from statsforecast.ets import (
    _PHI_LOWER,
    _PHI_UPPER,
    ets_f, forecast_ets, 
    forward_ets,
)
from statsforecast.mfles import MFLES as _MFLES
from statsforecast.mstl import mstl
from statsforecast.theta import (
    auto_theta, forecast_theta, 
    forward_theta
)
from statsforecast.garch import (
    garch_model, garch_forecast
)
from statsforecast.tbats import tbats_selection, tbats_forecast, _compute_sigmah
from statsforecast.utils import (
    _calculate_sigma,
    _calculate_intervals,
    _ensure_float,
    _naive,
    _old_kw_to_pos,
    _quantiles,
    _repeat_val,
    _repeat_val_seas,
    _seasonal_naive,
    CACHE,
    ConformalIntervals,
    NOGIL,
)
from datetime import date, timedelta

import matplotlib.pyplot as plt
import pandas as pd
from fastcore.test import test_eq, test_close, test_fail
from nbdev.showdoc import add_docs, show_doc

from statsforecast.garch import generate_garch_data
from statsforecast.utils import AirPassengers as ap
def _plot_insample_pi(fcst):
    
    fig, ax = plt.subplots(1, 1, figsize = (20,7))

    sdate = date(1949, 1, 1)  # start date
    edate = date(1961, 1, 1)  # end date
    dates = pd.date_range(sdate, edate - timedelta(days=1), freq='M')

    df = pd.DataFrame({'dates': dates,
                       'actual': ap,
                       'fitted': fcst['fitted'],
                       'fitted_lo_80': fcst['fitted-lo-80'], 
                       'fitted_lo_95': fcst['fitted-lo-95'], 
                       'fitted_hi_80': fcst['fitted-hi-80'],
                       'fitted_hi_95': fcst['fitted-hi-95']})
    plt.plot(df.dates, df.actual, color='firebrick', label='Actual value', linewidth=3)
    plt.plot(df.dates, df.fitted, color='navy', label='Fitted values', linewidth=3)
    plt.plot(df.dates, df.fitted_lo_80, color='darkorange', label='fitted-lo-80', linewidth=3)
    plt.plot(df.dates, df.fitted_lo_95, color='deepskyblue', label='fitted-lo-95', linewidth=3)
    plt.plot(df.dates, df.fitted_hi_80, color='darkorange', label='fitted-hi-80', linewidth=3)
    plt.plot(df.dates, df.fitted_hi_95, color='deepskyblue', label='fitted-hi-95', linewidth=3)
    plt.fill_between(df.dates, df.fitted_lo_95, df.fitted_hi_95, color = 'deepskyblue', alpha = 0.2)
    plt.fill_between(df.dates, df.fitted_lo_80, df.fitted_hi_80, color = 'darkorange', alpha = 0.3)
    plt.legend()
    
def _plot_fcst(fcst): 
    
    fig, ax = plt.subplots(1, 1, figsize = (20,7))
    plt.plot(np.arange(0, len(ap)), ap)
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['mean'], label='mean')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-95'], color = 'r', label='lo-95')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-95'], color = 'r', label='hi-95')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-80'], color = 'g', label='lo-80')
    plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-80'], color = 'g', label='hi-80')
    plt.legend()
def _add_fitted_pi(res, se, level):
    level = sorted(level)
    level = np.asarray(level)
    quantiles = _quantiles(level=level)
    lo = res['fitted'].reshape(-1, 1) - quantiles * se.reshape(-1, 1)
    hi = res['fitted'].reshape(-1, 1) + quantiles * se.reshape(-1, 1)
    lo = lo[:, ::-1]
    lo = {f'fitted-lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
    hi = {f'fitted-hi-{l}': hi[:, i] for i, l in enumerate(level)}
    res = {**res, **lo, **hi}
    return res
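A minimal sketch of what `_add_fitted_pi` produces, using a toy fitted series and a constant standard error (the numbers are illustrative only, not from any model):

# Illustrative only: expand a fitted-values dict with 80%/95% in-sample bands.
toy_res = {'fitted': np.array([1.0, 2.0, 3.0])}
toy_out = _add_fitted_pi(res=toy_res, se=np.full(3, 0.5), level=[80, 95])
sorted(toy_out.keys())
# ['fitted', 'fitted-hi-80', 'fitted-hi-95', 'fitted-lo-80', 'fitted-lo-95']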
def _add_conformal_distribution_intervals(
    fcst: Dict,
    cs: np.ndarray,
    level: List[Union[int, float]],
) -> Dict:
    r"""
    Adds conformal intervals to the `fcst` dict based on conformal scores `cs`.
    `level` should be already sorted. This strategy creates forecasts paths
    based on errors and calculate quantiles using those paths.
    """
    alphas = [100 - lv for lv in level]
    cuts = [alpha / 200 for alpha in reversed(alphas)]
    cuts.extend(1 - alpha / 200 for alpha in alphas)
    mean = fcst['mean'].reshape(1, -1)
    scores = np.vstack([mean - cs, mean + cs])
    quantiles = np.quantile(
        scores,
        cuts,
        axis=0,
    )
    quantiles = quantiles.reshape(len(cuts), -1)
    lo_cols = [f"lo-{lv}" for lv in reversed(level)]
    hi_cols = [f"hi-{lv}" for lv in level]
    out_cols = lo_cols + hi_cols
    for i, col in enumerate(out_cols):
        fcst[col] = quantiles[i]
    return fcst
def _get_conformal_method(method: str):
    available_methods = {
        "conformal_distribution": _add_conformal_distribution_intervals,
        #"conformal_error": _add_conformal_error_intervals,
    }
    if method not in available_methods.keys():
        raise ValueError(
            f"prediction intervals method {method} not supported "
            f'please choose one of {", ".join(available_methods.keys())}'
        )
    return available_methods[method]
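A small sketch of the conformal-distribution strategy on a toy forecast dict, assuming two conformity-score windows over a horizon of three (all numbers are illustrative only):

# Illustrative only: symmetric paths around the mean are built from the scores,
# and the requested quantiles of those paths become the interval bounds.
toy_fcst = {'mean': np.array([10.0, 11.0, 12.0])}
toy_cs = np.array([[1.0, 2.0, 1.5],
                   [0.5, 1.0, 2.0]])
toy_fcst = _add_conformal_distribution_intervals(toy_fcst, toy_cs, level=[80])
sorted(toy_fcst.keys())
# ['hi-80', 'lo-80', 'mean']
# unsupported interval methods raise a ValueError
test_fail(lambda: _get_conformal_method('conformal_error'))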
class _TS:
    uses_exog = False
    
    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

    def __repr__(self):
        return self.alias
    
    def _conformity_scores(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        y = _ensure_float(y)
        n_windows = self.prediction_intervals.n_windows  # type: ignore[attr-defined]
        h = self.prediction_intervals.h  # type: ignore[attr-defined]
        n_samples = y.size
        # use as many windows as possible for short series
        # (minus 1 to keep at least one observation for training)
        n_windows = min(n_windows, (n_samples - 1) // h)
        if n_windows < 2:
            raise ValueError(
                f"Prediction intervals settings require at least {2 * h + 1:,} samples, serie has {n_samples:,}."
            )
        test_size = n_windows * h
        cs = np.empty((n_windows, h), dtype=y.dtype)
        for i_window in range(n_windows):
            train_end = n_samples - test_size + i_window * h
            y_train = y[:train_end]
            y_test = y[train_end : train_end + h]
            if X is not None:
                X_train = X[:train_end]
                X_test = X[train_end : train_end + h]
            else:
                X_train = None
                X_test = None
            fcst_window = self.forecast(h=h, y=y_train, X=X_train, X_future=X_test)  # type: ignore[attr-defined]
            cs[i_window] = np.abs(fcst_window["mean"] - y_test)
        return cs
    
    @property
    def _conformal_method(self):
        return _get_conformal_method(self.prediction_intervals.method)

    def _store_cs(self, y, X):
        if self.prediction_intervals is not None:
            self._cs = self._conformity_scores(y, X)

    def _add_conformal_intervals(self, fcst, y, X, level):
        if self.prediction_intervals is not None and level is not None:
            cs = self._conformity_scores(y, X) if y is not None else self._cs
            res = self._conformal_method(fcst=fcst, cs=cs, level=level)
            return res
        return fcst

    def _add_predict_conformal_intervals(self, fcst, level):
        return self._add_conformal_intervals(fcst=fcst, y=None, X=None, level=level)
# Test conformity scores
class ZeroModel(_TS):
    
    def __init__(self, prediction_intervals: ConformalIntervals = None):
        self.prediction_intervals = prediction_intervals
        self.alias = 'SumAhead'
    
    def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
        res = {'mean': np.zeros(h)}
        if self.prediction_intervals is not None and level is not None:
            cs = self._conformity_scores(y, X)
            res = self._conformal_method(fcst=res, cs=cs, level=level)
        return res
    
    def fit(self, y, X):
        return self
    
    def predict(self, h, X=None, level=None):
        res = {'mean': np.zeros(h)}
        return res
conf_intervals = ConformalIntervals(h=12, n_windows=10)
expected_cs = np.full((conf_intervals.n_windows, conf_intervals.h), np.nan)
cs_info = ap[-conf_intervals.h*conf_intervals.n_windows:]
for i in range(conf_intervals.n_windows):
    expected_cs[i] = cs_info[i * conf_intervals.h:(i+1) * conf_intervals.h]
current_cs = ZeroModel(conf_intervals)._conformity_scores(ap)
test_eq(expected_cs, current_cs)
zero_model = ZeroModel(conf_intervals)
fcst_conformal = zero_model.forecast(ap, h=12, level=[80, 90])
test_eq(list(fcst_conformal.keys()), ['mean', 'lo-90', 'lo-80', 'hi-80', 'hi-90'])
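When the series cannot supply at least two conformity windows, `_conformity_scores` raises a ValueError; a quick check using the ZeroModel defined above (the series length of 20 is arbitrary):

# 20 observations cannot fit two windows of h=12, so this must fail.
test_fail(lambda: ZeroModel(ConformalIntervals(h=12, n_windows=2)).forecast(y=np.zeros(20), h=12, level=[80]))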

Automatic Forecasting

AutoARIMA

class AutoARIMA(_TS):
    r"""AutoARIMA model.

    Automatically selects the best ARIMA (AutoRegressive Integrated Moving Average) 
    model using an information criterion. Default is Akaike Information Criterion (AICc). 
    
    Notes
    -----
    This implementation is a mirror of Hyndman's [forecast::auto.arima](https://github.com/robjhyndman/forecast).
    
    References
    ----------
    [Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
    
    Parameters
    ----------
    d : Optional[int] 
        Order of first-differencing.
    D : Optional[int] 
        Order of seasonal-differencing.
    max_p : int
        Max autoregressive order p.
    max_q : int 
        Max moving average order q.
    max_P : int 
        Max seasonal autoregressive order P.
    max_Q : int 
        Max seasonal moving average order Q.
    max_order : int 
        Max p+q+P+Q value if not stepwise selection.
    max_d : int 
        Max non-seasonal differences.
    max_D : int 
        Max seasonal differences.
    start_p : int 
        Starting value of p in stepwise procedure.
    start_q : int 
        Starting value of q in stepwise procedure.
    start_P : int 
        Starting value of P in stepwise procedure.
    start_Q : int 
        Starting value of Q in stepwise procedure.
    stationary : bool 
        If True, restricts search to stationary models.
    seasonal : bool 
        If False, restricts search to non-seasonal models.
    ic : str 
        Information criterion to be used in model selection.
    stepwise : bool
        If True, will do stepwise selection (faster).
    nmodels : int 
        Number of models considered in stepwise search.
    trace : bool 
        If True, the searched ARIMA models are reported.
    approximation : Optional[bool] 
        If True, the search uses conditional sums-of-squares estimation, with the final model refit by MLE.
    method : Optional[str] 
        Fitting method: maximum likelihood or conditional sums of squares.
    truncate : Optional[int] 
        If not None, only the most recent `truncate` observations are used during model selection.
    test : str 
        Unit root test to use. See `ndiffs` for details.
    test_kwargs : Optional[str] 
        Additional arguments for the unit root test.
    seasonal_test : str 
        Selection method for seasonal differences.
    seasonal_test_kwargs : Optional[dict] 
        Seasonal unit root test arguments.
    allowdrift : bool (default True)
        If True, models with drift terms are considered.
    allowmean : bool (default True)
        If True, models with a non-zero mean are considered.
    blambda : Optional[float] 
        Box-Cox transformation parameter.
    biasadj : bool 
        Use adjusted back-transformed mean for Box-Cox transformations.
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    alias : str 
        Custom name of the model.  
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    uses_exog = True
    
    def __init__(
        self,
        d: Optional[int] = None,
        D: Optional[int] = None,
        max_p: int = 5,
        max_q: int = 5,
        max_P: int = 2,
        max_Q: int = 2,
        max_order: int = 5,
        max_d: int = 2,
        max_D: int = 1,
        start_p: int = 2,
        start_q: int = 2,
        start_P: int = 1,
        start_Q: int = 1,
        stationary: bool = False,
        seasonal: bool = True,
        ic: str = 'aicc',
        stepwise: bool = True,
        nmodels: int = 94,
        trace: bool = False,
        approximation: Optional[bool] = False,
        method: Optional[str] = None,
        truncate: Optional[bool] = None,
        test: str = 'kpss',
        test_kwargs: Optional[str] = None,
        seasonal_test: str = 'seas',
        seasonal_test_kwargs: Optional[Dict] = None,
        allowdrift: bool = True,
        allowmean: bool = True,
        blambda: Optional[float] = None,
        biasadj: bool = False,
        season_length: int = 1,
        alias: str = 'AutoARIMA',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.d=d
        self.D=D
        self.max_p=max_p
        self.max_q=max_q
        self.max_P=max_P
        self.max_Q=max_Q
        self.max_order=max_order
        self.max_d=max_d
        self.max_D=max_D
        self.start_p=start_p
        self.start_q=start_q
        self.start_P=start_P
        self.start_Q=start_Q
        self.stationary=stationary
        self.seasonal=seasonal
        self.ic=ic
        self.stepwise=stepwise
        self.nmodels=nmodels
        self.trace=trace
        self.approximation=approximation
        self.method=method
        self.truncate=truncate
        self.test=test
        self.test_kwargs=test_kwargs
        self.seasonal_test=seasonal_test
        self.seasonal_test_kwargs=seasonal_test_kwargs
        self.allowdrift=allowdrift
        self.allowmean=allowmean
        self.blambda=blambda
        self.biasadj=biasadj
        self.season_length=season_length
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        
    def fit(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the AutoARIMA model.

        Fit an AutoARIMA to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            AutoARIMA fitted model.
        """
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            self.model_ = auto_arima_f(
                x=y,
                d=self.d,
                D=self.D,
                max_p=self.max_p,
                max_q=self.max_q,
                max_P=self.max_P,
                max_Q=self.max_Q,
                max_order=self.max_order,
                max_d=self.max_d,
                max_D=self.max_D,
                start_p=self.start_p,
                start_q=self.start_q,
                start_P=self.start_P,
                start_Q=self.start_Q,
                stationary=self.stationary,
                seasonal=self.seasonal,
                ic=self.ic,
                stepwise=self.stepwise,
                nmodels=self.nmodels,
                trace=self.trace,
                approximation=self.approximation,
                method=self.method,
                truncate=self.truncate,
                xreg=X,
                test=self.test,
                test_kwargs=self.test_kwargs,
                seasonal_test=self.seasonal_test,
                seasonal_test_kwargs=self.seasonal_test_kwargs,
                allowdrift=self.allowdrift,
                allowmean=self.allowmean,
                blambda=self.blambda,
                biasadj=self.biasadj,
                period=self.season_length
            )

        self._store_cs(y=y, X=X)
        return self
    
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted AutoArima.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
        mean = fcst['mean']
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                'mean': mean,
                **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
            }
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted AutoArima insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = fitted_arima(self.model_)
        res = {'fitted': mean}
        if level is not None:
            se = np.sqrt(self.model_['sigma2'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient AutoARIMA predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not returns insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = auto_arima_f(
                x=y,
                d=self.d,
                D=self.D,
                max_p=self.max_p,
                max_q=self.max_q,
                max_P=self.max_P,
                max_Q=self.max_Q,
                max_order=self.max_order,
                max_d=self.max_d,
                max_D=self.max_D,
                start_p=self.start_p,
                start_q=self.start_q,
                start_P=self.start_P,
                start_Q=self.start_Q,
                stationary=self.stationary,
                seasonal=self.seasonal,
                ic=self.ic,
                stepwise=self.stepwise,
                nmodels=self.nmodels,
                trace=self.trace,
                approximation=self.approximation,
                method=self.method,
                truncate=self.truncate,
                xreg=X,
                test=self.test,
                test_kwargs=self.test_kwargs,
                seasonal_test=self.seasonal_test,
                seasonal_test_kwargs=self.seasonal_test_kwargs,
                allowdrift=self.allowdrift,
                allowmean=self.allowmean,
                blambda=self.blambda,
                biasadj=self.biasadj,
                period=self.season_length
            )
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res

    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted ARIMA model to a new time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels for prediction intervals.
        fitted : bool 
            Whether or not returns insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = forward_arima(self.model_, y=y, xreg=X, method=self.method)
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
def test_class(
    cls_, x, h, skip_insample=False, level=None, test_forward=False, X=None, X_future=None
):
    cls_ = cls_.fit(x, X=X)
    fcst_cls = cls_.predict(h=h, X=X_future)
    test_eq(len(fcst_cls['mean']), h)
    # test fit + predict = forecast
    test_eq(
        cls_.forecast(y=x, h=h, X=X, X_future=X_future)['mean'],
        fcst_cls['mean']
    )
    if not skip_insample:
        test_eq(len(cls_.predict_in_sample()['fitted']), len(x))
        assert isinstance(cls_.predict_in_sample()['fitted'], np.ndarray)
        np.testing.assert_array_equal(
            cls_.forecast(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
            cls_.predict_in_sample()['fitted'], 
        )
        if test_forward:
            np.testing.assert_array_equal(
                cls_.forward(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
                cls_.predict_in_sample()['fitted'], 
            )

    if test_forward:
        try:
            pd.testing.assert_frame_equal(
                pd.DataFrame(cls_.predict(h=h, X=X_future)),
                pd.DataFrame(cls_.forward(y=x, X=X, X_future=X_future, h=h)),
            )
        except AssertionError:
            raise Exception('predict and forward methods are not equal')
    
    if level is not None:
        fcst_cls = pd.DataFrame(cls_.predict(h=h, X=X_future, level=level))
        fcst_forecast = pd.DataFrame(cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level))
        try:
            pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
        except AssertionError:
            raise Exception('predict and forecast methods are not equal with levels')
            
        if test_forward:
            try:
                pd.testing.assert_frame_equal(
                    pd.DataFrame(cls_.predict(h=h, X=X_future, level=level)),
                    pd.DataFrame(cls_.forward(y=x, h=h, X=X, X_future=X_future, level=level))
                )
            except AssertionError:
                raise Exception('predict and forward methods are not equal with levels')
        
        if not skip_insample:
            fcst_cls = pd.DataFrame(cls_.predict_in_sample(level=level))
            fcst_forecast = cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
            fcst_forecast = pd.DataFrame({key: val for key, val in fcst_forecast.items() if 'fitted' in key})
            try:
                pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
            except AssertionError:
                raise Exception(
                    'predict and forecast methods are not equal with ' 
                    'levels for fitted values '
                )
            if test_forward:
                fcst_forward = cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
                fcst_forward = pd.DataFrame({key: val for key, val in fcst_forward.items() if 'fitted' in key})
                try:
                    pd.testing.assert_frame_equal(fcst_cls, fcst_forward)
                except AssertionError:
                    raise Exception(
                        'predict and forward methods are not equal with ' 
                        'levels for fitted values '
                    )

def _test_fitted_sparse(model_factory):
    y1 = np.array([2, 5, 0, 1, 3, 0, 1, 1, 0], dtype=np.float64)
    y2 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 1], dtype=np.float64)
    y3 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 0], dtype=np.float64)
    y4 = np.zeros(9, dtype=np.float64)
    for y in [y1, y2, y3, y4]:
        expected_fitted = np.hstack(
            [
                model_factory().forecast(y=y[:i + 1], h=1)['mean']
                for i in range(y.size - 1)]
        )
        np.testing.assert_allclose(
            model_factory().forecast(y=y, h=1, fitted=True)['fitted'],
            np.append(np.nan, expected_fitted),
            atol=1e-6,
        )
arima = AutoARIMA(season_length=12) 
test_class(arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_arima = arima.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_arima)
# test conformal prediction
arima_c = AutoARIMA(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(arima_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_arima_c = arima_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_arima_c["mean"],
    fcst_arima["mean"],
)
_plot_fcst(fcst_arima_c)
# test alias argument
test_eq(
    repr(AutoARIMA()),
    'AutoARIMA'
)
test_eq(
    repr(AutoARIMA(alias='AutoARIMA_seasonality')),
    'AutoARIMA_seasonality'
)
_plot_fcst(fcst_arima)
show_doc(AutoARIMA, title_level=3)
show_doc(AutoARIMA.fit, title_level=3)
show_doc(AutoARIMA.predict, title_level=3)
show_doc(AutoARIMA.predict_in_sample, title_level=3)
show_doc(AutoARIMA.forecast, title_level=3)
show_doc(AutoARIMA.forward, title_level=3)
from statsforecast.models import AutoARIMA
from statsforecast.utils import AirPassengers as ap
# AutoARIMA's usage example
arima = AutoARIMA(season_length=4)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
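For reference, a hedged sketch of the memory-efficient path: `forecast` refits and predicts in a single call without keeping the fitted model around (the level value is illustrative):

# Illustrative sketch: one-shot forecast with native 80% prediction intervals.
fcst_dict = AutoARIMA(season_length=4).forecast(y=ap, h=4, level=[80])
sorted(fcst_dict.keys())
# ['hi-80', 'lo-80', 'mean']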

AutoETS

class AutoETS(_TS):
    r"""Automatic Exponential Smoothing model.

    Automatically selects the best ETS (Error, Trend, Seasonality) 
    model using an information criterion. Default is Akaike Information Criterion (AICc), while particular models are estimated using maximum likelihood.
    The state-space equations can be determined based on their $M$ multiplicative, $A$ additive, 
    $Z$ optimized or $N$ omitted components. The `model` string parameter defines the ETS equations: 
    E in [$M, A, Z$], T in [$N, A, M, Z$], and S in [$N, A, M, Z$].
    
    For example when model='ANN' (additive error, no trend, and no seasonality), ETS will 
    explore only a simple exponential smoothing.
    
    If the component is selected as 'Z', it operates as a placeholder to ask the AutoETS model
    to figure out the best parameter.
    
    Notes
    -----
    This implementation is a mirror of Hyndman's [forecast::ets](https://github.com/robjhyndman/forecast).
    
    References
    ----------
    [Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
    
    [Hyndman, Rob, et al (2008). "Forecasting with exponential smoothing: the state space approach"](https://robjhyndman.com/expsmooth/).
    
    Parameters
    ----------
    model : str
        Controlling state-space-equations.
    season_length : int
        Number of observations per unit of time. Ex: 24 Hourly data.
    damped : bool
        A parameter that 'dampens' the trend.
    phi : float, optional (default=None)
        Smoothing parameter for trend damping. Only used when `damped=True`.
    alias : str
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals],
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1,
        model: str = 'ZZZ',
        damped: Optional[bool] = None,
        phi: Optional[float] = None,
        alias: str = 'AutoETS',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.model = model
        self.damped = damped
        if phi is not None:
            if not isinstance(phi, float):
                raise ValueError('phi must be `None` or float.')
            if not _PHI_LOWER <= phi <= _PHI_UPPER:
                raise ValueError(f'Valid range for phi is [{_PHI_LOWER}, {_PHI_UPPER}]')
        self.phi = phi
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Exponential Smoothing model.

        Fit an Exponential Smoothing model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters 
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            Exponential Smoothing fitted model.
        """
        y = _ensure_float(y)
        self.model_ = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
        self.model_['actual_residuals'] = y - self.model_['fitted']
        self._store_cs(y=y, X=X)
        return self
    
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None 
    ):
        r"""Predict with fitted Exponential Smoothing.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = forecast_ets(self.model_, h=h, level=level)
        res = {'mean': fcst['mean']}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                **res,
                **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
            }
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted Exponential Smoothing insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            residuals = self.model_['actual_residuals']
            se = _calculate_sigma(residuals, len(residuals) - self.model_['n_params'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient Exponential Smoothing predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not returns insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        mod = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
        fcst = forecast_ets(mod, h=h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                    **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted Exponential Smoothing model to a new time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_ets(self.model_, y=y)
        fcst = forecast_ets(mod, h=h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                    **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
autoets = AutoETS(season_length=12)
test_class(autoets, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_ets = autoets.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ets)
_plot_fcst(fcst_ets)
# test alias argument
test_eq(
    repr(AutoETS()),
    'AutoETS'
)
test_eq(
    repr(AutoETS(alias='AutoETS_custom')),
    'AutoETS_custom'
)
autoets = AutoETS(season_length=12, model='AAA')
test_class(autoets, x=ap, h=12, level=[90, 80])
fcst_ets = autoets.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ets)
# test conformal prediction
autoets_c = AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(autoets_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ets_c = autoets_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(fcst_ets_c['mean'],
    fcst_ets['mean'])
_plot_fcst(fcst_ets_c)
# test that forecast and fit + predict produce the same results
models = ['ANNN', 'AANN', 'ANAN', 'AAAN', 'AAND', 'AAAD', # class 1
          'MNNN', 'MANN', 'MAND', 'MNAN', 'MAAN', 'MAAD', # class 2
          'MNMN', 'MAMN', 'MAMD'] # class 3

for k in range(0,len(models)): 
    mod = models[k][0:3]
    damped_val = models[k][-1]
    if damped_val == 'N': 
        damped = False
    else: 
        damped = True
        
    ets = AutoETS(season_length=12, model=mod, damped=damped) 
    test_class(ets, x=ap, h=13, level=[90, 80], test_forward=True)
show_doc(AutoETS, title_level=3)
show_doc(AutoETS.fit, title_level=3)
show_doc(AutoETS.predict, title_level=3)
show_doc(AutoETS.predict_in_sample, title_level=3)
show_doc(AutoETS.forecast, title_level=3)
show_doc(AutoETS.forward, title_level=3)
from statsforecast.models import AutoETS
from statsforecast.utils import AirPassengers as ap
# AutoETS' usage example
# Multiplicative trend, optimal error and seasonality
autoets = AutoETS(model='ZMZ', season_length=4)
autoets = autoets.fit(y=ap)
y_hat_dict = autoets.predict(h=4)
y_hat_dict
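A damped trend can also be requested explicitly; a small sketch assuming an additive, damped-trend model and a phi value inside the allowed [_PHI_LOWER, _PHI_UPPER] range:

# Illustrative sketch: ETS(A,Ad,N) with an explicit damping parameter.
ets_damped = AutoETS(model='AAN', damped=True, phi=0.9, season_length=1)
fcst_damped = ets_damped.forecast(y=ap, h=4, level=[80])
sorted(fcst_damped.keys())
# ['hi-80', 'lo-80', 'mean']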
class ETS(AutoETS):
    @classmethod
    def _warn(cls):
        warnings.warn(
            '`ETS` will be deprecated in future versions of `StatsForecast`. Please use `AutoETS` instead.',
            category=FutureWarning,
            stacklevel=2
        )
        
    def __init__(self, season_length: int = 1, model: str = 'ZZZ', 
                 damped: Optional[bool] = None,
                 phi: Optional[float] = None,
                 alias: str = 'ETS',
                 prediction_intervals: Optional[ConformalIntervals] = None):
        ETS._warn()
        super().__init__(
            season_length=season_length,
            model=model,
            damped=damped,
            phi=phi,
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
ets = ETS(model='ZMZ', season_length=4)
autoets = AutoETS(model='ZMZ',  
              season_length=4)
test_eq(ets.forecast(y=ap, h=12)['mean'], 
        autoets.forecast(y=ap, h=12)['mean'])
# test alias argument
test_eq(
    repr(ETS()),
    'ETS'
)
test_eq(
    repr(ETS(alias='ETS_custom')),
    'ETS_custom'
)
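Instantiating `ETS` should emit the FutureWarning defined above; a small check using the already imported warnings module:

# The deprecated wrapper warns but otherwise behaves like AutoETS.
with warnings.catch_warnings(record=True) as recorded:
    warnings.simplefilter('always')
    ETS(model='ZMZ', season_length=4)
assert any(issubclass(w.category, FutureWarning) for w in recorded)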

AutoCES

class AutoCES(_TS):
    r"""Complex Exponential Smoothing model.

    Automatically selects the best Complex Exponential Smoothing
    model using an information criterion. Default is Akaike Information Criterion (AICc), while particular 
    models are estimated using maximum likelihood.
    The state-space equations can be determined based on their $S$ simple, $P$ partial, 
    $Z$ optimized or $N$ omitted components. The `model` string parameter defines the 
    kind of CES model: $N$ for simple CES (without seasonality), $S$ for simple seasonality (lagged CES),
    $P$ for partial seasonality (without complex part), $F$ for full seasonality (lagged CES
    with real and complex seasonal parts).
    
    If the component is selected as 'Z', it operates as a placeholder to ask the AutoCES model
    to figure out the best parameter.
    
    References
    ----------
    [Svetunkov, Ivan & Kourentzes, Nikolaos. (2015). "Complex Exponential Smoothing". 10.13140/RG.2.1.3757.2562. ](https://onlinelibrary.wiley.com/doi/full/10.1002/nav.22074).
    
    Parameters
    ----------
    model : str 
        Controlling state-space-equations.
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    
    def __init__(
        self, 
        season_length: int = 1,
        model: str = 'Z',
        alias: str = 'CES',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.model = model
        self.alias = alias
        self.prediction_intervals = prediction_intervals

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Complex Exponential Smoothing model.

        Fit the Complex Exponential Smoothing model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            Complex Exponential Smoothing fitted model.
        """
        y = _ensure_float(y)
        if is_constant(y):
            model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
            model.fit(y=y, X=X)
            return model
        self.model_ = auto_ces(y, m=self.season_length, model=self.model)
        self.model_['actual_residuals'] = y - self.model_['fitted']        
        self._store_cs(y=y, X=X)
        return self
    
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted Exponential Smoothing.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level: List[float] 
            Confidence levels (0-100) for prediction intervals. 
        
        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = forecast_ces(self.model_, h=h, level=level)
        res = {"mean": fcst["mean"]}
        if level is None: 
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                **res,
                **{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
                **{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
            }
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted Exponential Smoothing insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None: 
            residuals = self.model_['actual_residuals']
            se = _calculate_sigma(residuals, self.model_['n'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None, 
        fitted: bool = False,
    ):
        r"""Memory Efficient Complex Exponential Smoothing predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level: List[float]
            Confidence levels (0-100) for prediction intervals. 
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        if is_constant(y):
            model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
            return model.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
        mod = auto_ces(y, m=self.season_length, model=self.model)
        fcst = forecast_ces(mod, h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res, 
                    **{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)}, 
                    **{f'hi-{l}': fcst[f'hi-{l}'] for l in level}, 
                }
            if fitted: 
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y)) 
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted Complex Exponential Smoothing to a new time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level: List[float]
            Confidence levels (0-100) for prediction intervals. 
        fitted : bool 
            Whether or not returns insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_ces(self.model_, y=y)
        fcst = forecast_ces(mod, h, level=level)
        keys = ['mean']
        if fitted:
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)},
                    **{f'hi-{l}': fcst[f'hi-{l}'] for l in level},
                }
            if fitted:
                # add prediction intervals for fitted values
                se = _calculate_sigma(y - mod['fitted'], len(y))
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
autoces = AutoCES(season_length=12)
fcst_ces = autoces.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ces)
# test conformal prediction
autoces_c = AutoCES(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(autoces_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ces_c = autoces_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_ces["mean"],
    fcst_ces_c["mean"]
)
_plot_fcst(fcst_ces)
_plot_fcst(fcst_ces_c)
fit = autoces.fit(ap) 
fcst = fit.predict(13, None, (80,95))
_plot_fcst(fit.predict(13, None, (80,95)))

values = ['mean', 'lo-80', 'lo-95', 'hi-80', 'hi-95']
for k in range(len(values)):
    np.testing.assert_equal(fcst_ces[values[k]], fcst[values[k]])
pi_insample = fit.predict_in_sample((80,95))
_plot_insample_pi(pi_insample)

values = ['fitted', 'fitted-lo-80', 'fitted-lo-95', 'fitted-hi-80', 'fitted-hi-95']
for k in range(len(values)):
    np.testing.assert_equal(fcst_ces[values[k]], pi_insample[values[k]])
ces = AutoCES(season_length=12)
test_class(ces, x=ap, h=12, test_forward=True, level=[90, 80])
# test alias argument
test_eq(
    repr(AutoCES()),
    'CES'
)
test_eq(
    repr(AutoCES(alias='AutoCES_custom')),
    'AutoCES_custom'
)
show_doc(AutoCES, title_level=3)
show_doc(AutoCES.fit, title_level=3)
show_doc(AutoCES.predict, title_level=3)
show_doc(AutoCES.predict_in_sample, title_level=3)
show_doc(AutoCES.forecast, title_level=3)
show_doc(AutoCES.forward, title_level=3)
from statsforecast.models import AutoCES
from statsforecast.utils import AirPassengers as ap
# CES' usage example
# 'Z' lets AutoCES select the best model variant
ces = AutoCES(model='Z',  
              season_length=4)
ces = ces.fit(y=ap)
y_hat_dict = ces.predict(h=4)
y_hat_dict
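When the input series is constant, `AutoCES.fit` falls back to a `Naive` model (see the `is_constant` branch above); a quick sketch, assuming `Naive` is importable from `statsforecast.models`:

# Illustrative sketch: a constant series makes fit return a Naive fallback (keeping the CES alias).
from statsforecast.models import Naive
constant_fit = AutoCES(season_length=4).fit(y=np.ones(8))
isinstance(constant_fit, Naive)
# True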

AutoTheta

class AutoTheta(_TS):
    r"""AutoTheta model.

    Automatically selects the best Theta (Standard Theta Model ('STM'),
    Optimized Theta Model ('OTM'), Dynamic Standard Theta Model ('DSTM'),
    Dynamic Optimized Theta Model ('DOTM')) model using MSE. 
    
    References
    ----------
    [Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
    
    Parameters
    ----------
    season_length : int
        Number of observations per unit of time. Ex: 24 Hourly data.
    decomposition_type : str 
        Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
    model : str 
        Controlling Theta Model. By default, searches for the best model.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.    
    """
    def __init__(
        self,
        season_length: int = 1,
        decomposition_type: str = 'multiplicative',
        model: Optional[str] = None,
        alias: str = 'AutoTheta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.season_length = season_length
        self.decomposition_type = decomposition_type
        self.model = model
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        
    def fit(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the AutoTheta model.

        Fit an AutoTheta model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            AutoTheta fitted model.
        """
        y = _ensure_float(y)
        self.model_ = auto_theta(
            y=y,
            m=self.season_length, 
            model=self.model, 
            decomposition_type=self.decomposition_type,
        )
        self.model_['fitted'] = y - self.model_['residuals']
        self._store_cs(y, X)
        return self
    
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted AutoTheta.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = forecast_theta(self.model_, h=h, level=level)
        if self.prediction_intervals is not None and level is not None:
            fcst = self._add_predict_conformal_intervals(fcst, level)
        return fcst
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted AutoTheta insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            se = np.std(self.model_['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient AutoTheta predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        mod = auto_theta(
            y=y, 
            m=self.season_length, 
            model=self.model, 
            decomposition_type=self.decomposition_type
        )
        res = forecast_theta(mod, h, level=level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        if fitted:
            res['fitted'] = y - mod['residuals']
        if level is not None and fitted:
            # add prediction intervals for the fitted values
            se = np.std(mod['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted AutoTheta to a new time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        mod = forward_theta(self.model_, y=y)
        res = forecast_theta(mod, h, level=level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        if fitted:
            res['fitted'] = y - mod['residuals']
        if level is not None and fitted:
            # add prediction intervals for the fitted values
            se = np.std(mod['residuals'][3:], ddof=1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
theta = AutoTheta(season_length=12)
test_class(theta, x=ap, h=12, level=[80, 90], test_forward=True)
fcst_theta = theta.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_theta)
# test conformal prediction
theta_c = AutoTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(theta_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_theta_c = theta_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_theta_c['mean'],
    fcst_theta['mean'],
)
_plot_fcst(fcst_theta_c)
zero_theta = theta.forward(np.zeros(10), h=12, level=[80, 90], fitted=True)
# test alias argument
test_eq(
    repr(AutoTheta()),
    'AutoTheta'
)
test_eq(
    repr(AutoTheta(alias='AutoTheta_custom')),
    'AutoTheta_custom'
)
show_doc(AutoTheta, title_level=3)
show_doc(AutoTheta.fit, title_level=3)
show_doc(AutoTheta.predict, title_level=3)
show_doc(AutoTheta.predict_in_sample, title_level=3)
show_doc(AutoTheta.forecast, title_level=3)
show_doc(AutoTheta.forward, title_level=3)
from statsforecast.models import AutoTheta
from statsforecast.utils import AirPassengers as ap
# AutoTheta's usage example
theta = AutoTheta(season_length=4)
theta = theta.fit(y=ap)
y_hat_dict = theta.predict(h=4)
y_hat_dict
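
The call above returns only point forecasts. As a minimal sketch reusing the same API, the snippet below also requests 80% and 95% prediction intervals and in-sample fitted values through the `level` and `fitted` arguments of `forecast` (the monthly season length here is an illustrative choice):

from statsforecast.models import AutoTheta
from statsforecast.utils import AirPassengers as ap

# point forecasts plus native prediction intervals and fitted values
model = AutoTheta(season_length=12)
fcst = model.forecast(y=ap, h=12, level=[80, 95], fitted=True)
fcst.keys()  # includes 'mean', 'lo-95', 'lo-80', 'hi-80', 'hi-95', 'fitted', 'fitted-lo-80', ...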

ARIMA Family

Autoregressive Integrated Moving Average (ARIMA)

class ARIMA(_TS):
    r"""ARIMA model.

    AutoRegressive Integrated Moving Average model.

    References
    ----------
    [Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
    
    Parameters
    ----------
    order : tuple (default=(0, 0, 0))
        A specification of the non-seasonal part of the ARIMA model: the three components (p, d, q) are the AR order, the degree of differencing, and the MA order.
    season_length : int (default=1)
        Number of observations per unit of time. Ex: 24 Hourly data.
    seasonal_order : tuple (default=(0, 0, 0))
        A specification of the seasonal part of the ARIMA model.
        (P, D, Q) for the  AR order, the degree of differencing, the MA order.
    include_mean : bool (default=True)
        Should the ARIMA model include a mean term? 
        The default is True for undifferenced series, False for differenced ones (where a mean would not affect the fit nor predictions).
    include_drift : bool (default=False)
        Should the ARIMA model include a linear drift term? 
        (i.e., a linear regression with ARIMA errors is fitted.)
    include_constant : bool, optional (default=None)
        If True, then include_mean is set to True for undifferenced series and include_drift is set to True for differenced series. 
        Note that if there is more than one difference taken, no constant is included regardless of the value of this argument. 
        This is deliberate as otherwise quadratic and higher order polynomial trends would be induced.
    blambda : float, optional (default=None)
        Box-Cox transformation parameter.
    biasadj : bool (default=False)
        Use adjusted back-transformed mean Box-Cox.
    method : str (default='CSS-ML')
        Fitting method: maximum likelihood or minimize conditional sum-of-squares. 
        The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
    fixed : dict, optional (default=None)
        Dictionary containing fixed coefficients for the arima model. Example: `{'ar1': 0.5, 'ma2': 0.75}`.
        For autoregressive terms use the `ar{i}` keys. For its seasonal version use `sar{i}`.
        For moving average terms use the `ma{i}` keys. For its seasonal version use `sma{i}`.
        For intercept and drift use the `intercept` and `drift` keys.
        For exogenous variables use the `ex_{i}` keys.
    alias : str
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    uses_exog = True

    def __init__(
        self,
        order: Tuple[int, int, int] = (0, 0, 0),
        season_length: int = 1,
        seasonal_order: Tuple[int, int, int] = (0, 0, 0),
        include_mean: bool = True,
        include_drift: bool = False,
        include_constant: Optional[bool] = None,
        blambda: Optional[float] = None,
        biasadj: bool = False,
        method: str = 'CSS-ML',
        fixed: Optional[dict] = None, 
        alias: str = 'ARIMA',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.order=order
        self.season_length=season_length
        self.seasonal_order=seasonal_order
        self.include_mean=include_mean
        self.include_drift=include_drift
        self.include_constant=include_constant
        self.blambda=blambda
        self.biasadj=biasadj
        self.method=method
        self.fixed=fixed
        self.alias=alias
        self.prediction_intervals=prediction_intervals
    
    def fit(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""
        Fit the model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            Fitted model.
        """
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            self.model_ = Arima(
                x=y,
                order=self.order,
                seasonal={'order': self.seasonal_order, 
                          'period': self.season_length},
                xreg=X,
                include_mean=self.include_mean,
                include_constant=self.include_constant,
                include_drift=self.include_drift,
                blambda=self.blambda,
                biasadj=self.biasadj,
                method=self.method,
                fixed=self.fixed
            )
        self._store_cs(y=y, X=X)
        return self
    
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted model.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
        mean = fcst['mean']
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            res = {
                "mean": mean,
                **{f"lo-{l}": fcst["lower"][f"{l}%"] for l in reversed(level)},
                **{f"hi-{l}": fcst["upper"][f"{l}%"] for l in level},
            }
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = fitted_arima(self.model_)
        res = {'fitted': mean}
        if level is not None:
            se = np.sqrt(self.model_['sigma2'])
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory efficient predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = Arima(
                x=y,
                order=self.order,
                seasonal={'order': self.seasonal_order, 
                          'period': self.season_length},
                xreg=X,
                include_mean=self.include_mean,
                include_constant=self.include_constant,
                include_drift=self.include_drift,
                blambda=self.blambda,
                biasadj=self.biasadj,
                method=self.method,
                fixed=self.fixed
            )
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for the fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res

    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted model to a new time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        with np.errstate(invalid='ignore'):
            mod = forward_arima(self.model_, y=y, xreg=X, method=self.method)
        fcst = forecast_arima(mod, h, xreg=X_future, level=level)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = fitted_arima(mod)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                res = {
                    **res,
                    **{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
                    **{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
                }
            if fitted:
                # add prediction intervals for the fitted values
                se = np.sqrt(mod['sigma2'])
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
simple_arima = ARIMA(order=(1, 0, 0), season_length=12) 
test_class(simple_arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_simple_arima = simple_arima.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_simple_arima)
# test conformal prediction
simple_arima_c = ARIMA(order=(1, 0, 0), season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(simple_arima_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_simple_arima_c = simple_arima_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_simple_arima_c['mean'],
    fcst_simple_arima['mean'],
)
_plot_fcst(fcst_simple_arima_c)
simple_arima = ARIMA(order=(2, 0, 0), season_length=12, fixed={'ar1': 0.5, 'ar2': 0.5}) 
test_class(simple_arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_simple_arima = simple_arima.forecast(ap, 4, None, None, (80,95), True)
_plot_insample_pi(fcst_simple_arima)
test_eq(
    fcst_simple_arima['mean'],
    np.array([411., 421.5, 416.25, 418.875])
)
# test alias argument
test_eq(
    repr(ARIMA()),
    'ARIMA'
)
test_eq(
    repr(ARIMA(alias='ARIMA_seasonality')),
    'ARIMA_seasonality'
)
show_doc(ARIMA, title_level=3)
show_doc(ARIMA.fit, title_level=3)
show_doc(ARIMA.predict, title_level=3)
show_doc(ARIMA.predict_in_sample, title_level=3)
show_doc(ARIMA.forecast, title_level=3)
show_doc(ARIMA.forward, title_level=3)
from statsforecast.models import ARIMA
from statsforecast.utils import AirPassengers as ap
# ARIMA's usage example
arima = ARIMA(order=(1, 0, 0), season_length=12)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
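
Since `ARIMA` sets `uses_exog = True`, it also accepts exogenous regressors: pass an insample matrix of shape (t, n_x) to `fit` and the matching future matrix of shape (h, n_x) to `predict`. A minimal sketch with a synthetic linear-trend regressor (the regressor is invented here purely for illustration):

import numpy as np
from statsforecast.models import ARIMA
from statsforecast.utils import AirPassengers as ap

h = 4
X = np.arange(len(ap), dtype=np.float64).reshape(-1, 1)                      # insample exogenous, shape (t, 1)
X_future = np.arange(len(ap), len(ap) + h, dtype=np.float64).reshape(-1, 1)  # future exogenous, shape (h, 1)

arima_x = ARIMA(order=(1, 0, 0), season_length=12)
arima_x = arima_x.fit(y=ap, X=X)
y_hat_dict = arima_x.predict(h=h, X=X_future)
y_hat_dict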

Autoregressive

class AutoRegressive(ARIMA):
    r"""Simple Autoregressive model.

    Parameters
    ----------
    lags : int or list 
        Number of lags to include in the model. 
        If an int is passed then all lags up to `lags` are considered.
        If a list, only the elements of the list are considered as lags.
    include_mean : bool (default=True)
        Should the AutoRegressive model include a mean term? 
        The default is True for undifferenced series, False for differenced ones (where a mean would not affect the fit nor predictions).
    include_drift : bool (default=False)
        Should the AutoRegressive model include a linear drift term? 
        (i.e., a linear regression with AutoRegressive errors is fitted.)
    blambda : float, optional (default=None)
        Box-Cox transformation parameter.
    biasadj : bool (default=False)
        Use adjusted back-transformed mean Box-Cox.
    method : str (default='CSS-ML')
        Fitting method: maximum likelihood or minimize conditional sum-of-squares. 
        The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
    fixed : dict, optional (default=None)
        Dictionary containing fixed coefficients for the AutoRegressive model. Example: `{'ar1': 0.5, 'ar5': 0.75}`.
        For autoregressive terms use the `ar{i}` keys.
    alias : str
        Custom name of the model.  
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    uses_exog = True
    
    def __init__(
        self,
        lags: Union[int, List[int]],
        include_mean: bool = True,
        include_drift: bool = False,
        blambda: Optional[float] = None,
        biasadj: bool = False,
        method: str = 'CSS-ML',
        fixed: Optional[dict] = None, 
        alias: str = 'AutoRegressive',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        if isinstance(lags, int):
            order = (lags, 0, 0)
        elif isinstance(lags, list):
            order = (max(lags), 0, 0)
            fixed_lags = {f'ar{i+1}': np.nan if (i+1) in lags else 0 for i in range(max(lags))}
            if fixed is not None:
                fixed_lags.update(fixed)
            fixed = fixed_lags
        else:
            raise ValueError('Please provide an int or a list specifying the lags to use.')
        super().__init__(
            order=order,
            include_mean=include_mean, 
            include_drift=include_drift,
            blambda=blambda,
            biasadj=biasadj,
            method=method,
            alias=alias,
            fixed=fixed,
            prediction_intervals=prediction_intervals,
        )
ar = AutoRegressive(lags=[12], fixed={'ar12': 0.9999999}) 
test_class(ar, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_ar = ar.forecast(ap, 13, None, None, (80,95), True)
# we should recover the seasonal naive forecast
test_close(
    fcst_ar['mean'][:-1],
    ap[-12:], 
    eps=1e-4
)
_plot_insample_pi(fcst_ar)
# test conformal prediction
ar_c = AutoRegressive(lags=[12], fixed={'ar12': 0.9999999}, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(ar_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ar_c = ar_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_ar_c['mean'],
    fcst_ar['mean'],
)
_plot_fcst(fcst_ar_c)
# test alias argument
test_eq(
    repr(AutoRegressive(lags=[12])),
    'AutoRegressive'
)
test_eq(
    repr(AutoRegressive(lags=[12], alias='AutoRegressive_lag12')),
    'AutoRegressive_lag12'
)
show_doc(AutoRegressive, title_level=3)
show_doc(AutoRegressive.fit, title_level=3, name='AutoRegressive.fit')
show_doc(AutoRegressive.predict, title_level=3, name='AutoRegressive.predict')
show_doc(AutoRegressive.predict_in_sample, title_level=3, name='AutoRegressive.predict_in_sample')
show_doc(AutoRegressive.forecast, title_level=3, name='AutoRegressive.forecast')
show_doc(AutoRegressive.forward, title_level=3, name='AutoRegressive.forward')
from statsforecast.models import AutoRegressive
from statsforecast.utils import AirPassengers as ap
# AutoRegressive's usage example
ar = AutoRegressive(lags=[12])
ar = ar.fit(y=ap)
y_hat_dict = ar.predict(h=4, level=[80])
y_hat_dict
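
As the constructor above shows, a list of lags is translated into an ARIMA(p, 0, 0) with p equal to the largest lag, and every excluded lag is pinned to zero through the `fixed` dictionary. A small sketch that only inspects the resulting attributes:

from statsforecast.models import AutoRegressive

ar_sparse = AutoRegressive(lags=[1, 12])
ar_sparse.order  # (12, 0, 0)
ar_sparse.fixed  # {'ar1': nan, 'ar2': 0, ..., 'ar11': 0, 'ar12': nan}; nan entries are left free to be estimated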

Exponential Smoothing

Simple Smoothing

@njit(nogil=NOGIL, cache=CACHE)
def _ses_fcst_mse(x: np.ndarray, alpha: float) -> Tuple[float, float, np.ndarray]:
    r"""对一个序列执行简单指数平滑。

    该函数返回一步预测值以及拟合的均方误差。
    """
    smoothed = x[0]
    n = x.size
    mse = 0.
    fitted = np.full(n, np.nan, dtype=x.dtype)

    for i in range(1, n):
        smoothed = (alpha * x[i - 1] + (1 - alpha) * smoothed).item()
        error = x[i] - smoothed
        mse += error * error
        fitted[i] = smoothed

    mse /= n
    forecast = alpha * x[-1] + (1 - alpha) * smoothed
    return forecast, mse, fitted


def _ses_mse(alpha: float, x: np.ndarray) -> float:
    r"""计算简单指数平滑拟合的均方误差。"""
    _, mse, _ = _ses_fcst_mse(x, alpha)
    return mse


def _ses_forecast(x: np.ndarray, alpha: float) -> Tuple[float, np.ndarray]:
    r"""一步超前预测,采用简单指数平滑法。"""
    forecast, _, fitted = _ses_fcst_mse(x, alpha)
    return forecast, fitted


def _demand(x: np.ndarray) -> np.ndarray:
    r"""提取向量中的正元素。"""
    return x[x > 0]


def _intervals(x: np.ndarray) -> np.ndarray:
    r"""计算向量中非零元素之间的间隔。"""
    nonzero_idxs = np.where(x != 0)[0]
    return np.diff(nonzero_idxs + 1, prepend=0).astype(x.dtype)


def _probability(x: np.ndarray) -> np.ndarray:
    r"""计算元素为非零的概率。"""
    return (x != 0).astype(x.dtype)


def _optimized_ses_forecast(
        x: np.ndarray,
        bounds: Sequence[Tuple[float, float]] = [(0.1, 0.3)]
    ) -> Tuple[float, np.ndarray]:
    r"""搜索最优的alpha值并计算一步超前简单指数平滑预测。"""
    alpha = minimize(
        fun=_ses_mse,
        x0=(0,),
        args=(x,),
        bounds=bounds,
        method='L-BFGS-B'
    ).x[0]
    forecast, fitted = _ses_forecast(x, alpha)
    return forecast, fitted


def _chunk_sums(array: np.ndarray, chunk_size: int) -> np.ndarray:
    r"""将数组分割成多个块,并返回每个块的总和。

不完整的块将被丢弃。"""
    n_chunks = array.size // chunk_size
    n_elems = n_chunks * chunk_size
    return array[:n_elems].reshape(n_chunks, chunk_size).sum(axis=1)
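
# Quick illustration of the intermittent-demand helpers defined above on a tiny
# made-up series (values chosen only so the outputs are easy to follow):
# _demand keeps the positive entries, _intervals measures the spacing between
# non-zero entries, _probability flags the non-zero periods, and _chunk_sums
# aggregates the series into fixed-size buckets, dropping any incomplete tail.
_toy = np.array([0., 2., 0., 0., 3., 4.])
_demand(_toy)         # array([2., 3., 4.])
_intervals(_toy)      # array([2., 3., 1.])
_probability(_toy)    # array([0., 1., 0., 0., 1., 1.])
_chunk_sums(_toy, 2)  # array([2., 0., 7.])
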
def _ses(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # return fitted values
    alpha: float, # smoothing parameter
) -> Dict[str, np.ndarray]: 
    fcst, _, fitted_vals = _ses_fcst_mse(y, alpha)
    fcst = {'mean': _repeat_val(val=fcst, h=h)}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SimpleExponentialSmoothing(_TS):
    r"""SimpleExponentialSmoothing model.

    Uses a weighted average of all past observations where the weights decrease exponentially into the past. 
    Suitable for data with no clear trend or seasonality. 
    Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t}$

    The rate $0 \leq \alpha \leq 1$ at which the weights decrease is called the smoothing parameter. When $\alpha = 1$, SES is equal to the naive method.

    References
    ----------
    [Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).

    Parameters
    ----------
    alpha : float 
        Smoothing parameter.
    alias : str 
        Custom name of the model. 
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        alpha: float,
        alias: str = 'SES',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.alpha = alpha
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
        
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SimpleExponentialSmoothing model.

        Fit a SimpleExponentialSmoothing model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SimpleExponentialSmoothing fitted model.
        """
        y = _ensure_float(y)
        mod = _ses(y=y, alpha=self.alpha, h=1, fitted=True)
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SimpleExponentialSmoothing.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to " "compute them.")
        return res
    
    def predict_in_sample(self):
        r"""Access fitted SimpleExponentialSmoothing insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions.
            
        """
        res = {'fitted': self.model_['fitted']}
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SimpleExponentialSmoothing predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _ses(y=y, h=h, fitted=fitted, alpha=self.alpha)
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to " "compute them.")
        return res
ses = SimpleExponentialSmoothing(alpha=0.1)
test_class(ses, x=ap, h=12)
# more tests
ses = ses.fit(ap)
fcst_ses = ses.predict(12)
test_close(fcst_ses['mean'], np.repeat(460.3028, 12), eps=1e-4)
# to recover these residuals in R
# you have to pass initial="simple"
# to the `ses` function
np.testing.assert_allclose(
    ses.predict_in_sample()['fitted'][[0, 1, -1]], 
    np.array([np.nan, 118 - 6., 432 + 31.447525])
)
# test conformal prediction
ses_c = SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ses_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_ses_c = ses_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_ses_c['mean'][:12],
    fcst_ses['mean']
)
_plot_fcst(fcst_ses_c)
# test alias argument
test_eq(
    repr(SimpleExponentialSmoothing(alpha=0.1)),
    'SES'
)
test_eq(
    repr(SimpleExponentialSmoothing(alpha=0.1, alias='SES_custom')),
    'SES_custom'
)
show_doc(SimpleExponentialSmoothing, title_level=3)
show_doc(SimpleExponentialSmoothing.forecast, title_level=3)
show_doc(SimpleExponentialSmoothing.fit, title_level=3)
show_doc(SimpleExponentialSmoothing.predict, title_level=3)
show_doc(SimpleExponentialSmoothing.predict_in_sample, title_level=3)
from statsforecast.models import SimpleExponentialSmoothing
from statsforecast.utils import AirPassengers as ap
# SimpleExponentialSmoothing's usage example
ses = SimpleExponentialSmoothing(alpha=0.5)
ses = ses.fit(y=ap)
y_hat_dict = ses.predict(h=4)
y_hat_dict
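
Because SES has a simple recursive form, the one-step forecast can be reproduced by hand, which makes the role of $\alpha$ explicit. A minimal sketch of that recursion, initializing the level at the first observation as the implementation above does:

import numpy as np
from statsforecast.models import SimpleExponentialSmoothing
from statsforecast.utils import AirPassengers as ap

alpha = 0.5
level = ap[0]  # initial level = first observation
for y_t in ap[1:]:
    level = alpha * y_t + (1 - alpha) * level  # each update is the next one-step forecast

ses = SimpleExponentialSmoothing(alpha=alpha).fit(y=ap)
np.allclose(level, ses.predict(h=1)['mean'][0])  # the manual recursion matches the model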

Optimized Simple Smoothing

def _ses_optimized(
        y: np.ndarray, # time series
        h: int, # forecast horizon
        fitted: bool, # return fitted values
    ):
    fcst_, fitted_vals = _optimized_ses_forecast(y, [(0.01, 0.99)])
    mean = _repeat_val(val=fcst_, h=h)
    fcst = {'mean': mean}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SimpleExponentialSmoothingOptimized(_TS):
    r"""SimpleExponentialSmoothing model.

    Uses a weighted average of all past observations where the weights decrease exponentially into the past. 
    Suitable for data with no clear trend or seasonality. 
    Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t}$

    The smoothing parameter $\alpha^*$ is optimized by square error minimization.

    References
    ----------
    [Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).

    Parameters
    ----------
    alias: str 
        Custom name of the model.   
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        This is required for generating future prediction intervals.
    """
    def __init__(
        self,
        alias: str = "SESOpt",
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SimpleExponentialSmoothingOptimized model.

        Fit a SimpleExponentialSmoothingOptimized model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SimpleExponentialSmoothingOptimized fitted model.
        """
        y = _ensure_float(y)
        mod = _ses_optimized(y=y, h=1, fitted=True)
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self

    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SimpleExponentialSmoothingOptimized.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res

    def predict_in_sample(self):
        r"""Access fitted SimpleExponentialSmoothingOptimized insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions.
        """
        res = {'fitted': self.model_['fitted']}
        return res

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SimpleExponentialSmoothingOptimized predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _ses_optimized(y=y, h=h, fitted=fitted)
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
ses_op = SimpleExponentialSmoothingOptimized()
test_class(ses_op, x=ap, h=12)
ses_op = ses_op.fit(ap)
fcst_ses_op = ses_op.predict(12)
# test conformal prediction
ses_op_c = SimpleExponentialSmoothingOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ses_op_c, x=ap, h=13, level=[90, 80], skip_insample=True)
fcst_ses_op_c = ses_op_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_ses_op_c['mean'][:12],
    fcst_ses_op['mean']
    )
_plot_fcst(fcst_ses_op_c)
# test alias argument
test_eq(
    repr(SimpleExponentialSmoothingOptimized()),
    'SESOpt'
)
test_eq(
    repr(SimpleExponentialSmoothingOptimized(alias='SESOpt_custom')),
    'SESOpt_custom'
)
show_doc(SimpleExponentialSmoothingOptimized, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.fit, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.predict, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.predict_in_sample, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.forecast, title_level=3)
from statsforecast.models import SimpleExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap
# SimpleExponentialSmoothingOptimized's usage example
seso = SimpleExponentialSmoothingOptimized()
seso = seso.fit(y=ap)
y_hat_dict = seso.predict(h=4)
y_hat_dict
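
The optimized variant differs from `SimpleExponentialSmoothing` only in how $\alpha$ is chosen: it minimizes the in-sample one-step-ahead squared error. A rough sketch of that criterion using a plain grid search (the helper and the grid below are illustrative, not part of the library):

import numpy as np
from statsforecast.models import SimpleExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap

def ses_fit(alpha, y):
    # return the in-sample one-step-ahead MSE and the next one-step forecast
    level, sq_errs = y[0], []
    for y_t in y[1:]:
        sq_errs.append((y_t - level) ** 2)
        level = alpha * y_t + (1 - alpha) * level
    return np.mean(sq_errs), level

alphas = np.linspace(0.01, 0.99, 99)
best_alpha = min(alphas, key=lambda a: ses_fit(a, ap)[0])

seso = SimpleExponentialSmoothingOptimized().fit(y=ap)
ses_fit(best_alpha, ap)[1], seso.predict(h=1)['mean'][0]  # grid-search forecast vs. optimized forecast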

Seasonal Smoothing

def _seasonal_exponential_smoothing(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # return fitted values
    season_length: int, # season length
    alpha: float, # smoothing parameter
) -> Dict[str, np.ndarray]:
    n = y.size
    if n < season_length:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_vals = np.empty(season_length, dtype=y.dtype)
    fitted_vals = np.full_like(y, np.nan)
    for i in range(season_length):
        init_idx = (i + n % season_length)
        season_vals[i], fitted_vals[init_idx::season_length] = _ses_forecast(y[init_idx::season_length], alpha)
    out = _repeat_val_seas(season_vals=season_vals, h=h)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SeasonalExponentialSmoothing(_TS):
    r"""SeasonalExponentialSmoothing model.

    Uses a weighted average of all past observations where the weights decrease exponentially into the past. 
    Suitable for data with seasonality and no clear trend. 
    Assuming there are $t$ observations and season $s$, the one-step forecast is given by: 
    $\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t,s}$

    Notes
    -----
    This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero
    and a single seasonal smoothing parameter $\alpha$ is shared across seasons.

    References
    ----------
    [Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
    [Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).

    Parameters
    ----------
    alpha : float 
        Smoothing parameter.
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        This is required for generating future prediction intervals.
    """
    def __init__(
            self, 
            season_length: int,
            alpha: float,
            alias: str = 'SeasonalES',
            prediction_intervals: Optional[ConformalIntervals] = None,
        ):
        self.season_length = season_length
        self.alpha = alpha
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SeasonalExponentialSmoothing model.

        Fit a SeasonalExponentialSmoothing model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SeasonalExponentialSmoothing fitted model.
        """
        y = _ensure_float(y)
        mod = _seasonal_exponential_smoothing(
            y=y, 
            season_length=self.season_length, 
            alpha=self.alpha,
            fitted=True,
            h=self.season_length,
        )
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SeasonalExponentialSmoothing.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val_seas(self.model_['mean'], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self):
        r"""Access fitted SeasonalExponentialSmoothing insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions.
        """
        res = {'fitted': self.model_['fitted']}
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SeasonalExponentialSmoothing predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x).
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _seasonal_exponential_smoothing(
            y=y, h=h, fitted=fitted, alpha=self.alpha, season_length=self.season_length
        )
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
seas_es = SeasonalExponentialSmoothing(season_length=12, alpha=1.)
test_class(seas_es, x=ap, h=12)
test_eq(seas_es.predict_in_sample()['fitted'][-3:],  np.array([461 - 54., 390 - 28., 432 - 27.]))
seas_es = seas_es.fit(ap)
fcst_seas_es = seas_es.predict(12)
# test conformal prediction
seas_es_c = SeasonalExponentialSmoothing(season_length=12, alpha=1., prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_es_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_es_c = seas_es_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_seas_es_c['mean'][:12],
    fcst_seas_es['mean']
)
_plot_fcst(fcst_seas_es_c)
# test that we can recover the expected seasonality
test_eq(
    seas_es.forecast(ap[4:], h=12)['mean'],
    seas_es.forecast(ap, h=12)['mean']
)
# test closeness to the seasonal naive forecast
for i in range(1, 13):
    test_close(
        ap[i:][-12:],
        seas_es.forecast(ap[i:], h=12)['mean'],
    )
plt.plot(np.concatenate([ap[6:], seas_es.forecast(ap[6:], h=12)['mean']]))
# test alias argument
test_eq(
    repr(SeasonalExponentialSmoothing(season_length=12, alpha=1.)),
    'SeasonalES'
)
test_eq(
    repr(SeasonalExponentialSmoothing(season_length=12, alpha=1., alias='SeasonalES_custom')),
    'SeasonalES_custom'
)
show_doc(SeasonalExponentialSmoothing, title_level=3)
show_doc(SeasonalExponentialSmoothing.fit, title_level=3)
show_doc(SeasonalExponentialSmoothing.predict, title_level=3)
show_doc(SeasonalExponentialSmoothing.predict_in_sample, title_level=3)
show_doc(SeasonalExponentialSmoothing.forecast, title_level=3)
from statsforecast.models import SeasonalExponentialSmoothing
from statsforecast.utils import AirPassengers as ap
# SeasonalExponentialSmoothing's usage example
model = SeasonalExponentialSmoothing(alpha=0.5, season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
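
Under the hood the seasonal variant runs SES independently on each seasonal subseries, so the forecast for a given seasonal position can be reproduced with `SimpleExponentialSmoothing` on that position's values alone. A minimal sketch comparing position 0 of the monthly AirPassengers series under both routes; the two printed values should coincide:

from statsforecast.models import SeasonalExponentialSmoothing, SimpleExponentialSmoothing
from statsforecast.utils import AirPassengers as ap

seas = SeasonalExponentialSmoothing(season_length=12, alpha=0.5).fit(y=ap)
ses_pos0 = SimpleExponentialSmoothing(alpha=0.5).fit(y=ap[0::12])  # only the observations at seasonal position 0

print(seas.predict(h=12)['mean'][0], ses_pos0.predict(h=1)['mean'][0])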

Optimized Seasonal Smoothing

def _seasonal_ses_optimized(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # return fitted values
    season_length: int, # season length
): 
    n = y.size
    if n < season_length:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_vals = np.empty(season_length, dtype=y.dtype)
    fitted_vals = np.full_like(y, np.nan)
    for i in range(season_length):
        init_idx = (i + n % season_length)
        season_vals[i], fitted_vals[init_idx::season_length] = _optimized_ses_forecast(y[init_idx::season_length], [(0.01, 0.99)])
    out = _repeat_val_seas(season_vals=season_vals, h=h)
    fcst = {'mean': out}
    if fitted:
        fcst['fitted'] = fitted_vals
    return fcst
class SeasonalExponentialSmoothingOptimized(_TS):
    
    def __init__(
        self, 
        season_length: int,
        alias: str = 'SeasESOpt',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        r"""SeasonalExponentialSmoothingOptimized model.

        Uses a weighted average of all past observations where the weights decrease exponentially into the past. 
        Suitable for data with seasonality and no clear trend. 
        Assuming there are $t$ observations and season $s$, the one-step forecast is given by: 
        $\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t,s}$

        The smoothing parameter $\alpha^*$ is optimized by square error minimization.

        Notes
        -----
        This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero
        and a single seasonal smoothing parameter $\alpha$ is shared across seasons.

        References
        ----------
        [Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
        [Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).
        
        Parameters
        ----------
        season_length : int  
            Number of observations per unit of time. Ex: 24 Hourly data.
        alias : str 
            Custom name of the model.  
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        """
        self.season_length = season_length
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SeasonalExponentialSmoothingOptimized model.

        Fit a SeasonalExponentialSmoothingOptimized model to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SeasonalExponentialSmoothingOptimized fitted model.
        """
        y = _ensure_float(y)
        mod = _seasonal_ses_optimized(
            y=y, 
            season_length=self.season_length, 
            fitted=True,
            h=self.season_length,
        )
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SeasonalExponentialSmoothingOptimized.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val_seas(self.model_['mean'], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self):
        r"""Access fitted SeasonalExponentialSmoothingOptimized insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions.
        """
        res = {'fitted': self.model_['fitted']}
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SeasonalExponentialSmoothingOptimized predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _seasonal_ses_optimized(
            y=y, h=h, fitted=fitted, 
            season_length=self.season_length
        )
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
seas_es_opt = SeasonalExponentialSmoothingOptimized(season_length=12)
test_class(seas_es_opt, x=ap, h=12)
fcst_seas_es_opt = seas_es_opt.forecast(ap, h=12)
# test conformal prediction
seas_es_opt_c = SeasonalExponentialSmoothingOptimized(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(seas_es_opt_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_es_opt_c = seas_es_opt_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_seas_es_opt_c['mean'][:12],
    fcst_seas_es_opt['mean']
)
_plot_fcst(fcst_seas_es_opt_c)
for i in range(1, 13):
    test_close(
        ap[i:][-12:],
        seas_es_opt.forecast(ap[i:], h=12)['mean'],
        eps=0.8
    )
# test alias argument
test_eq(
    repr(SeasonalExponentialSmoothingOptimized(season_length=12)),
    'SeasESOpt'
)
test_eq(
    repr(SeasonalExponentialSmoothingOptimized(season_length=12, alias='SeasESOpt_custom')),
    'SeasESOpt_custom'
)
show_doc(SeasonalExponentialSmoothingOptimized, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.forecast, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.fit, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.predict, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.predict_in_sample, title_level=3)
from statsforecast.models import SeasonalExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap
# SeasonalExponentialSmoothingOptimized's usage example
model = SeasonalExponentialSmoothingOptimized(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
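
Like the other smoothing baselines in this section, this model has no native prediction intervals, so passing `level` only works together with a `ConformalIntervals` configuration. A minimal sketch (the window settings below are arbitrary):

from statsforecast.models import SeasonalExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap, ConformalIntervals

model = SeasonalExponentialSmoothingOptimized(
    season_length=12,
    prediction_intervals=ConformalIntervals(h=4, n_windows=2),
)
model = model.fit(y=ap)
model.predict(h=4, level=[80])  # mean plus conformal lo-80 / hi-80 bounds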

Holt's Method

class Holt(AutoETS): 
    r""" Holt's method. 

    Also known as double exponential smoothing, Holt's method is an extension of exponential smoothing for series with a trend.
    This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAN' or 'MAN'). 

    References
    ----------
    [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with trend"](https://otexts.com/fpp3/holt.html).

    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 12 Monthly data. 
    error_type : str 
        The type of error of the ETS model. Can be additive (A) or multiplicative (M). 
    alias : str 
        Custom name of the model. 
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """

    def __init__(
        self, 
        season_length: int = 1, 
        error_type: str = 'A',
        alias: str = 'Holt',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        self.season_length = season_length
        self.error_type = error_type
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        model = error_type + 'AN'
        super().__init__(
            season_length, model, alias=alias, prediction_intervals=prediction_intervals
        )
holt = Holt(season_length=12, error_type='A')
fcast_holt = holt.forecast(ap,12)

ets = AutoETS(season_length=12, model='AAN')
fcast_ets = ets.forecast(ap,12)

np.testing.assert_equal(
    fcast_holt, 
    fcast_ets
)
holt = Holt(season_length=12, error_type='A')
holt.fit(ap)
fcast_holt = holt.predict(12)

ets = AutoETS(season_length=12, model='AAN')
fcast_ets = ets.forecast(ap,12)

np.testing.assert_equal(
    fcast_holt, 
    fcast_ets
)
holt_c = Holt(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_holt_c = holt_c.forecast(ap, 12, level=[80, 90])

ets_c = AutoETS(season_length=12, model='AAN', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_ets_c = ets_c.forecast(ap, 12, level=[80, 90])

np.testing.assert_equal(
    fcast_holt_c, 
    fcast_ets_c,
)
# test alias argument
test_eq(
    repr(Holt()),
    'Holt'
)
test_eq(
    repr(Holt(alias='Holt_custom')),
    'Holt_custom'
)
show_doc(Holt, title_level=3)
show_doc(Holt.forecast, name='Holt.forecast', title_level=3)
show_doc(Holt.fit, name='Holt.fit', title_level=3)
show_doc(Holt.predict, name='Holt.predict', title_level=3)
show_doc(Holt.predict_in_sample, name='Holt.predict_in_sample', title_level=3)
show_doc(Holt.forward, name='Holt.forward', title_level=3)
from statsforecast.models import Holt
from statsforecast.utils import AirPassengers as ap
# Holt's usage example
model = Holt(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

Holt-Winters' Method

class HoltWinters(AutoETS): 
    r""" Holt-Winters' method. 
    
    Also known as triple exponential smoothing, Holt-Winters' method is an extension of exponential smoothing for series that contain both trend and seasonality.
    This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAA' or 'MAM'). 
    
    References
    ----------
    [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with seasonality"](https://otexts.com/fpp3/holt-winters.html).
    
    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 12 Monthly data. 
    error_type : str
        The type of error of the ETS model. Can be additive (A) or multiplicative (M).
    alias : str 
        Custom name of the model. 
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1, # season length
        error_type: str = 'A', # error type
        alias: str = 'HoltWinters',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        self.season_length = season_length
        self.error_type = error_type
        self.alias = alias
        model = error_type + 'A' + error_type
        super().__init__(
            season_length, model, alias=alias, prediction_intervals=prediction_intervals
        )
hw = HoltWinters(season_length=12, error_type='A')
fcast_hw = hw.forecast(ap,12)

ets = AutoETS(season_length=12, model='AAA')
fcast_ets = ets.forecast(ap,12)

np.testing.assert_equal(
    fcast_hw, 
    fcast_ets
)
hw_c = HoltWinters(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_hw_c = hw_c.forecast(ap, 12, level=[80, 90])

ets_c = AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_ets_c = ets_c.forecast(ap, 12, level=[80, 90])

np.testing.assert_equal(
    fcast_hw_c, 
    fcast_ets_c,
)
hw = HoltWinters(season_length=12, error_type='A')
hw.fit(ap)
fcast_hw = hw.predict(12)

ets = AutoETS(season_length=12, model='AAA')
fcast_ets = ets.forecast(ap,12)

np.testing.assert_equal(
    fcast_hw, 
    fcast_ets
)
# Test alias argument
test_eq(
    repr(HoltWinters()),
    'HoltWinters'
)
test_eq(
    repr(HoltWinters(alias='HoltWinters_custom')),
    'HoltWinters_custom'
)
show_doc(HoltWinters, title_level=3)
show_doc(HoltWinters.forecast, name='HoltWinters.forecast', title_level=3)
show_doc(HoltWinters.fit, name='HoltWinters.fit', title_level=3) 
show_doc(HoltWinters.predict, name='HoltWinters.predict', title_level=3)
show_doc(HoltWinters.predict_in_sample, name= 'HoltWinters.predict_in_sample', title_level=3)
show_doc(HoltWinters.forward, name='HoltWinters.forward', title_level=3)
from statsforecast.models import HoltWinters
from statsforecast.utils import AirPassengers as ap
# Holt-Winters' usage example
model = HoltWinters(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
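
Analogously (a hedged sketch, not part of the original tests), HoltWinters with multiplicative errors should match AutoETS with model='MAM', since the constructor builds the model string as error_type + 'A' + error_type:

hw_m = HoltWinters(season_length=12, error_type='M')
fcast_hw_m = hw_m.forecast(ap, 12)

ets_mam = AutoETS(season_length=12, model='MAM')
fcast_ets_mam = ets_mam.forecast(ap, 12)

np.testing.assert_allclose(fcast_hw_m['mean'], fcast_ets_mam['mean'])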

Baseline Models

Historic Average

def _historic_average(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # fitted values
) -> Dict[str, np.ndarray]:
    fcst = {'mean': _repeat_val(val=y.mean(), h=h)}
    if fitted:
        fitted_vals = _repeat_val(val=y.mean(), h=len(y))
        fcst['fitted'] = fitted_vals
    return fcst
class HistoricAverage(_TS):

    def __init__(self, alias: str = 'HistoricAverage', prediction_intervals: Optional[ConformalIntervals] = None,):
        r"""HistoricAverage model.

        Also known as mean method. Uses a simple average of all past observations. 
        Assuming there are $t$ observations, the one-step forecast is given by: 
        $$\hat{y}_{t+1} = \frac{1}{t} \sum_{j=1}^t y_j$$

        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).

        Parameters 
        ----------
        alias: str
              Custom name of the model. 
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            By default, the model will compute the native prediction
            intervals.
        """
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the HistoricAverage model.

        Fit a HistoricAverage to a time series (numpy array) `y`.

        Parameters 
        ----------
        y : numpy.array  
         Clean time series of shape (t, ).  
        X : array-like 
         Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self
            HistoricAverage fitted model.
        r"""
        y = _ensure_float(y)
        mod = _historic_average(y, h=1, fitted=True)
        mod = dict(mod) 
        residuals = y - mod['fitted']
        mod['sigma'] = _calculate_sigma(residuals, len(residuals) - 1)
        mod['n'] = len(y)
        self.model_ = mod
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted HistoricAverage.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : numpy.array 
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals. 
        
        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(1 + (1 / self.model_['n']))
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted HistoricAverage insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(1 + (1 / self.model_['n']))
            res = _add_fitted_pi(res, se=sigmah, level=level)
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):        
        r"""Memory Efficient HistoricAverage predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ).  
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like  
            Optional exogenous of shape (h, n_x). 
        level : list[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict  
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        out = _historic_average(y=y, h=h, fitted=fitted or (level is not None))
        res = {'mean': out['mean']}
        
        if fitted:
            res['fitted'] = out['fitted']
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                sigmah = sigma * np.sqrt(1 + (1 / len(y)))
                pred_int = _calculate_intervals(out, level, h, sigmah)
                res = {**res, **pred_int}
            if fitted:
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                sigmah = sigma * np.sqrt(1 + (1 / len(y)))
                res = _add_fitted_pi(res=res, se=sigmah, level=level)
        
        return res
ha = HistoricAverage()
test_class(ha, x=ap, h=12, level=[80, 90])
# More tests
ha.fit(ap)
fcst_ha = ha.predict(12)
test_close(fcst_ha['mean'], np.repeat(ap.mean(), 12), eps=1e-5)
np.testing.assert_almost_equal(
    ha.predict_in_sample()['fitted'][:4],
    #np.array([np.nan, 112., 115., 120.6666667]), 
    np.array([280.2986,280.2986,280.2986,280.2986]), 
    decimal=4
)
ha = HistoricAverage()
fcst_ha = ha.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
    fcst_ha['lo-80'],
    np.repeat(126.0227,12),
    decimal=4
)
_plot_insample_pi(fcst_ha)
# Test conformal prediction
ha_c = HistoricAverage(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ha_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_ha_c = ha_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_ha_c['mean'][:12],
    fcst_ha['mean'],
)
_plot_fcst(fcst_ha_c)
# Test alias argument
test_eq(
    repr(HistoricAverage()),
    'HistoricAverage'
)
test_eq(
    repr(HistoricAverage(alias='HistoricAverage_custom')),
    'HistoricAverage_custom'
)
show_doc(HistoricAverage, title_level=3)
show_doc(HistoricAverage.forecast, title_level=3)
show_doc(HistoricAverage.fit, title_level=3)
show_doc(HistoricAverage.predict, title_level=3)
show_doc(HistoricAverage.predict_in_sample, title_level=3)
from statsforecast.models import HistoricAverage
from statsforecast.utils import AirPassengers as ap
# HistoricAverage usage example
model = HistoricAverage()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
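
The native intervals in `predict` use the standard error sigma * sqrt(1 + 1/n). As a hedged sketch (assuming Gaussian quantiles and that `_calculate_sigma` is the root-mean-square of the residuals over the given degrees of freedom), the 95% lower bound can be reproduced directly:

from scipy.stats import norm

n = len(ap)
sigma = np.std(ap, ddof=1)                     # residuals around the historic mean
se = sigma * np.sqrt(1 + 1 / n)
manual_lo_95 = ap.mean() - norm.ppf(0.975) * se
fcst_check = HistoricAverage().forecast(ap, 12, level=[95])
manual_lo_95, fcst_check['lo-95'][0]           # these should agree up to rounding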

Naive

class Naive(_TS):
    
    def __init__(self, alias: str = 'Naive', prediction_intervals: Optional[ConformalIntervals] = None):
        r"""Naive model.
        
        All forecasts have the value of the last observation:  
        $\hat{y}_{t+1} = y_t$ for all $t$
         
        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html). 
        
        Parameters 
        ----------
        alias: str
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            By default, the model will compute the native prediction
            intervals.
        """
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    
    def fit(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Naive model.

        Fit a Naive to a time series (numpy.array) `y`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ). 
        X : array-like
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self:
            Naive fitted model.
        """
        y = _ensure_float(y)
        mod = _naive(y, h=1, fitted=True)
        mod = dict(mod) 
        residuals = y - mod['fitted']
        sigma = _calculate_sigma(residuals, len(residuals) - 1)
        mod['sigma'] = sigma
        self.model_ = mod
        self._store_cs(y=y, X=X)
        return self
    
    def predict(
        self, 
        h: int, # forecast horizon
        X: Optional[np.ndarray] = None, # exogenous regressors
        level: Optional[List[int]] = None # confidence levels
    ):
        r"""Predict with fitted Naive.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals. 

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(self.model_['mean'][0], h=h)
        res = {'mean': mean}
        
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            steps = np.arange(1,h+1)
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(steps)
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted Naive insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient Naive predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n,). 
        h: int
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        out = _naive(y=y, h=h, fitted=fitted or (level is not None))
        res = {'mean': out['mean']}
        if fitted:
            res['fitted'] = out['fitted']
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                steps = np.arange(1, h + 1)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                sigmah = sigma * np.sqrt(steps)
                pred_int = _calculate_intervals(out, level, h, sigmah)
                res = {**res, **pred_int}
            if fitted:
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
    
    def forward(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r""" Apply fitted model to an new/updated series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n,). 
        h: int
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
        return res
# Test prediction intervals - forecast
naive = Naive()
naive.forecast(ap, 12)
naive.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit & predict
naive.fit(ap)
naive.predict(12)
naive.predict(12, None, (80,95))
naive = Naive()
test_class(naive, x=ap, h=12, level=[90, 80])
naive.fit(ap)
fcst_naive = naive.predict(12)
test_close(fcst_naive['mean'], np.repeat(ap[-1], 12), eps=1e-5)
naive = Naive()
fcst_naive = naive.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
    fcst_naive['lo-80'],
    np.array([388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
    decimal=4
) # These are almost equal because Hyndman's forecasts are rounded to four decimal places.
_plot_insample_pi(fcst_naive)
# Unit test: forward == forecast
naive = Naive()
fcst_naive = naive.forward(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
    fcst_naive['lo-80'],
    np.array([388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
    decimal=4
) # These are almost equal because Hyndman's forecasts are rounded to four decimal places.
# Test conformal prediction
naive_c = Naive(prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(naive_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_naive_c = naive_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_naive_c['mean'][:12],
    fcst_naive['mean']
)
_plot_fcst(fcst_naive_c)
# Test alias argument
test_eq(
    repr(Naive()),
    'Naive'
)
test_eq(
    repr(Naive(alias='Naive_custom')),
    'Naive_custom'
)
show_doc(Naive, title_level=3)
show_doc(Naive.forecast, title_level=3)
show_doc(Naive.fit, title_level=3)
show_doc(Naive.predict, title_level=3)
show_doc(Naive.predict_in_sample, title_level=3)
from statsforecast.models import Naive
from statsforecast.utils import AirPassengers as ap
# Naive usage example
model = Naive()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
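
For the Naive model the native interval width grows with the horizon, sigmah = sigma * sqrt(steps), with residuals equal to the one-step differences of the series. A hedged sketch (again assuming Gaussian quantiles and the root-mean-square definition of `_calculate_sigma`):

from scipy.stats import norm

resid = np.diff(ap)                                       # one-step naive residuals y_t - y_{t-1}
sigma = np.sqrt(np.sum(resid ** 2) / (len(ap) - 1))
steps = np.arange(1, 13)
manual_lo_80 = ap[-1] - norm.ppf(0.9) * sigma * np.sqrt(steps)
fcst_check = Naive().forecast(ap, 12, level=[80])
manual_lo_80[:3], fcst_check['lo-80'][:3]                 # these should agree up to rounding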

Random Walk with Drift

def _random_walk_with_drift(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # fitted values
) -> Dict[str, np.ndarray]: 
    slope = (y[-1] - y[0]) / (y.size - 1)
    mean = slope * (1 + np.arange(h, dtype=y.dtype)) + y[-1]
    fcst = {'mean': mean,
            'slope': np.array([slope], dtype=y.dtype),
            'last_y': np.array([y[-1]], dtype=y.dtype)}
    if fitted:
        fitted_vals = np.full_like(y, np.nan)
        fitted_vals[1:] = slope + y[:-1]
        fcst['fitted'] = fitted_vals
    return fcst
class RandomWalkWithDrift(_TS):
    
    def __init__(self, alias: str = 'RWD', prediction_intervals: Optional[ConformalIntervals] = None):
        r"""RandomWalkWithDrift model.

        A variation of the naive method allows the forecasts to change over time. 
        The amount of change, called drift, is the average change seen in the historical data. 

        $$\hat{y}_{t+1} = y_t+\frac{1}{t-1}\sum_{j=1}^t (y_j-y_{j-1}) = y_t+ \frac{y_t-y_1}{t-1}$$

        From the previous equation, we can see that this is equivalent to extrapolating a line between 
        the first and the last observation. 

        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).

        Parameters
        ----------
        alias : str 
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            By default, the model will compute the native prediction
            intervals.
        """
        self.alias = alias
        self.prediction_intervals = prediction_intervals
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the RandomWalkWithDrift model.

        Fit a RandomWalkWithDrift to a time series (numpy array) `y`.

        Parameters
        ----------
        y: numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            RandomWalkWithDrift fitted model.
        r"""
        y = _ensure_float(y)
        mod = _random_walk_with_drift(y, h=1, fitted=True)
        mod = dict(mod) 
        residuals = y - mod['fitted']
        sigma = _calculate_sigma(residuals, len(residuals) - 1)
        mod['sigma'] = sigma
        mod['n'] = len(y)
        self.model_ = mod
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int, 
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted RandomWalkWithDrift.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        hrange = np.arange(h, dtype=self.model_['last_y'].dtype)
        mean = self.model_['slope'] * (1 + hrange) + self.model_['last_y']
        res = {'mean': mean}
        
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            steps = np.arange(1, h + 1)
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(steps * (1 + steps / (self.model_['n'] - 1)))
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted RandomWalkWithDrift insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient RandomWalkWithDrift predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n,). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts: dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        out = _random_walk_with_drift(y=y, h=h, fitted=fitted or (level is not None))
        res = {'mean': out['mean']}
        
        if fitted:
            res['fitted'] = out['fitted']
        
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                steps = np.arange(1, h + 1)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                sigmah = sigma * np.sqrt(steps * (1 + steps / (len(y) - 1)))
                pred_int = _calculate_intervals(out, level, h, sigmah)
                res = {**res, **pred_int}
            if fitted:
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(residuals) - 1)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res 
# Test prediction intervals - forecast
rwd = RandomWalkWithDrift()
rwd.forecast(ap, 12)
rwd.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit & predict
rwd = RandomWalkWithDrift()
rwd.fit(ap)
rwd.predict(12)
rwd.predict(12, None, (80,95))
rwd = RandomWalkWithDrift()
test_class(rwd, x=ap, h=12, level=[90, 80])
rwd = rwd.fit(ap)
fcst_rwd = rwd.predict(12)
test_close(fcst_rwd['mean'][:2], np.array([434.2378, 436.4755]), eps=1e-4)
np.testing.assert_almost_equal(
    rwd.predict_in_sample()['fitted'][:3], 
    np.array([np.nan, 118 - 3.7622378, 132 - 11.7622378]),
    decimal=6
)
# Test conformal prediction
rwd_c = RandomWalkWithDrift(prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(rwd_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_rwd_c = rwd_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_rwd_c['mean'][:12],
    fcst_rwd['mean']
)
_plot_fcst(fcst_rwd_c)
rwd = RandomWalkWithDrift()
fcst_rwd = rwd.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
    fcst_rwd['lo-80'],
    np.array([390.9799, 375.0862, 363.2664, 353.5325, 345.1178, 337.6304, 330.8384, 324.5916, 318.7857, 313.3453, 308.2136, 303.3469]),
    decimal=1
)
_plot_insample_pi(fcst_rwd)
# Test alias argument
test_eq(
    repr(RandomWalkWithDrift()),
    'RWD'
)
test_eq(
    repr(RandomWalkWithDrift(alias='RWD_custom')),
    'RWD_custom'
)
show_doc(RandomWalkWithDrift, title_level=3)
show_doc(RandomWalkWithDrift.forecast, title_level=3)
show_doc(RandomWalkWithDrift.fit, title_level=3)
show_doc(RandomWalkWithDrift.predict, title_level=3)
show_doc(RandomWalkWithDrift.predict_in_sample, title_level=3)
from statsforecast.models import RandomWalkWithDrift
from statsforecast.utils import AirPassengers as ap
# RandomWalkWithDrift usage example
model = RandomWalkWithDrift()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
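
The point forecast is just the line through the first and last observations extrapolated forward, as the drift formula in the docstring states. A small hedged check of that equivalence:

slope = (ap[-1] - ap[0]) / (len(ap) - 1)
manual_mean = ap[-1] + slope * np.arange(1, 5)
rwd_check = RandomWalkWithDrift().fit(y=ap)
np.testing.assert_allclose(rwd_check.predict(h=4)['mean'], manual_mean)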

Seasonal Naive

class SeasonalNaive(_TS):
    
    def __init__(self, season_length: int, alias: str = 'SeasonalNaive', prediction_intervals: Optional[ConformalIntervals] = None):
        r"""Seasonal naive model.

        A method similar to the naive, but uses the last known observation of the same period (e.g. the same month of the previous year) in order to capture seasonal variations.

        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html#seasonal-na%C3%AFve-method).

        Parameters
        ----------
        season_length : int
            Number of observations per unit of time. Ex: 24 Hourly data.
        alias : str
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            By default, the model will compute the native prediction
            intervals.
        """
        self.season_length = season_length
        self.alias = alias
        self.prediction_intervals = prediction_intervals

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SeasonalNaive model.

        Fit a SeasonalNaive to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X: array-like
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SeasonalNaive fitted model.
        r"""
        y = _ensure_float(y)
        mod = _seasonal_naive(
            y=y, 
            season_length=self.season_length, 
            h=self.season_length, 
            fitted=True,
        )
        mod = dict(mod) 
        residuals = y - mod['fitted']
        mod['sigma'] = _calculate_sigma(residuals, 
                                        len(y) - self.season_length)
        self.model_ = mod
        self._store_cs(y=y, X=X)
        return self
        
        
    def predict(
        self,
        h: int,  
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None, 
    ):
        r"""Predict with fitted Naive.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X: array-like
            Optional exogenous of shape (h, n_x). 
        level: List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val_seas(season_vals=self.model_['mean'], h=h)
        res = {'mean': mean}
        
        if level is None:
            return res
        level = sorted(level)            
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            k = np.floor((h - 1) / self.season_length)
            sigma = self.model_['sigma']
            sigmah = sigma * np.sqrt(k + 1)
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        return res
        
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted SeasonalNaive insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals. 

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        r"""        
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            level = sorted(level)
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res
 
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SeasonalNaive predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals. 
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        out = _seasonal_naive(
            y=y, h=h, fitted=fitted or (level is not None), 
            season_length=self.season_length
        )
        res = {'mean': out['mean']}
        if fitted:
            res['fitted'] = out['fitted']
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                k = np.floor((h - 1) / self.season_length)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(y) - self.season_length)
                sigmah = sigma * np.sqrt(k + 1)
                pred_int = _calculate_intervals(out, level, h, sigmah)
                res = {**res, **pred_int}
            if fitted:
                k = np.floor((h - 1) / self.season_length)
                residuals = y - out["fitted"]
                sigma = _calculate_sigma(residuals, len(y) - self.season_length)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res    
# Test prediction intervals - forecast
seas_naive = SeasonalNaive(season_length=12)
seas_naive.forecast(ap, 12)
seas_naive.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit & predict
seas_naive = SeasonalNaive(season_length=12)
seas_naive.fit(ap)
seas_naive.predict(12)
seas_naive.predict(12, None, (80,95))
seas_naive = SeasonalNaive(season_length=12)
test_class(seas_naive, x=ap, h=12, level=[90, 80])
seas_naive = seas_naive.fit(ap)
fcst_seas_naive = seas_naive.predict(12)
test_eq(seas_naive.predict_in_sample()['fitted'][-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))
seas_naive = SeasonalNaive(season_length=12)
fcst_seas_naive = seas_naive.forecast(ap, 12, None, None, (80,95), True)
np.testing.assert_almost_equal(
    fcst_seas_naive['lo-80'],
    np.array([370.4595, 344.4595, 372.4595, 414.4595, 425.4595, 488.4595, 
              575.4595, 559.4595, 461.4595, 414.4595, 343.4595, 385.4595]),
    decimal=4
)
_plot_insample_pi(fcst_seas_naive)
# Test conformal prediction
seas_naive_c = SeasonalNaive(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(seas_naive_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_seas_naive_c = seas_naive_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_seas_naive_c['mean'][:12],
    fcst_seas_naive['mean']
)
_plot_fcst(fcst_seas_naive_c)
# Test alias argument
test_eq(
    repr(SeasonalNaive(12)),
    'SeasonalNaive'
)
test_eq(
    repr(SeasonalNaive(12, alias='SeasonalNaive_custom')),
    'SeasonalNaive_custom'
)
show_doc(SeasonalNaive, title_level=3)
show_doc(SeasonalNaive.forecast, title_level=3)
show_doc(SeasonalNaive.fit, title_level=3)
show_doc(SeasonalNaive.predict, title_level=3)
show_doc(SeasonalNaive.predict_in_sample, title_level=3)
from statsforecast.models import SeasonalNaive
from statsforecast.utils import AirPassengers as ap
# SeasonalNaive usage example
model = SeasonalNaive(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
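
The seasonal naive point forecast simply repeats the last observed season, so the first h values should equal the corresponding observations from one season back. A small hedged check:

sn_check = SeasonalNaive(season_length=12).fit(y=ap)
np.testing.assert_allclose(sn_check.predict(h=4)['mean'], ap[-12:][:4])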

Window Average

def _window_average(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # fitted values
    window_size: int, # window size
) -> Dict[str, np.ndarray]: 
    if fitted:
        raise NotImplementedError('return fitted')
    if y.size < window_size:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    wavg = y[-window_size:].mean()
    mean = _repeat_val(val=wavg, h=h)
    return {'mean': mean}
class WindowAverage(_TS):
    
    def __init__(
            self, 
            window_size: int,
            alias: str = 'WindowAverage',
            prediction_intervals: Optional[ConformalIntervals] = None,
        ):
        r"""WindowAverage model.

        Uses the average of the last $k$ observations, with $k$ the length of the window.
        Wider windows will capture global trends, while narrow windows will reveal local trends.
        The length of the window selected should take into account the importance of past
        observations and how fast the series changes.

        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).

        Parameters
        ----------
        window_size : int 
            Size of truncated series on which average is estimated.
        alias : str 
            Custom name of the model. 
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        r"""        
        self.window_size = window_size
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the WindowAverage model.

        Fit a WindowAverage to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ). 
        X : array-like
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            WindowAverage fitted model.
        """
        y = _ensure_float(y)
        mod = _window_average(y=y, h=1, window_size=self.window_size, fitted=False)
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted WindowAverage.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : numpy.array
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self):
        r"""Access fitted WindowAverage insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        raise NotImplementedError
        
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient WindowAverage predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x).
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _window_average(y=y, h=h, fitted=fitted, window_size=self.window_size)
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
w_avg = WindowAverage(window_size=24)
test_class(w_avg, x=ap, h=12, skip_insample=True)
w_avg = w_avg.fit(ap)
fcst_w_avg = w_avg.predict(12)
test_close(fcst_w_avg['mean'], np.repeat(ap[-24:].mean(), 12))
# Test conformal prediction
w_avg_c = WindowAverage(window_size=24, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_w_avg_c = w_avg_c.forecast(ap, 13, None, None, (80,95), False)
test_eq(
    fcst_w_avg_c['mean'][:12],
    fcst_w_avg['mean']
)
_plot_fcst(fcst_w_avg_c)
# Test alias argument
test_eq(
    repr(WindowAverage(1)),
    'WindowAverage'
)
test_eq(
    repr(WindowAverage(1, alias='WindowAverage_custom')),
    'WindowAverage_custom'
)
show_doc(WindowAverage, title_level=3)
show_doc(WindowAverage.forecast, title_level=3)
show_doc(WindowAverage.fit, title_level=3)
show_doc(WindowAverage.predict, title_level=3)
from statsforecast.models import WindowAverage
from statsforecast.utils import AirPassengers as ap
# WindowAverage usage example
model = WindowAverage(window_size=12*4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
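
Mirroring `_window_average` above, the flat forecast is just the mean of the last `window_size` observations. A small hedged check for the configuration used in the example, plus a narrower window for comparison:

wa_check = WindowAverage(window_size=12 * 4).fit(y=ap)
np.testing.assert_allclose(wa_check.predict(h=4)['mean'], np.repeat(ap[-12 * 4:].mean(), 4))

# a narrower window reacts more to the recent part of the series
WindowAverage(window_size=6).fit(y=ap).predict(h=1)['mean']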

Seasonal Window Average

def _seasonal_window_average(
    y: np.ndarray,
    h: int,
    fitted: bool,
    season_length: int,
    window_size: int,
) -> Dict[str, np.ndarray]:
    if fitted:
        raise NotImplementedError('return fitted')
    min_samples = season_length * window_size
    if y.size < min_samples:
        return {'mean': np.full(h, np.nan, dtype=y.dtype)}
    season_avgs = y[-min_samples:].reshape(window_size, season_length).mean(axis=0)
    out = _repeat_val_seas(season_vals=season_avgs, h=h)
    return {'mean': out}
class SeasonalWindowAverage(_TS):
    
    def __init__(
        self, 
        season_length: int,
        window_size: int,
        alias: str = 'SeasWA',
        prediction_intervals: Optional[ConformalIntervals] = None
    ):
        r"""SeasonalWindowAverage model.

        An average of the last $k$ observations of the same period, with $k$ the length of the window.

        References
        ----------
        [Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).

        Parameters
        ----------
        window_size : int 
            Size of truncated series on which average is estimated.
        season_length : int 
            Number of observations per cycle.
        alias : str 
            Custom name of the model. 
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        r"""        
        self.season_length = season_length
        self.window_size = window_size
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the SeasonalWindowAverage model.

        Fit a SeasonalWindowAverage to a time series (numpy array) `y`
        and optionally exogenous variables (numpy array) `X`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X : array-like
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            SeasonalWindowAverage fitted model.
        """
        y = _ensure_float(y)
        mod = _seasonal_window_average(
            y=y, 
            h=self.season_length,
            fitted=False,
            season_length=self.season_length, 
            window_size=self.window_size,
        )
        self.model_ = dict(mod)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted SeasonalWindowAverage.

        Parameters 
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val_seas(season_vals=self.model_['mean'], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self):
        r"""Access fitted SeasonalWindowAverage insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals. 

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        raise NotImplementedError

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient SeasonalWindowAverage predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n,). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x).  
        level : List[float] 
            Confidence levels for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.
            
            
        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _seasonal_window_average(
            y=y, h=h, fitted=fitted, 
            season_length=self.season_length,
            window_size=self.window_size
        )
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
seas_w_avg = SeasonalWindowAverage(season_length=12, window_size=1)
test_class(seas_w_avg, x=ap, h=12, skip_insample=True)
seas_w_avg = seas_w_avg.fit(ap)
fcst_seas_w_avg = seas_w_avg.predict(12)
# Test conformal prediction
seas_w_avg_c = SeasonalWindowAverage(season_length=12, window_size=1, prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(seas_w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_w_avg_c = seas_w_avg_c.forecast(ap, 13, None, None, (80,95), False)
test_eq(
    fcst_seas_w_avg_c['mean'][:12],
    fcst_seas_w_avg['mean']
)
fcst_seas_w_avg_c['mean'][:12]
_plot_fcst(fcst_seas_w_avg_c)
# Test alias argument
test_eq(
    repr(SeasonalWindowAverage(12, 1)),
    'SeasWA'
)
test_eq(
    repr(SeasonalWindowAverage(12, 1, alias='SeasWA_custom')),
    'SeasWA_custom'
)
show_doc(SeasonalWindowAverage, title_level=3)
show_doc(SeasonalWindowAverage.forecast, title_level=3)
show_doc(SeasonalWindowAverage.fit, title_level=3)
show_doc(SeasonalWindowAverage.predict, title_level=3)
from statsforecast.models import SeasonalWindowAverage
from statsforecast.utils import AirPassengers as ap
# SeasonalWindowAverage usage example
model = SeasonalWindowAverage(season_length=12, window_size=4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
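
Per `_seasonal_window_average` above, each seasonal forecast is the average of that period over the last `window_size` full seasons. A small hedged check for the example configuration:

swa_check = SeasonalWindowAverage(season_length=12, window_size=4).fit(y=ap)
manual_season_avgs = ap[-12 * 4:].reshape(4, 12).mean(axis=0)
np.testing.assert_allclose(swa_check.predict(h=12)['mean'], manual_season_avgs)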

Sparse or Intermittent

ADIDA

def _chunk_forecast(y, aggregation_level):
    lost_remainder_data = len(y) % aggregation_level
    y_cut = y[lost_remainder_data:]
    aggregation_sums = _chunk_sums(y_cut, aggregation_level)
    sums_forecast, _ = _optimized_ses_forecast(aggregation_sums)
    return sums_forecast

@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_demand(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
    out = np.empty_like(y)
    out[0] = np.nan
    fitted_idx = 0
    for i in range(1, y.size):
        if y[i - 1] > 0:
            fitted_idx += 1
            out[i] = fitted[fitted_idx]
        elif fitted_idx > 0:
            # if this entry is zero, the model didn't change
            out[i] = out[i - 1]
        else:
            # if we haven't seen any demand, use naive
            out[i] = y[i - 1]
    return out

@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_intervals(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
    out = np.empty_like(y)
    out[0] = np.nan
    fitted_idx = 0
    for i in range(1, y.size):
        if y[i - 1] != 0:
            fitted_idx += 1
            if fitted[fitted_idx] == 0:
                # to avoid division by zero
                out[i] = 1
            else:
                out[i] = fitted[fitted_idx]
        elif fitted_idx > 0:
            # if this entry is zero, the model didn't change
            out[i] = out[i - 1]
        else:
            # if we haven't seen any intervals, use 1 to avoid division by zero
            out[i] = 1
    return out
    
def _adida(
        y: np.ndarray, # time series
        h: int, # forecast horizon
        fitted: bool, # fitted values
    ):
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean()
    aggregation_level = round(mean_interval)
    sums_forecast = _chunk_forecast(y, aggregation_level)
    forecast = sums_forecast / aggregation_level
    res = {'mean': _repeat_val(val=forecast, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for ADIDA is very expensive")
        fitted_aggregation_levels = np.round(
            y_intervals.cumsum() / np.arange(1, y_intervals.size + 1)
        )
        fitted_aggregation_levels = _expand_fitted_intervals(
            np.append(np.nan, fitted_aggregation_levels), y
        )[1:].astype(np.int32)

        sums_fitted = np.empty(y.size - 1, dtype=y.dtype)
        for i, agg_lvl in enumerate(fitted_aggregation_levels):
            sums_fitted[i] = _chunk_forecast(y[:i+1], agg_lvl)

        res['fitted'] = np.append(np.nan, sums_fitted / fitted_aggregation_levels)
    return res
class ADIDA(_TS):

    def __init__(self, alias: str = 'ADIDA', prediction_intervals: Optional[ConformalIntervals] = None):
        r"""ADIDA model.

        Aggregate-Disaggregate Intermittent Demand Approach: Uses temporal aggregation to reduce the 
        number of zero observations. Once the data has been aggregated, it uses the optimized SES to 
        generate the forecasts at the new level. It then breaks down the forecast to the original 
        level using equal weights.

        ADIDA specializes in sparse or intermittent series, i.e. series with very few non-zero observations. 
        They are notoriously hard to forecast, and so different methods have been developed 
        specifically for them.
        
        References
        ----------
        [Nikolopoulos, K., Syntetos, A. A., Boylan, J. E., Petropoulos, F., & Assimakopoulos, V. (2011). An aggregate–disaggregate intermittent demand approach (ADIDA) to forecasting: an empirical proposition and analysis. Journal of the Operational Research Society, 62(3), 544-554.](https://researchportal.bath.ac.uk/en/publications/an-aggregate-disaggregate-intermittent-demand-approach-adida-to-f).
        
        Parameters
        ----------
        alias : str 
            Custom name of the model. 
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            By default, the model will compute the native prediction
            intervals.
        r"""        
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the ADIDA model.

        Fit an ADIDA to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ). 

        Returns
        -------
        self :
            ADIDA fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _adida(y=y, h=1, fitted=False)
        self._y = y
        self._store_cs(y=y, X=X)
        return self

    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted ADIDA.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """        
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals`"
                "to calculate them"
            )
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted ADIDA insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        fitted = _adida(y=self._y, h=1, fitted=True)['fitted']
        res = {'fitted': fitted}
        if level is not None:
            sigma = _calculate_sigma(self._y - fitted, self._y.size)            
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient ADIDA predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n,). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _adida(y=y, h=h, fitted=fitted)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals`"
                "to calculate them"
            )
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
deg_ts = np.zeros(10)
adida = ADIDA()
test_class(adida, x=ap, h=12, skip_insample=False)
test_class(adida, x=deg_ts, h=12, skip_insample=False)
fcst_adida = adida.forecast(ap, 12)

_test_fitted_sparse(ADIDA)
# Test conformal prediction
adida_c = ADIDA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(adida_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_adida_c = adida_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_adida_c['mean'][:12],
    fcst_adida['mean'],
)
_plot_insample_pi(fcst_adida_c)
_plot_fcst(fcst_adida_c)
# Test alias argument
test_eq(
    repr(ADIDA()),
    'ADIDA'
)
test_eq(
    repr(ADIDA(alias='ADIDA_custom')),
    'ADIDA_custom'
)
show_doc(ADIDA, title_level=3)
show_doc(ADIDA.forecast, title_level=3)
show_doc(ADIDA.fit, title_level=3)
show_doc(ADIDA.predict, title_level=3)
from statsforecast.models import ADIDA
from statsforecast.utils import AirPassengers as ap
# Usage example of ADIDA
model = ADIDA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
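
To make the aggregate-disaggregate idea concrete, the following standalone sketch (plain NumPy, an illustration rather than the library's internal implementation) buckets a sparse toy series, forecasts the bucket totals with a simple mean as a stand-in for the optimized SES, and spreads the result back to the original frequency with equal weights.

import numpy as np

# Toy intermittent series: mostly zeros with occasional demand.
y_sparse = np.array([0, 0, 3, 0, 0, 0, 2, 0, 0, 4, 0, 0], dtype=float)

agg_level = 3  # bucket size; ADIDA derives it from the mean inter-demand interval
# Drop the leading remainder so the series splits evenly into buckets.
trimmed = y_sparse[len(y_sparse) % agg_level:]
buckets = trimmed.reshape(-1, agg_level).sum(axis=1)

# Stand-in for the optimized SES forecast at the aggregated level.
bucket_forecast = buckets.mean()

# Disaggregate back to the original frequency with equal weights.
point_forecast = bucket_forecast / agg_level
print(buckets, point_forecast)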

Croston Classic

def _croston_classic(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # compute fitted values
): 
    y = _ensure_float(y)
    # demand
    yd = _demand(y)
    if not yd.size: # no demand
        return _naive(y=y, h=h, fitted=fitted)    
    ydp, ydf = _ses_forecast(yd, 0.1)

    # intervals
    yi = _intervals(y)
    yip, yif = _ses_forecast(yi, 0.1)

    if yip != 0.0:
        mean = ydp / yip
    else:
        mean = ydp
    out = {'mean': _repeat_val(val=mean, h=h)}
    if fitted:
        ydf = _expand_fitted_demand(np.append(ydf, ydp), y)
        yif = _expand_fitted_intervals(np.append(yif, yip), y)        
        out['fitted'] = ydf / yif
    return out
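
To see the decomposition that `_croston_classic` builds on, the standalone sketch below (plain NumPy; it mirrors but does not call the notebook's private `_demand` and `_intervals` helpers) extracts the non-zero demand sizes $z_t$ and the inter-demand intervals $p_t$, and forms the Croston-style ratio with plain means as a stand-in for the SES forecasts with $\alpha=0.1$.

import numpy as np

y = np.array([0, 0, 3, 0, 2, 0, 0, 4], dtype=float)

nonzero_idx = np.flatnonzero(y)
demand_sizes = y[nonzero_idx]                    # z_t: 3, 2, 4
intervals = np.diff(np.append(-1, nonzero_idx))  # p_t: gaps between demands

# The Croston forecast is z_hat / p_hat; means stand in for the SES forecasts here.
print(demand_sizes.mean() / intervals.mean())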
class CrostonClassic(_TS):
    
    def __init__(self, alias: str = 'CrostonClassic', prediction_intervals: Optional[ConformalIntervals] = None):
        r"""CrostonClassic model.

        A method to forecast time series that exhibit intermittent demand.
        It decomposes the original time series into a non-zero demand size $z_t$ and 
        inter-demand intervals $p_t$. Then the forecast is given by:
        $$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$ 

        where $\hat{z}_t$ and $\hat{p}_t$ are forecasted using SES. The smoothing parameter 
        of both components is set equal to 0.1.

        References
        ----------
        [Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50)
        
        Parameters
        ----------
        alias : str 
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        """        
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the CrostonClassic model.

        Fit a CrostonClassic to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            CrostonClassic fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _croston_classic(y=y, h=1, fitted=True)
        self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted CrostonClassic.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals` to calculate them"
            )
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted CrostonClassic insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res
        
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient CrostonClassic predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _croston_classic(y=y, h=h, fitted=fitted)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals` to calculate them"
            )
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
croston = CrostonClassic(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston = croston.forecast(ap, 12)

_test_fitted_sparse(CrostonClassic)
# Test conformal prediction
croston_c = CrostonClassic(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_c, x=ap, h=13, level=[90.0, 80.0], skip_insample=False)
fcst_croston_c = croston_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_croston_c['mean'][:12],
    fcst_croston['mean'],
)
_plot_insample_pi(fcst_croston_c)
_plot_fcst(fcst_croston_c)
# Test alias argument
test_eq(
    repr(CrostonClassic()),
    'CrostonClassic'
)
test_eq(
    repr(CrostonClassic(alias='CrostonClassic_custom')),
    'CrostonClassic_custom'
)
show_doc(CrostonClassic, title_level=3)
show_doc(CrostonClassic.forecast, title_level=3)
show_doc(CrostonClassic.fit, title_level=3)
show_doc(CrostonClassic.predict, title_level=3)
from statsforecast.models import CrostonClassic
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonClassic
model = CrostonClassic()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
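
Since CrostonClassic only produces prediction intervals through conformal prediction, a variant of the example above passes `ConformalIntervals` at construction and then requests levels when predicting (the horizon and number of windows here are illustrative choices).

from statsforecast.models import CrostonClassic
from statsforecast.utils import AirPassengers as ap
from statsforecast.utils import ConformalIntervals

# CrostonClassic with conformal prediction intervals
model = CrostonClassic(prediction_intervals=ConformalIntervals(h=4, n_windows=2))
model = model.fit(y=ap)
model.predict(h=4, level=[80, 95])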

Croston Optimized

def _croston_optimized(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # compute fitted values
):
    y = _ensure_float(y)
    # demand
    yd = _demand(y)
    if not yd.size:
        return _naive(y=y, h=h, fitted=fitted)
    ydp, _ = _optimized_ses_forecast(yd)

    # intervals
    yi = _intervals(y)
    yip, _ = _optimized_ses_forecast(yi)

    if yip != 0.0:
        mean = ydp / yip
    else:
        mean = ydp
    out = {'mean': _repeat_val(val=mean, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for CrostonOptimized is very expensive")
        ydf = np.empty(yd.size + 1, dtype=y.dtype)
        ydf[0] = np.nan
        for i in range(yd.size):
            ydf[i + 1] = _optimized_ses_forecast(yd[:i + 1])[0]

        yif = np.empty(yi.size + 1, dtype=y.dtype)
        yif[0] = np.nan
        for i in range(yi.size):
            yiff = _optimized_ses_forecast(yi[:i + 1])[0]
            if yiff == 0:
                yiff = 1.0
            yif[i + 1] = yiff

        ydf = _expand_fitted_demand(ydf, y)
        yif = _expand_fitted_intervals(yif, y)
        out['fitted'] = ydf / yif
    return out
class CrostonOptimized(_TS):
    
    def __init__(self, alias: str = 'CrostonOptimized', prediction_intervals: Optional[ConformalIntervals] = None,):
        r"""CrostonOptimized model.

        A method to forecast time series that exhibit intermittent demand.
        It decomposes the original time series into a non-zero demand size $z_t$ and 
        inter-demand intervals $p_t$. Then the forecast is given by:
        $$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$

        A variation of the classic Croston's method where the smoothing parameter is optimally 
        selected from the range $[0.1,0.3]$. Both the non-zero demand $z_t$ and the inter-demand 
        intervals $p_t$ are smoothed separately, so their smoothing parameters can be different.

        References
        ----------
        [Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).

        Parameters
        ----------
        alias : str 
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        r"""        
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the CrostonOptimized model.

        Fit a CrostonOptimized to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            CrostonOptimized fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _croston_optimized(y=y, h=1, fitted=False)
        self._y = y
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted CrostonOptimized.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted CrostonOptimized insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        fitted = _croston_optimized(y=self._y, h=1, fitted=True)['fitted']
        res = {'fitted': fitted}
        if level is not None:
            sigma = _calculate_sigma(self._y - fitted, self._y.size)            
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient CrostonOptimized predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _croston_optimized(y=y, h=h, fitted=fitted)
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
croston_op = CrostonOptimized(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_op, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_op, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_op = croston_op.forecast(ap, 12)

_test_fitted_sparse(CrostonOptimized)
# Test conformal prediction
croston_op_c = CrostonOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_op_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_op_c = croston_op_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_croston_op_c['mean'][:12],
    fcst_croston_op['mean'],
)
_plot_insample_pi(fcst_croston_op_c)
_plot_fcst(fcst_croston_op_c)
# Test alias argument
test_eq(
    repr(CrostonOptimized()),
    'CrostonOptimized'
)
test_eq(
    repr(CrostonOptimized(alias='CrostonOptimized_custom')),
    'CrostonOptimized_custom'
)
show_doc(CrostonOptimized, title_level=3)
show_doc(CrostonOptimized.forecast, title_level=3)
show_doc(CrostonOptimized.fit, title_level=3)
show_doc(CrostonOptimized.predict, title_level=3)
from statsforecast.models import CrostonOptimized
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonOptimized
model = CrostonOptimized()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

CrostonSBA

def _croston_sba(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool,  # compute fitted values
) -> Dict[str, np.ndarray]:
    out = _croston_classic(y=y, h=h, fitted=fitted)
    out['mean'] *= 0.95    
    if fitted:
        out['fitted'] *= 0.95
    return out
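
As a quick sanity check of the debiasing factor, the SBA point forecast should simply be the classic Croston forecast scaled by 0.95; the cell below verifies this with the helpers defined earlier in this notebook on a toy sparse series.

# SBA forecast = 0.95 * classic Croston forecast
y_toy = np.array([0, 0, 3, 0, 2, 0, 0, 4], dtype=float)
classic = _croston_classic(y=y_toy, h=1, fitted=False)['mean']
sba = _croston_sba(y=y_toy, h=1, fitted=False)['mean']
np.testing.assert_allclose(sba, 0.95 * classic)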
class CrostonSBA(_TS):
    
    def __init__(self, alias: str = 'CrostonSBA', prediction_intervals: Optional[ConformalIntervals] = None,):
        r"""CrostonSBA model.

        A method to forecast time series that exhibit intermittent demand.
        It decomposes the original time series into a non-zero demand size $z_t$ and 
        inter-demand intervals $p_t$. Then the forecast is given by:
        $$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$

        A variation of the classic Croston's method that uses a debiasing factor, so that the 
        forecast is given by:
        $$\hat{y}_t = 0.95  \frac{\hat{z}_t}{\hat{p}_t}$$

        References
        ----------
        [Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).

        Parameters
        ----------
        alias : str 
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        """
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the CrostonSBA model.

        Fit a CrostonSBA to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            CrostonSBA fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _croston_sba(y=y, h=1, fitted=True)
        self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted CrostonSBA.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals` to calculate them"
            )
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted CrostonSBA insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res
        
    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient CrostonSBA predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _croston_sba(y=y, h=h, fitted=fitted)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals`"
                "to calculate them"
            )
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
croston_sba = CrostonSBA(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_sba, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_sba, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_sba = croston_sba.forecast(ap, 12)

_test_fitted_sparse(CrostonSBA)
# Test conformal prediction
croston_sba_c = CrostonSBA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_sba_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_sba_c = croston_sba_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_croston_sba_c['mean'][:12],
    fcst_croston_sba['mean'],
)
_plot_insample_pi(fcst_croston_sba_c)
_plot_fcst(fcst_croston_sba_c)
# Test alias argument
test_eq(
    repr(CrostonSBA()),
    'CrostonSBA'
)
test_eq(
    repr(CrostonSBA(alias='CrostonSBA_custom')),
    'CrostonSBA_custom'
)
show_doc(CrostonSBA, title_level=3)
show_doc(CrostonSBA.forecast, title_level=3)
show_doc(CrostonSBA.fit, title_level=3)
show_doc(CrostonSBA.predict, title_level=3)
from statsforecast.models import CrostonSBA
from statsforecast.utils import AirPassengers as ap
# Usage example of CrostonSBA
model = CrostonSBA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

IMAPA

def _imapa(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # compute fitted values
):
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)        
    y_intervals = _intervals(y)
    mean_interval = y_intervals.mean().item()
    max_aggregation_level = round(mean_interval)
    forecasts = np.empty(max_aggregation_level, dtype=y.dtype)
    for aggregation_level in range(1, max_aggregation_level + 1):
        lost_remainder_data = len(y) % aggregation_level
        y_cut = y[lost_remainder_data:]
        aggregation_sums = _chunk_sums(y_cut, aggregation_level)
        forecast, _ = _optimized_ses_forecast(aggregation_sums)
        forecasts[aggregation_level - 1] = forecast / aggregation_level
    forecast = forecasts.mean()
    res = {'mean': _repeat_val(val=forecast, h=h)}
    if fitted:
        warnings.warn("Computing fitted values for IMAPA is very expensive.")
        fitted_vals = np.empty_like(y)
        fitted_vals[0] = np.nan
        for i in range(y.size - 1):
            fitted_vals[i + 1] = _imapa(y[:i+1], h=1, fitted=False)['mean'].item()
        res['fitted'] = fitted_vals
    return res
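
The multiple-aggregation idea behind `_imapa` can be illustrated with a standalone NumPy sketch: pick the maximum aggregation level from the mean inter-demand interval, forecast each aggregated series (plain means stand in for the optimized SES), rescale each forecast back to the original frequency, and average across levels.

import numpy as np

y = np.array([0, 0, 3, 0, 2, 0, 0, 4, 0, 0, 1, 0], dtype=float)

nonzero_idx = np.flatnonzero(y)
mean_interval = np.diff(np.append(-1, nonzero_idx)).mean()
max_level = round(mean_interval)  # highest aggregation level considered

level_forecasts = []
for level in range(1, max_level + 1):
    trimmed = y[len(y) % level:]                 # drop the leading remainder
    sums = trimmed.reshape(-1, level).sum(axis=1)
    level_forecasts.append(sums.mean() / level)  # mean as a stand-in for optimized SES

print(np.mean(level_forecasts))  # IMAPA averages the per-level forecasts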
class IMAPA(_TS):
    
    def __init__(self, alias: str = 'IMAPA', prediction_intervals: Optional[ConformalIntervals] = None,):
        r"""IMAPA model.

        Intermittent Multiple Aggregation Prediction Algorithm: Similar to ADIDA, but instead of
        using a single aggregation level, it considers multiple in order to capture different
        dynamics of the data. Uses the optimized SES to generate the forecasts at the new levels
        and then combines them using a simple average.
        
        References
        ----------
        [Syntetos, A. A., & Boylan, J. E. (2021). Intermittent demand forecasting: Context, methods and applications. John Wiley & Sons.](https://www.ifors.org/intermittent-demand-forecasting-context-methods-and-applications/).

        Parameters
        ----------
        alias : str 
            Custom name of the model. 
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        """
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the IMAPA model.

        Fit an IMAPA to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
            
        Returns
        -------
        self : 
            IMAPA fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _imapa(y=y, h=1, fitted=False)
        self._y = y
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted IMAPA.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = _repeat_val(val=self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals`"
                "to calculate them"
            )
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted IMAPA insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        fitted = _imapa(y=self._y, h=1, fitted=True)['fitted']
        res = {'fitted': fitted}
        if level is not None:
            sigma = _calculate_sigma(self._y - fitted, self._y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
        
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient IMAPA predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _imapa(y=y, h=h, fitted=fitted)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception(
                "You have to instantiate the class with `prediction_intervals`"
                "to calculate them"
            )
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
imapa = IMAPA(prediction_intervals=ConformalIntervals(2, 1))
test_class(imapa, x=ap, h=12, skip_insample=False, level=[80])
test_class(imapa, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_imapa = imapa.forecast(ap, 12)

_test_fitted_sparse(IMAPA)
# Test conformal prediction
imapa_c = IMAPA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(imapa_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_imapa_c = imapa_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_imapa_c['mean'][:12],
    fcst_imapa['mean'],
)
_plot_insample_pi(fcst_imapa_c)
_plot_fcst(fcst_imapa_c)
# Test alias argument
test_eq(
    repr(IMAPA()),
    'IMAPA'
)
test_eq(
    repr(IMAPA(alias='IMAPA_custom')),
    'IMAPA_custom'
)
show_doc(IMAPA, title_level=3)
show_doc(IMAPA.forecast, title_level=3)
show_doc(IMAPA.fit, title_level=3)
show_doc(IMAPA.predict, title_level=3)
from statsforecast.models import IMAPA
from statsforecast.utils import AirPassengers as ap
# Usage example of IMAPA
model = IMAPA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
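
Because ADIDA commits to a single aggregation level while IMAPA averages across several, their point forecasts generally differ on the same sparse series; a quick comparison with the classes defined above (toy data, purely illustrative):

# Compare ADIDA and IMAPA point forecasts on a toy intermittent series.
y_sparse = np.array([0, 0, 3, 0, 2, 0, 0, 4, 0, 0, 1, 0], dtype=float)
print(ADIDA().forecast(y=y_sparse, h=4)['mean'])
print(IMAPA().forecast(y=y_sparse, h=4)['mean'])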

TSB

def _tsb(
    y: np.ndarray, # time series
    h: int, # forecast horizon
    fitted: bool, # compute fitted values
    alpha_d: float,
    alpha_p: float,
) -> Dict[str, np.ndarray]:
    if (y == 0).all():
        res = {'mean': np.zeros(h, dtype=y.dtype)}
        if fitted:
            res['fitted'] = np.zeros_like(y)
            res['fitted'][0] = np.nan
        return res
    y = _ensure_float(y)
    yd = _demand(y)
    yp = _probability(y)
    ypf, ypft = _ses_forecast(yp, alpha_p)
    ydf, ydft = _ses_forecast(yd, alpha_d)
    res = {'mean': _repeat_val(val=ypf * ydf, h=h)}
    if fitted:
        ydft = _expand_fitted_demand(np.append(ydft, ydf), y)
        res['fitted'] = ypft * ydft
    return res
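
A standalone sketch of the TSB decomposition (plain NumPy, not the `_ses_forecast` helper used above): build the demand-occurrence indicator $d_t$ and the non-zero demand sizes $z_t$, smooth each with simple exponential smoothing, and multiply the two one-step-ahead forecasts.

import numpy as np

y = np.array([0, 0, 3, 0, 2, 0, 0, 4], dtype=float)
d = (y != 0).astype(float)  # demand occurrence indicator d_t
z = y[y != 0]               # non-zero demand sizes z_t

def ses_one_step(x, alpha):
    # run SES through the series and return the one-step-ahead forecast
    level = x[0]
    for v in x[1:]:
        level = alpha * v + (1 - alpha) * level
    return level

alpha_d, alpha_p = 0.2, 0.2  # illustrative smoothing parameters
print(ses_one_step(z, alpha_d) * ses_one_step(d, alpha_p))  # TSB-style point forecast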
class TSB(_TS):
    
    def __init__(
        self, 
        alpha_d: float,
        alpha_p: float,
        alias: str = 'TSB',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        r"""TSB model.

        Teunter-Syntetos-Babai: A modification of Croston's method that replaces the inter-demand
        intervals with the demand probability $d_t$, which is defined as follows.

        $$
        d_t = \begin{cases}
            1  & \text{if demand occurs at time t} \\
            0  & \text{otherwise.}
        \end{cases}
        $$

        Hence, the forecast is given by

        $$\hat{y}_t = \hat{d}_t \hat{z}_t$$

        Both $d_t$ and $z_t$ are forecasted using SES. The smoothing parameters of each may differ,
        like in the optimized Croston's method.

        References
        ----------
        [Teunter, R. H., Syntetos, A. A., & Babai, M. Z. (2011). Intermittent demand: Linking forecasting to inventory obsolescence. European Journal of Operational Research, 214(3), 606-615.](https://www.sciencedirect.com/science/article/abs/pii/S0377221711004437)

        Parameters
        ----------
        alpha_d : float
            Smoothing parameter for demand.
        alpha_p : float
            Smoothing parameter for probability.
        alias : str
            Custom name of the model.
        prediction_intervals : Optional[ConformalIntervals]
            Information to compute conformal prediction intervals.
            This is required for generating future prediction intervals.
        """
        self.alpha_d = alpha_d
        self.alpha_p = alpha_p
        self.alias = alias
        self.prediction_intervals = prediction_intervals
        self.only_conformal_intervals = True
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the TSB model.

        Fit a TSB to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            TSB fitted model.
        """
        y = _ensure_float(y)
        self.model_ = _tsb(
            y=y,
            h=1, 
            fitted=True, 
            alpha_d=self.alpha_d, 
            alpha_p=self.alpha_p
        )
        self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
        self._store_cs(y=y, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted TSB.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """        
        mean = _repeat_val(self.model_['mean'][0], h=h)
        res = {'mean': mean}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted TSB insample predictions.
        
        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res
        
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient TSB predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        res = _tsb(
            y=y, h=h, 
            fitted=fitted, 
            alpha_d=self.alpha_d, 
            alpha_p=self.alpha_p
        )
        res = dict(res)
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        if fitted:
            sigma = _calculate_sigma(y - res['fitted'], y.size)
            res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res            
tsb = TSB(alpha_d=0.9, alpha_p=0.1, prediction_intervals=ConformalIntervals(2, 1))
test_class(tsb, x=ap, h=12, skip_insample=False, level=[80])
test_class(tsb, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_tsb = tsb.forecast(ap, 12)

_test_fitted_sparse(lambda: TSB(alpha_d=0.9, alpha_p=0.1))
# Test conformal prediction
tsb_c = TSB(alpha_d=0.9, alpha_p=0.1,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(tsb_c, x=ap, h=13, level=[90, 80], skip_insample=True)
fcst_tsb_c = tsb_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_tsb_c['mean'][:12],
    fcst_tsb['mean'],
)
_plot_insample_pi(fcst_tsb_c)
_plot_fcst(fcst_tsb_c)
# Test alias argument
test_eq(
    repr(TSB(0.9, 0.1)),
    'TSB'
)
test_eq(
    repr(TSB(0.9, 0.1, alias='TSB_custom')),
    'TSB_custom'
)
show_doc(TSB, title_level=3)
show_doc(TSB.forecast, title_level=3)
show_doc(TSB.fit, title_level=3)
show_doc(TSB.predict, title_level=3)
from statsforecast.models import TSB
from statsforecast.utils import AirPassengers as ap
# Usage example of TSB
model = TSB(alpha_d=0.5, alpha_p=0.5)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
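
The smoothing parameters control how strongly recent observations weigh on each component: values near 1 make the demand-size and probability components track the most recent observations, while values near 0 update them only slowly. A quick illustration on a toy sparse series:

# Effect of the smoothing parameters on the TSB point forecast.
y_sparse = np.array([0, 0, 3, 0, 2, 0, 0, 4, 0, 0, 1, 0], dtype=float)
print(TSB(alpha_d=0.1, alpha_p=0.1).forecast(y=y_sparse, h=1)['mean'])
print(TSB(alpha_d=0.9, alpha_p=0.9).forecast(y=y_sparse, h=1)['mean'])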

Multiple Seasonalities

MSTL

def _predict_mstl_components(mstl_ob, h, season_length):
    seasoncolumns = mstl_ob.filter(regex='seasonal*').columns
    nseasons = len(seasoncolumns)
    seascomp = np.full((h, nseasons), np.nan)
    seasonal_periods = [season_length] if isinstance(season_length, int) else season_length
    for i in range(nseasons):
        mp = seasonal_periods[i]
        colname = seasoncolumns[i]
        seascomp[:, i] = np.tile(mstl_ob[colname].values[-mp:], trunc(1 + (h-1)/mp))[:h]
    return seascomp

def _predict_mstl_seas(mstl_ob, h, season_length):
    seascomp = _predict_mstl_components(mstl_ob, h, season_length)
    return seascomp.sum(axis=1)
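
The helper above extrapolates each seasonal component by repeating its last full cycle across the horizon; a minimal standalone illustration of that tiling (with made-up seasonal values):

import numpy as np
from math import trunc

# Repeat the last full seasonal cycle over the forecast horizon,
# mirroring the tiling used in `_predict_mstl_components`.
seasonal_tail = np.array([1.0, -2.0, 0.5, 0.5])  # last fitted cycle of one component
mp, h = 4, 10                                    # season length and horizon
print(np.tile(seasonal_tail[-mp:], trunc(1 + (h - 1) / mp))[:h])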
class MSTL(_TS):
    r"""MSTL model.
    
    The MSTL (Multiple Seasonal-Trend decomposition using LOESS) model decomposes the time series
    into multiple seasonalities using LOESS. It then forecasts the trend using 
    a custom non-seasonal model and each seasonality using a SeasonalNaive model.
    
    References
    ----------
    [Bandara, Kasun & Hyndman, Rob & Bergmeir, Christoph. (2021). "MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple Seasonal Patterns".](https://arxiv.org/abs/2107.13462).
    
    Parameters
    ----------
    season_length : Union[int, List[int]]
        Number of observations per unit of time. For multiple seasonalities use a list.
    trend_forecaster : model, default=AutoETS(model='ZZN')
        StatsForecast model used to forecast the trend component.
    stl_kwargs : dict
        Extra arguments to pass to [`statsmodels.tsa.seasonal.STL`](https://www.statsmodels.org/dev/generated/statsmodels.tsa.seasonal.STL.html#statsmodels.tsa.seasonal.STL).
        The `period` and `seasonal` arguments are reserved.
    alias : str
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    
    def __init__(
        self, 
        season_length: Union[int, List[int]],
        trend_forecaster = AutoETS(model='ZZN'),
        stl_kwargs: Optional[Dict] = None,
        alias: str = 'MSTL',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):  
        # Check whether the ETS model is seasonal
        if repr(trend_forecaster) == 'AutoETS':
            if trend_forecaster.model[2] != 'N':
                raise Exception(
                    'Trend forecaster should not adjust '
                    'seasonal models.'
                )
        # Check whether the trend forecaster has season_length=1
        if hasattr(trend_forecaster, 'season_length'):
            if trend_forecaster.season_length != 1:
                raise Exception(
                    'Trend forecaster should not adjust '
                    'seasonal models. Please pass `season_length=1` '
                    'to your trend forecaster'
                )
        if isinstance(season_length, int):
            season_length = [season_length]
        else:
            season_length = sorted(season_length)
        self.season_length = season_length
        self.trend_forecaster = trend_forecaster
        self.prediction_intervals = prediction_intervals
        self.alias = alias

        if self.trend_forecaster.prediction_intervals is None and (self.prediction_intervals is not None):
            self.trend_forecaster.prediction_intervals = prediction_intervals
        self.stl_kwargs = dict() if stl_kwargs is None else stl_kwargs

    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the MSTL model.

        Fit MSTL to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 
        X: array-like 
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self : 
            MSTL fitted model.
        """
        y = _ensure_float(y)
        self.model_ = mstl(
            x=y, 
            period=self.season_length,
            stl_kwargs=self.stl_kwargs,
        )
        x_sa = self.model_[['trend', 'remainder']].sum(axis=1).values
        self.trend_forecaster = self.trend_forecaster.new().fit(y=x_sa, X=X)
        self._store_cs(y=x_sa, X=X)
        return self
        
    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ):
        r"""Predict with fitted MSTL.

        Parameters
        ----------
        h : int  
            Forecast horizon.
        X : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        kwargs: Dict[str, Any] = {'h': h, 'X': X}
        if self.trend_forecaster.prediction_intervals is None:
            kwargs['level'] = level
        res = self.trend_forecaster.predict(**kwargs)
        seas = _predict_mstl_seas(self.model_, h=h, season_length=self.season_length)
        res = {key: val + seas for key, val in res.items()}
        if level is None or self.trend_forecaster.prediction_intervals is None:
            return res
        level = sorted(level)
        if self.trend_forecaster.prediction_intervals is not None:
            res = self.trend_forecaster._add_predict_conformal_intervals(res, level)
        else:
            raise Exception(
                "You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
            )
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted MSTL insample predictions.

        Parameters
        ----------
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = self.trend_forecaster.predict_in_sample(level=level)
        seas = self.model_.filter(regex='seasonal*').sum(axis=1).values
        res = {key: val + seas for key, val in res.items()}
        return res
        
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient MSTL predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        model_ = mstl(
            x=y, 
            period=self.season_length,
            stl_kwargs=self.stl_kwargs,
        )
        x_sa = model_[['trend', 'remainder']].sum(axis=1).values
        kwargs = {
            'y': x_sa,
            'h': h,
            'X': X,
            'X_future': X_future,
            'fitted': fitted
        }
        if fitted or self.trend_forecaster.prediction_intervals is None:
            kwargs['level'] = level
        res = self.trend_forecaster.forecast(**kwargs)
        if level is not None:
            level = sorted(level)
            if self.trend_forecaster.prediction_intervals is not None:
                res = self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)
            elif f'lo-{level[0]}' not in res:
                raise Exception(
                    "You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
                )        
        # reseasonalize the results
        seas_h = _predict_mstl_seas(model_, h=h, season_length=self.season_length)
        seas_insample = model_.filter(regex='seasonal*').sum(axis=1).values
        res = {
            key: val + (seas_insample if 'fitted' in key else seas_h) \
            for key, val in res.items()
        }
        return res

    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted MSTL model to a new time series.

        Parameters 
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions. 

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self.trend_forecaster, 'model_'):
            raise Exception('You have to use the `fit` method first')
        y = _ensure_float(y)
        model_ = mstl(
            x=y, 
            period=self.season_length,
            stl_kwargs=self.stl_kwargs,
        )
        x_sa = model_[['trend', 'remainder']].sum(axis=1).values
        kwargs = {
            'y': x_sa,
            'h': h,
            'X': X,
            'X_future': X_future,
            'fitted': fitted
        }
        if fitted or self.trend_forecaster.prediction_intervals is None:
            kwargs['level'] = level
        res = self.trend_forecaster.forward(**kwargs)
        if level is not None:
            level = sorted(level)
            if self.trend_forecaster.prediction_intervals is not None:
                res = self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)        
        # reseasonalize the results
        seas_h = _predict_mstl_seas(model_, h=h, season_length=self.season_length)
        seas_insample = model_.filter(regex='seasonal*').sum(axis=1).values
        res = {
            key: val + (seas_insample if 'fitted' in key else seas_h) \
            for key, val in res.items()
        }
        return res
trend_forecasters = [
    AutoARIMA(), 
    AutoCES(), 
    AutoETS(model='ZZN'),
    Naive(),
    CrostonClassic(),
]
skip_insamples = [False, True, False, False, True]
test_forwards = [False, True, True, False, False]
for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
    for stl_kwargs in [None, dict(trend=25)]:
        mstl_model = MSTL(
            season_length=[12, 14], 
            trend_forecaster=trend_forecaster,
            stl_kwargs=stl_kwargs,
        )
        test_class(mstl_model, x=ap, h=12, 
                   skip_insample=skip_insample,
                   level=None,
                   test_forward=test_forward)
# Intervals with and without conformal prediction
# trend forecaster supports level, use native intervals
mstl_native = MSTL(season_length=12, trend_forecaster=ARIMA(order=(0, 1, 0)))
res_native_fp = pd.DataFrame(mstl_native.fit(y=ap).predict(h=24, level=[80, 95]))
res_native_fc = pd.DataFrame(mstl_native.forecast(y=ap, h=24, level=[80, 95]))
pd.testing.assert_frame_equal(res_native_fp, res_native_fc)

# trend forecaster supports level, use the conformal method
mstl_conformal = MSTL(
    season_length=12,
    trend_forecaster=ARIMA(
        order=(0, 1, 0),
        prediction_intervals=ConformalIntervals(h=24),
    ),
)
res_conformal_fp = pd.DataFrame(mstl_conformal.fit(y=ap).predict(h=24, level=[80, 95]))
res_conformal_fc = pd.DataFrame(mstl_conformal.forecast(y=ap, h=24, level=[80, 95]))
pd.testing.assert_frame_equal(res_conformal_fp, res_conformal_fc)
test_fail(lambda: pd.testing.assert_frame_equal(res_native_fp, res_conformal_fp))

# trend fcst doesn't support level
mstl_bad = MSTL(season_length=12, trend_forecaster=CrostonClassic())
test_fail(lambda: mstl_bad.fit(y=ap).predict(h=24, level=[80, 95]), contains='prediction_intervals')
test_fail(lambda: mstl_bad.forecast(y=ap, h=24, level=[80, 95]), contains='prediction_intervals')
# conformal prediction
# prediction intervals defined in the trend forecaster
trend_forecasters = [
    AutoARIMA(prediction_intervals=ConformalIntervals(h=13, n_windows=2)),
    AutoCES(prediction_intervals=ConformalIntervals(h=13, n_windows=2)),
]
skip_insamples = [False, True]
test_forwards = [False, True]
for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
    for stl_kwargs in [None, dict(trend=25)]:
        mstl_model = MSTL(
            season_length=[12, 14], 
            trend_forecaster=trend_forecaster,
            stl_kwargs=stl_kwargs,
        )
        test_class(mstl_model, x=ap, h=13, 
                   skip_insample=skip_insample,
                   level=[80, 90] if not skip_insample else None,
                   test_forward=test_forward)
        _plot_fcst(mstl_model.forecast(ap, 13, None, None, (80,95), False))
# conformal prediction
# prediction_intervals defined in MSTL
trend_forecasters = [
    AutoCES()
]
for trend_forecaster in trend_forecasters:
    for stl_kwargs in [None, dict(trend=25)]:
        mstl_model = MSTL(
            season_length=[12, 14], 
            trend_forecaster=trend_forecaster,
            stl_kwargs=stl_kwargs,
            prediction_intervals=ConformalIntervals(h=13, n_windows=2)
        )
        test_class(mstl_model, x=ap, h=13, 
                   skip_insample=False,
                   level=[80, 90],
                   test_forward=True)
        _plot_fcst(mstl_model.forecast(ap, 13, None, None, (80,95), False))
# failing seasonal trend forecasters
test_fail(
    MSTL,
    contains='should not adjust seasonal',
    args=([3, 12], AutoETS(model='ZZZ'))
)
test_fail(
    MSTL,
    contains='should not adjust seasonal',
    args=([3, 12], AutoARIMA(season_length=12))
)
# Test alias argument
test_eq(
    repr(MSTL(season_length=7)),
    'MSTL'
)
test_eq(
    repr(MSTL(season_length=7, alias='MSTL_custom')),
    'MSTL_custom'
)
show_doc(MSTL, title_level=3)
show_doc(MSTL.fit, title_level=3)
show_doc(MSTL.predict, title_level=3)
show_doc(MSTL.predict_in_sample, title_level=3)
show_doc(MSTL.forecast, title_level=3)
show_doc(MSTL.forward, title_level=3)
from statsforecast.models import MSTL
from statsforecast.utils import AirPassengers as ap
# Usage example of MSTL
mstl_model = MSTL(season_length=[3, 12], trend_forecaster=AutoARIMA(prediction_intervals=ConformalIntervals(h=4, n_windows=2)))
mstl_model = mstl_model.fit(y=ap)
y_hat_dict = mstl_model.predict(h=4, level=[80])
y_hat_dict
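
After fitting, the decomposition itself is kept in the `model_` attribute as a DataFrame with trend, seasonal, and remainder columns (the same object used by `predict_in_sample` and the seasonal tiling above), so it can be inspected directly:

# Inspect the fitted MSTL decomposition (trend, seasonal*, remainder columns).
mstl_model.model_.head()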

TBATS

class TBATS(_TS):
    r"""Trigonometric Box-Cox transform, ARMA errors, Trend and Seasonal components (TBATS) model.

    TBATS is an innovations state space model framework used for forecasting time series with multiple seasonalities. It uses a Box-Cox transformation, ARMA errors, and a trigonometric representation of the seasonal patterns based on Fourier series.

    The name TBATS is an acronym for the key features of the model: Trigonometric, Box-Cox transform, ARMA errors, Trend, and Seasonal components. 
    
    References
    ----------
    - [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)

    - [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)

    Parameters
    ----------
    season_length : int or list of int.
        Number of observations per unit of time. Ex: 24 Hourly data.
    use_boxcox : bool (default=True)
        Whether or not to use a Box-Cox transformation.
    bc_lower_bound : float (default=0.0)
        Lower bound for the Box-Cox transformation.
    bc_upper_bound : float (default=1.0)
        Upper bound for the Box-Cox transformation.    
    use_trend : bool (default=True)
        Whether or not to use a trend component.
    use_damped_trend : bool (default=False)
        Whether or not to dampen the trend component.
    use_arma_errors : bool (default=False)
        Whether or not to use ARMA errors.
    alias : str 
        Custom name of the model. 
    """

    @_old_kw_to_pos(['seasonal_periods'], [1])
    def __init__(
        self, 
        season_length: Union[int, List[int]], 
        use_boxcox: Optional[bool] = True, 
        bc_lower_bound: float = 0.0,
        bc_upper_bound: float = 1.0,
        use_trend: Optional[bool] = True,
        use_damped_trend: Optional[bool] = False, 
        use_arma_errors: bool = False,  
        alias: str = 'TBATS',
        *,
        seasonal_periods=None,  # noqa:ARG002
    ):
        if isinstance(season_length, int):
            season_length = [season_length]
        self.season_length = list(season_length)
        self.use_boxcox = use_boxcox
        self.bc_lower_bound = bc_lower_bound
        self.bc_upper_bound = bc_upper_bound
        self.use_trend = use_trend
        self.use_damped_trend = use_damped_trend
        self.use_arma_errors = use_arma_errors
        self.alias = alias
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None
    ):
        r"""Fit TBATS model.

        Fit TBATS model to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        X : numpy.array, optional (default=None)
            Ignored

        Returns
        -------
        self :
            TBATS model.
        """
        y = _ensure_float(y)
        self.model_ = tbats_selection(
            y=y,
            seasonal_periods=self.season_length,
            use_boxcox=self.use_boxcox,
            bc_lower_bound=self.bc_lower_bound,
            bc_upper_bound=self.bc_upper_bound,
            use_trend=self.use_trend,
            use_damped_trend=self.use_damped_trend,
            use_arma_errors=self.use_arma_errors
        )
        return self
    
    def predict(
        self, 
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted TBATS model.

        Parameters
        ----------
        h : int
            Forecast horizon.
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = tbats_forecast(self.model_, h)
        res = {'mean': fcst['mean']}
        if level is not None:
            level = sorted(level)
            sigmah = _compute_sigmah(self.model_, h)
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
        if self.model_['BoxCox_lambda'] is not None:
            res_trans = {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
            for k, v in res_trans.items():
                res_trans[k] = np.where(np.isnan(v), res[k], v)
        else: 
            res_trans = res
        return res_trans
    
    def predict_in_sample(self, level: Optional[Tuple[int]] = None):
        r"""Access fitted TBATS model predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        
        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted'].ravel()}
        if level is not None:
            se = _calculate_sigma(self.model_['errors'], self.model_['errors'].shape[1])
            fitted_pred_int = _add_fitted_pi(res, se, level)
            res = {**res, **fitted_pred_int}
        if self.model_['BoxCox_lambda'] is not None:
            res_trans = {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
            for k, v in res_trans.items():
                res_trans[k] = np.where(np.isnan(v), res[k], v)
        else:
            res_trans = res
        return res_trans

    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool=False
    ): 
        r"""Memory Efficient TBATS model.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (n, ).
        h : int
            Forecast horizon.
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        mod = tbats_selection(
            y=y,
            seasonal_periods=self.season_length,
            use_boxcox=self.use_boxcox,
            bc_lower_bound=self.bc_lower_bound,
            bc_upper_bound=self.bc_upper_bound,
            use_trend=self.use_trend,
            use_damped_trend=self.use_damped_trend,
            use_arma_errors=self.use_arma_errors
        )
        fcst = tbats_forecast(mod, h)
        res = {'mean': fcst['mean']}
        if fitted:
            res['fitted'] = mod['fitted'].ravel()
        if level is not None:
            level = sorted(level)
            sigmah = _compute_sigmah(mod, h)
            pred_int = _calculate_intervals(res, level, h, sigmah)
            res = {**res, **pred_int}
            if fitted:
                se = _calculate_sigma(mod['errors'], mod['errors'].shape[1])
                fitted_pred_int = _add_fitted_pi(res, se, level)
                res = {**res, **fitted_pred_int}
        if mod['BoxCox_lambda'] is not None:
            res_trans = {k : inv_boxcox(v, mod['BoxCox_lambda']) for k, v in res.items()}
            for k, v in res_trans.items():
                res_trans[k] = np.where(np.isnan(v), res[k], v)
        else:
            res_trans = res
        return res_trans 
tbats = TBATS(season_length=12)
test_class(tbats, x=ap, h=12, level=[90, 80])
show_doc(TBATS, title_level=3)
show_doc(TBATS.fit, title_level=3)
show_doc(TBATS.predict, title_level=3)
show_doc(TBATS.predict_in_sample, title_level=3)
show_doc(TBATS.forecast, title_level=3)
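
Following the usage-example convention of the other sections, here is a minimal sketch for TBATS; `season_length=12` and `h=4` are illustrative choices for the monthly AirPassengers series.

from statsforecast.models import TBATS
from statsforecast.utils import AirPassengers as ap
# Usage example of TBATS
model = TBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict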

AutoTBATS

class AutoTBATS(TBATS):
    r"""AutoTBATS model.

    Automatically selects the best TBATS model from all feasible combinations of the parameters use_boxcox, use_trend, use_damped_trend, and use_arma_errors. 
    Selection is made using the AIC. 
    Default value for use_arma_errors is True since this enables the evaluation of models with and without ARMA errors.  
    
    References
    ----------
    - [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)

    - [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)

    Parameters
    ----------
    season_length : int or list of int.
        Number of observations per unit of time. Ex: 24 Hourly data.
    use_boxcox : bool (default=None)
        Whether or not to use a Box-Cox transformation. By default tries both. 
    bc_lower_bound : float (default=0.0)
        Lower bound for the Box-Cox transformation.
    bc_upper_bound : float (default=1.0)
        Upper bound for the Box-Cox transformation.    
    use_trend : bool (default=None)
        Whether or not to use a trend component. By default tries both. 
    use_damped_trend : bool (default=None)
        Whether or not to dampen the trend component. By default tries both.
    use_arma_errors : bool (default=True)
        Whether or not to use ARMA errors. Default is True, which evaluates models with and without ARMA errors.
    alias : str 
        Custom name of the model. 
    """
    @_old_kw_to_pos(['seasonal_periods'], [1])
    def __init__(
        self, 
        season_length: Union[int, List[int]], 
        use_boxcox: Optional[bool] = None, 
        bc_lower_bound: float = 0.0,
        bc_upper_bound: float = 1.0,
        use_trend: Optional[bool] = None,
        use_damped_trend: Optional[bool] = None, 
        use_arma_errors: bool = True,  
        alias: str = 'AutoTBATS',
        *,
        seasonal_periods=None  # noqa:ARG002
    ):
        super().__init__(
            season_length=season_length, 
            use_boxcox=use_boxcox, 
            bc_lower_bound=bc_lower_bound,
            bc_upper_bound=bc_upper_bound,
            use_trend=use_trend, 
            use_damped_trend=use_damped_trend, 
            use_arma_errors=use_arma_errors, 
            alias=alias
        )
tbats = AutoTBATS(season_length=12)
test_class(tbats, x=ap, h=12, level=[90, 80])
fcst_tbats = tbats.forecast(ap, 13, None, None, (80,95), True)
_plot_fcst(fcst_tbats)
_plot_insample_pi(fcst_tbats)
show_doc(AutoTBATS, title_level=3)
show_doc(AutoTBATS.fit, title_level=3)
show_doc(AutoTBATS.predict, title_level=3)
show_doc(AutoTBATS.predict_in_sample, title_level=3)
show_doc(AutoTBATS.forecast, title_level=3)
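
A minimal usage sketch for AutoTBATS, mirroring the example pattern above (the values are illustrative).

from statsforecast.models import AutoTBATS
from statsforecast.utils import AirPassengers as ap
# Usage example of AutoTBATS
model = AutoTBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict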

Theta Family

Standard Theta Method

class Theta(AutoTheta): 
    r""" Standard Theta Method. 

    References
    ----------
    [Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)

    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    decomposition_type : str 
        Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1, 
        decomposition_type: str = 'multiplicative',
        alias: str = 'Theta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        super().__init__(
            season_length=season_length, 
            model='STM', 
            decomposition_type=decomposition_type, 
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
stm = Theta(season_length=12)
fcast_stm = stm.forecast(ap,12)

theta = AutoTheta(season_length=12, model='STM')
fcast_theta = theta.forecast(ap,12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_stm
)
# Test conformal prediction
stm_c = Theta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5)) 
test_class(stm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_stm_c = stm_c.forecast(ap, 13, None, None, (80,95), True)

test_eq(
    fcst_stm_c['mean'][:12],
    fcast_stm['mean'],
)
stm = Theta(season_length=12)
stm.fit(ap)
fcast_stm = stm.predict(12)
forward_stm = stm.forward(y=ap, h=12)

theta = AutoTheta(season_length=12, model='STM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_stm,
)
np.testing.assert_equal(
    forward_stm,
    forward_autotheta
)
# Test alias argument
test_eq(
    repr(Theta()),
    'Theta'
)
test_eq(
    repr(Theta(alias='Theta_custom')),
    'Theta_custom'
)
show_doc(Theta, title_level=3)
show_doc(Theta.forecast, name='Theta.forecast', title_level=3)
show_doc(Theta.fit, name='Theta.fit', title_level=3)
show_doc(Theta.predict, name='Theta.predict', title_level=3)
show_doc(Theta.predict_in_sample, name='Theta.predict_in_sample', title_level=3)
show_doc(Theta.forward, name='Theta.forward', title_level=3)
from statsforecast.models import Theta
from statsforecast.utils import AirPassengers as ap
# Usage example of Theta
model = Theta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

Optimized Theta Method

class OptimizedTheta(AutoTheta): 
    r""" Optimized Theta Method. 

    References
    ----------
    [Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)

    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    decomposition_type : str 
        Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
    alias : str 
        Custom name of the model. Default `OptimizedTheta`.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1, 
        decomposition_type: str = 'multiplicative',
        alias: str = 'OptimizedTheta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        super().__init__(
            season_length=season_length, 
            model='OTM', 
            decomposition_type=decomposition_type, 
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
otm = OptimizedTheta(season_length=12)
fcast_otm = otm.forecast(ap,12)
otm.fit(ap)
forward_otm = otm.forward(y=ap, h=12)

theta = AutoTheta(season_length=12, model='OTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_otm
)

np.testing.assert_equal(
    forward_autotheta, 
    forward_otm
)
# Test conformal prediction
otm_c = OptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5)) 
test_class(otm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_otm_c = otm_c.forecast(ap, 13, None, None, (80,95), True)

test_eq(
    fcst_otm_c['mean'][:12],
    fcast_otm['mean'],
)
# hide
otm = OptimizedTheta(season_length=12)
otm.fit(ap)
fcast_otm = otm.predict(12)

theta = AutoTheta(season_length=12, model='OTM')
fcast_theta = theta.forecast(ap,12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_otm
)
# Test alias argument
test_eq(
    repr(OptimizedTheta()),
    'OptimizedTheta'
)
test_eq(
    repr(OptimizedTheta(alias='OptimizedTheta_custom')),
    'OptimizedTheta_custom'
)
show_doc(OptimizedTheta, title_level=3)
show_doc(OptimizedTheta.forecast, name='OptimizedTheta.forecast', title_level=3)
show_doc(OptimizedTheta.fit, name='OptimizedTheta.fit', title_level=3)
show_doc(OptimizedTheta.predict, name='OptimizedTheta.predict', title_level=3)
show_doc(OptimizedTheta.predict_in_sample, name='OptimizedTheta.predict_in_sample', title_level=3)
show_doc(OptimizedTheta.forward, name='OptimizedTheta.forward', title_level=3)
from statsforecast.models import OptimizedTheta
from statsforecast.utils import AirPassengers as ap
# Usage example of OptimizedTheta
model = OptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

Dynamic Standard Theta Method

class DynamicTheta(AutoTheta): 
    r""" Dynamic Standard Theta Method. 

    References
    ----------
    [Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)

    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    decomposition_type : str 
        Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1, 
        decomposition_type: str = 'multiplicative',
        alias: str = 'DynamicTheta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        super().__init__(
            season_length=season_length, 
            model='DSTM', 
            decomposition_type=decomposition_type, 
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
dstm = DynamicTheta(season_length=12)
fcast_dstm = dstm.forecast(ap,12)
dstm.fit(ap)
forward_dstm = dstm.forward(y=ap, h=12)

theta = AutoTheta(season_length=12, model='DSTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_dstm
)

np.testing.assert_equal(
    forward_autotheta, 
    forward_dstm
)
dstm = DynamicTheta(season_length=12)
dstm.fit(ap)
fcast_dstm = dstm.predict(12)

theta = AutoTheta(season_length=12, model='DSTM')
fcast_theta = theta.forecast(ap,12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_dstm
)
# Test conformal prediction
dstm_c = DynamicTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5)) 
test_class(dstm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_dstm_c = dstm_c.forecast(ap, 13, None, None, (80,95), True)

test_eq(
    fcst_dstm_c['mean'][:12],
    fcast_dstm['mean'],
)
# Test alias argument
test_eq(
    repr(DynamicTheta()),
    'DynamicTheta'
)
test_eq(
    repr(DynamicTheta(alias='DynamicTheta_custom')),
    'DynamicTheta_custom'
)
show_doc(DynamicTheta, title_level=3)
show_doc(DynamicTheta.forecast, name='DynamicTheta.forecast', title_level=3)
show_doc(DynamicTheta.fit, name='DynamicTheta.fit', title_level=3)
show_doc(DynamicTheta.predict, name='DynamicTheta.predict', title_level=3)
show_doc(DynamicTheta.predict_in_sample, name='DynamicTheta.predict_in_sample', title_level=3)
show_doc(DynamicTheta.forward, name='DynamicTheta.forward', title_level=3)
from statsforecast.models import DynamicTheta
from statsforecast.utils import AirPassengers as ap
# Usage example of DynamicTheta
model = DynamicTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

Dynamic Optimized Theta Method

class DynamicOptimizedTheta(AutoTheta): 
    r""" Dynamic Optimized Theta Method. 
    
    References
    ----------
    [Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)

    Parameters
    ----------
    season_length : int 
        Number of observations per unit of time. Ex: 24 Hourly data.
    decomposition_type : str 
        Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
    alias : str 
        Custom name of the model.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        season_length: int = 1, 
        decomposition_type: str = 'multiplicative',
        alias: str = 'DynamicOptimizedTheta',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ): 
        super().__init__(
            season_length=season_length, 
            model='DOTM', 
            decomposition_type=decomposition_type, 
            alias=alias,
            prediction_intervals=prediction_intervals,
        )
dotm = DynamicOptimizedTheta(season_length=12)
fcast_dotm = dotm.forecast(ap,12)
dotm.fit(ap)
forward_dotm = dotm.forward(y=ap, h=12)

theta = AutoTheta(season_length=12, model='DOTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_dotm
)
np.testing.assert_equal(
    forward_autotheta, 
    forward_dotm
)
dotm = DynamicOptimizedTheta(season_length=12)
dotm.fit(ap)
fcast_dotm = dotm.predict(12)

theta = AutoTheta(season_length=12, model='DOTM')
fcast_theta = theta.forecast(ap,12)

np.testing.assert_equal(
    fcast_theta, 
    fcast_dotm
)
# Test conformal prediction
dotm_c = DynamicOptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5)) 
test_class(dotm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_dotm_c = dotm_c.forecast(ap, 13, None, None, (80,95), True)

test_eq(
    fcst_dotm_c['mean'][:12],
    fcast_dotm['mean'],
)
# Test alias argument
test_eq(
    repr(DynamicOptimizedTheta()),
    'DynamicOptimizedTheta'
)
test_eq(
    repr(DynamicOptimizedTheta(alias='DynamicOptimizedTheta_custom')),
    'DynamicOptimizedTheta_custom'
)
show_doc(DynamicOptimizedTheta, title_level=3)
show_doc(DynamicOptimizedTheta.forecast, name='DynamicOptimizedTheta.forecast', title_level=3)
show_doc(DynamicOptimizedTheta.fit, name='DynamicOptimizedTheta.fit', title_level=3)
show_doc(DynamicOptimizedTheta.predict, name='DynamicOptimizedTheta.predict', title_level=3)
show_doc(DynamicOptimizedTheta.predict_in_sample, name='DynamicOptimizedTheta.predict_in_sample', title_level=3)
show_doc(DynamicOptimizedTheta.forward, name='DynamicOptimizedTheta.forward', title_level=3)
from statsforecast.models import DynamicOptimizedTheta
from statsforecast.utils import AirPassengers as ap
# Usage example of DynamicOptimizedTheta
model = DynamicOptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

ARCH Family

GARCH Model

class GARCH(_TS):
    r"""Generalized Autoregressive Conditional Heteroskedasticity (GARCH) model. 
    
    A method for modeling time series that exhibit non-constant volatility over time. 
    The GARCH model assumes that at time $t$, $y_t$ is given by: 
    
    $$y_t = v_t \sigma_t$$ 
    
    with 
    
    $$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2 + \sum_{j=1}^q b_j \sigma_{t-j}^2$$. 
    
    Here $v_t$ is a sequence of iid random variables with zero mean and unit variance. 
    The coefficients $w$, $a_i$, $i=1,...,p$, and $b_j$, $j=1,...,q$ must satisfy the following conditions: 
    
    1. $w > 0$ and $a_i, b_j \geq 0$ for all $i$ and $j$. 
    2. $\sum_{k=1}^{\max(p,q)} (a_k + b_k) < 1$. Here it is assumed that $a_i=0$ for $i>p$ and $b_j=0$ for $j>q$. 
    
    The ARCH model is a particular case of the GARCH model when $q=0$. 
    
    References
    ----------
    [Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf) 
    
    [Bollerslev, T. (1986). Generalized autoregressive conditional heteroskedasticity. Journal of econometrics, 31(3), 307-327.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=7da8bfa5295375c1141d797e80065a599153c19d)

    [James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)

    Parameters
    ----------
    p : int 
        Number of lagged versions of the series. 
    q : int 
        Number of lagged versions of the volatility. 
    alias : str 
        Custom name of the model. 
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
    def __init__(
        self, 
        p: int = 1,
        q: int = 1,
        alias: str = 'GARCH',
        prediction_intervals: Optional[ConformalIntervals] = None,
    ):
        self.p = p
        self.q = q
        if q !=0: 
            self.alias = alias+'('+str(p)+','+str(q)+')'
        else: 
            self.alias = alias+'('+str(p)+')'
        self.prediction_intervals = prediction_intervals
    
    def fit(
        self,
        y: np.ndarray,
        X: Optional[np.ndarray] = None
    ):
        r"""Fit GARCH model.

        Fit GARCH model to a time series (numpy array) `y`.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (t, ). 

        Returns
        -------
        self : 
            GARCH model.
        """
        y = _ensure_float(y)
        self.model_ = garch_model(y, p=self.p, q=self.q)
        self.model_['actual_residuals'] = y - self.model_['fitted']
        self._store_cs(y, X)
        return self
    
    def predict(
        self,
        h: int, 
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None
    ):
        r"""Predict with fitted GARCH model.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        fcst = garch_forecast(self.model_, h)
        res = {'mean': fcst['mean'], 'sigma2': fcst['sigma2']}
        if level is None: 
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else: 
            quantiles = _quantiles(level)
            lo = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
            hi = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
            lo = lo[:, ::-1]
            lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
            hi = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
            res = {**res, **lo, **hi}
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted GARCH model predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            residuals = self.model_['actual_residuals']
            se = _calculate_sigma(residuals, len(residuals) - 1)
            res = _add_fitted_pi(res=res, se=se, level=level)
        return res
    
    def forecast(
        self,
        y: np.ndarray,
        h: int, 
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool=False 
    ):
        r"""Memory Efficient GARCH model.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int
            Forecast horizon.
        level : List[float] 
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        mod = garch_model(y, p=self.p, q=self.q)
        fcst = garch_forecast(mod, h)
        keys = ['mean', 'sigma2']
        if fitted: 
            keys.append('fitted')
        res = {key: fcst[key] for key in keys}
        if level is not None: 
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_predict_conformal_intervals(res, level)
            else:
                quantiles = _quantiles(level)
                lo = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
                hi = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
                lo = lo[:, ::-1]
                lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
                hi = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
                res = {**res, **lo, **hi}
            if fitted: 
                se = _calculate_sigma(y - mod['fitted'], len(y) - 1)
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
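
As a quick illustration of the variance recursion in the docstring above, the following sketch (not part of statsforecast; the coefficients are hypothetical) simulates a GARCH(1,1) path by applying $\sigma_t^2 = w + a_1 y_{t-1}^2 + b_1 \sigma_{t-1}^2$ directly.

import numpy as np
# Hypothetical GARCH(1,1) coefficients satisfying w > 0, a1, b1 >= 0, a1 + b1 < 1
w_, a1, b1 = 0.2, 0.3, 0.5
n_sim = 200
rng = np.random.default_rng(0)
sigma2 = np.empty(n_sim)
y_sim = np.empty(n_sim)
sigma2[0] = w_ / (1 - a1 - b1)  # unconditional variance as the starting value
y_sim[0] = np.sqrt(sigma2[0]) * rng.standard_normal()
for t in range(1, n_sim):
    # variance recursion, then draw y_t = v_t * sigma_t with v_t ~ N(0, 1)
    sigma2[t] = w_ + a1 * y_sim[t - 1] ** 2 + b1 * sigma2[t - 1]
    y_sim[t] = np.sqrt(sigma2[t]) * rng.standard_normal()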
# Generate GARCH(2,2) data 
n = 1000 
w = 0.5
alpha = np.array([0.1, 0.2])
beta = np.array([0.4, 0.2])

y = generate_garch_data(n, w, alpha, beta) 

plt.figure(figsize=(10,4))
plt.plot(y)
garch = GARCH(2,2)
test_class(garch, x=y, h=12, skip_insample=False, level=[90,80])
fcst_garch = garch.forecast(ap, 12)
# Test conformal prediction
garch_c = GARCH(2,2,prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(garch_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_garch_c = garch_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_garch_c['mean'][:12],
    fcst_garch['mean']
)
_plot_fcst(fcst_garch_c)
h=100
fcst = garch.forecast(y, h=h, level=[80,95], fitted=True)
fig, ax = plt.subplots(1, 1, figsize = (20,7))
#plt.plot(np.arange(0, len(y)), y) 
plt.plot(np.arange(len(y), len(y) + h), fcst['mean'], label='mean')
plt.plot(np.arange(len(y), len(y) + h), fcst['sigma2'], color = 'c', label='sigma2')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-95'], color = 'r', label='lo-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-95'], color = 'r', label='hi-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-80'], color = 'g', label='lo-80')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-80'], color = 'g', label='hi-80')
plt.legend()
fig, ax = plt.subplots(1, 1, figsize = (20,7))
plt.plot(np.arange(0, len(y)), y) 
plt.plot(np.arange(0, len(y)), fcst['fitted'], label='fitted') 
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-95'], color = 'r', label='fitted-lo-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-95'], color = 'r', label='fitted-hi-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-80'], color = 'g', label='fitted-lo-80')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-80'], color = 'g', label='fitted-hi-80')
plt.xlim(len(y)-50, len(y))
plt.legend()
show_doc(GARCH, title_level=3)
show_doc(GARCH.fit, title_level=3)
show_doc(GARCH.predict, title_level=3)
show_doc(GARCH.predict_in_sample, title_level=3)
show_doc(GARCH.forecast, title_level=3)
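
For parity with the other sections, a minimal usage sketch for GARCH; p=1, q=1 and h=4 are illustrative.

from statsforecast.models import GARCH
from statsforecast.utils import AirPassengers as ap
# Usage example of GARCH
model = GARCH(p=1, q=1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict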

ARCH Model

class ARCH(GARCH): 
    r"""Autoregressive Conditional Heteroskedasticity (ARCH) model. 
    
    A particular case of the GARCH(p,q) model where $q=0$. 
    It assumes that at time $t$, $y_t$ is given by: 
    
    $$y_t = \epsilon_t \sigma_t$$ 
    
    with 
    
    $$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2$$. 
    
    Here $\epsilon_t$ is a sequence of iid random variables with zero mean and unit variance. 
    The coefficients $w$ and $a_i$, $i=1,...,p$ must satisfy $w > 0$, $a_i \geq 0$, and $\sum_{k=1}^p a_k < 1$.  
    
    References
    ----------
    [Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf) 

    [James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)
    
    Parameters
    ----------
    p : int 
        Number of lagged versions of the series. 
    alias : str 
        Custom name of the model. 
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        By default, the model will compute the native prediction
        intervals.
    """
        
    def __init__(
        self, 
        p: int = 1,
        alias: str = 'ARCH',
        prediction_intervals: Optional[ConformalIntervals] = None
    ):
        self.p = p
        self.alias = alias
        super().__init__(p, q=0, alias=alias, prediction_intervals=prediction_intervals)
arch = ARCH(1)
test_class(arch, x=y, h=12, skip_insample=False, level=[90,80])
fcst_arch = arch.forecast(ap, 12)
# Test conformal prediction
arch_c = ARCH(1,prediction_intervals=ConformalIntervals(h=13, n_windows=2)) 
test_class(arch_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_arch_c = arch_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
    fcst_arch_c['mean'][:12],
    fcst_arch['mean']
)
_plot_fcst(fcst_arch_c)
garch = GARCH(p=1, q=0)
fcast_garch = garch.forecast(y, h=12, level=[90,80], fitted=True)

arch = ARCH(p=1)
fcast_arch = arch.forecast(y, h=12, level=[90,80], fitted=True)

np.testing.assert_equal(
    fcast_garch, 
    fcast_arch
)
show_doc(ARCH, title_level=3)
show_doc(ARCH.fit, name='ARCH.fit', title_level=3)
show_doc(ARCH.predict, name='ARCH.predict', title_level=3)
show_doc(ARCH.predict_in_sample, name='ARCH.predict_in_sample', title_level=3)
show_doc(ARCH.forecast, name='ARCH.forecast', title_level=3)
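
A minimal usage sketch for ARCH, following the same pattern (p=1 and h=4 are illustrative).

from statsforecast.models import ARCH
from statsforecast.utils import AirPassengers as ap
# Usage example of ARCH
model = ARCH(p=1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict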

Machine Learning Models

SklearnModel

class SklearnModel(_TS):
    r"""scikit-learn model wrapper

    Parameters
    ----------
    model : sklearn.base.BaseEstimator
        scikit-learn estimator
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        This is required for generating future prediction intervals.
    alias : str, optional (default=None)
        Custom name of the model. If `None`, the model's class name will be used.
    """
    uses_exog = True
    
    def __init__(
        self,
        model,
        prediction_intervals: Optional[ConformalIntervals] = None,
        alias: Optional[str] = None,
    ):
        self.model = model
        self.prediction_intervals = prediction_intervals
        self.alias = alias if alias is not None else model.__class__.__name__

    def fit(
        self,
        y: np.ndarray,
        X: np.ndarray,
    ) -> 'SklearnModel':
        r"""Fit the model.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        X : array-like
            Exogenous of shape (t, n_x).

        Returns
        -------
        self : SklearnModel
            Fitted SklearnModel object.
        """
        from sklearn.base import clone

        self.model_ = {'model': clone(self.model)}
        self.model_['model'].fit(X, y)
        self._store_cs(y=y, X=X)
        self.model_['fitted'] = self.model_['model'].predict(X)
        residuals = y - self.model_['fitted']
        self.model_['sigma'] = _calculate_sigma(residuals, y.size)
        return self

    def predict(
        self,
        h: int,
        X: np.ndarray,
        level: Optional[List[int]] = None,
    ) -> Dict[str, Any]:
        r"""Predict with fitted SklearnModel.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like
            Exogenous of shape (h, n_x).
        level: List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'mean': self.model_['model'].predict(X)}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res

    def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
        r"""Access fitted SklearnModel insample predictions.

        Parameters
        ----------
        level : List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            level = sorted(level)
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res

    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: np.ndarray,
        X_future: np.ndarray,
        level: Optional[List[int]] = None,        
        fitted: bool = False,
    ) -> Dict[str, Any]:
        r"""Memory Efficient SklearnModel predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        h : int
            Forecast horizon.
        X : array-like
            Insample exogenous of shape (t, n_x).
        X_future : array-like
            Exogenous of shape (h, n_x).
        level : List[int]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        from sklearn.base import clone

        model = clone(self.model)
        model.fit(X, y)
        res = {'mean': model.predict(X_future)}
        if fitted:
            res['fitted'] = model.predict(X)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                raise Exception("You must pass `prediction_intervals` to compute them.")
            if fitted:
                residuals = y - res['fitted']
                sigma = _calculate_sigma(residuals, y.size)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res

    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: np.ndarray,
        X_future: np.ndarray,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply fitted SklearnModel to a new/updated time series.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        h : int
            Forecast horizon.
        X : array-like
            Insample exogenous of shape (t, n_x).
        X_future : array-like
            Exogenous of shape (h, n_x).
        level : List[float]
            Confidence levels for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        if not hasattr(self, "model_"):
            raise Exception("You have to use the `fit` method first")
        res = {'mean': self.model_['model'].predict(X_future)}
        if fitted:
            res['fitted'] = self.model_['model'].predict(X)
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                raise Exception("You must pass `prediction_intervals` to compute them.")
            if fitted:
                se = _calculate_sigma(y - res['fitted'], y.size)
                res = _add_fitted_pi(res=res, se=se, level=level)
        return res
from sklearn.linear_model import Ridge
h = 12
skm = SklearnModel(Ridge(), prediction_intervals=ConformalIntervals(h=h))
X = np.arange(ap.size).reshape(-1, 1)
X_future = ap.size + np.arange(h).reshape(-1, 1)
test_class(skm, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=True)
fcst_skm = skm.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_skm)
# Test alias argument
test_eq(
    repr(SklearnModel(Ridge())),
    'Ridge'
)
test_eq(
    repr(SklearnModel(Ridge(), alias='my_ridge')),
    'my_ridge'
)
show_doc(SklearnModel)
show_doc(SklearnModel.fit)
show_doc(SklearnModel.predict)
show_doc(SklearnModel.predict_in_sample)
show_doc(SklearnModel.forecast)
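
A minimal usage sketch for SklearnModel; the Ridge regressor and the simple time-index features below are only illustrative choices, since the wrapper requires exogenous regressors.

from sklearn.linear_model import Ridge
from statsforecast.models import SklearnModel
from statsforecast.utils import AirPassengers as ap
# Usage example of SklearnModel
h = 4
X = np.arange(ap.size).reshape(-1, 1)             # insample exogenous: a simple time index
X_future = ap.size + np.arange(h).reshape(-1, 1)  # future values of the same feature
model = SklearnModel(Ridge())
model = model.fit(y=ap, X=X)
y_hat_dict = model.predict(h=h, X=X_future)
y_hat_dict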

MFLES

class MFLES(_TS):
    r"""MFLES model.

    A method to forecast time series based on Gradient Boosted Time Series Decomposition
    which treats traditional decomposition as the base estimator in the boosting 
    process. Unlike normal gradient boosting, slight learning rates are applied at the 
    component level (trend/seasonality/exogenous).

    The method derives its name from some of the underlying estimators that can
    enter into the boosting procedure, specifically: a simple Median, Fourier
    functions for seasonality, a simple/piecewise Linear trend, and Exponential
    Smoothing.
    
    Parameters
    ----------
    season_length : int or list of int, optional (default=None)
        Number of observations per unit of time. Ex: 24 Hourly data.
    fourier_order : int, optional (default=None)
        How many fourier sin/cos pairs to create, the larger the number the more complex of a seasonal pattern can be fitted.
        A lower number leads to smoother results.
        This is auto-set based on season_length.
    max_rounds : int (default=50)
        The max number of boosting rounds. The boosting will auto-stop but depending on other parameters such as rs_lr you may want more rounds.
        Generally more rounds means a smoother fit.        
    ma : int, optional (default=None)
        The moving average order to use, this is auto-set based on internal logic.
        Passing 4 would fit a 4 period moving average on the residual component.
    alpha : float (default=1.0)
        The alpha which is used in fitting the underlying LASSO when using piecewise functions.
    decay : float (default=-1.0)
        Effects the slopes of the piecewise-linear basis function.
    changepoints : boolean (default=True)
        Whether to fit for changepoints if all other logic allows for it. If False, MFLES will not ever fit a piecewise trend.        
    n_changepoints : int or float (default=0.25)
        Number (if int) or proportion (if float) of changepoint knots to place. The default of 0.25 will place 0.25 * (series length) number of knots.
    seasonal_lr : float (default=0.9)
        A shrinkage parameter (0 < seasonal_lr <= 1) which penalizes the seasonal fit.
        A value of 0.9 will flatly multiply the seasonal fit by 0.9 each boosting round; this can be used to allow more signal to the exogenous component.
    trend_lr : float (default=0.9)
        A shrinkage parameter (0 < trend_lr <= 1) which penalizes the linear trend fit.
        A value of 0.9 will flatly multiply the linear fit by 0.9 each boosting round; this can be used to allow more signal to the seasonality or exogenous components.
    exogenous_lr : float (default=1.0)
        The shrinkage parameter (0 < exogenous_lr <= 1) which controls how much of the exogenous signal is carried to the next round.
    residuals_lr : float (default=1.0)
        A shrinkage parameter (0 < residuals_lr <= 1) which penalizes the residual smoothing.
        A value of 0.9 will flatly multiply the residual fit by 0.9 each boosting round; this can be used to allow more signal to the seasonality or linear components.
    cov_threshold : float (default=0.7)
        The deseasonalized coefficient of variation (cov) is used to auto-set some logic; lowering the cov_threshold will result in simpler and less complex residual smoothing.
        If you pass something like 1000 then there will be no safeguards applied.
    moving_medians : bool (default=False)
        The default behavior is to fit an initial median to the time series. If True, then it will fit a median per seasonal period.
    min_alpha : float (default=0.05)
        The minimum alpha in the SES ensemble.
    max_alpha : float (default=1.0)
        The maximum alpha used in the SES ensemble.
    trend_penalty : bool (default=True)
        Whether to apply a simple penalty to the linear trend component, very useful for dealing with the potentially dangerous piecewise trend.
    multiplicative : bool, optional (default=None)
        Auto-set based on internal logic. If True, it will simply take the log of the time series.
    smoother : bool (default=False)
        If True, then a simple exponential ensemble will be used rather than auto settings.
    robust : bool, optional (default=None)
        If True then MFLES will fit using more reserved methods, i.e. not using piecewise trend or moving average residual smoother.
        Auto-set based on internal logic.
    verbose : bool (default=False)
        Print debugging information.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        This is required for generating future prediction intervals.        
    alias : str (default='MFLES')
        Custom name of the model.
    """
    uses_exog = True
    
    def __init__(
        self,
        season_length: Optional[Union[int, List[int]]] = None,
        fourier_order: Optional[int] = None,
        max_rounds: int = 50,        
        ma: Optional[int] = None,
        alpha: float = 1.0,
        decay: float = -1.0,
        changepoints: bool = True,        
        n_changepoints: Union[float, int] = 0.25,
        seasonal_lr: float = 0.9,
        trend_lr: float = 0.9,
        exogenous_lr: float = 1.0,
        residuals_lr: float = 1.0,
        cov_threshold: float = 0.7,
        moving_medians: bool = False,
        min_alpha: float = 0.05,
        max_alpha: float = 1.0,
        trend_penalty: bool = True,
        multiplicative: Optional[bool] = None,
        smoother: bool = False,
        robust: Optional[bool] = None,        
        verbose: bool = False,
        prediction_intervals: Optional[ConformalIntervals] = None,        
        alias: str = 'MFLES',
    ):
        try:
            import sklearn  # noqa:F401
        except ImportError:
            raise ImportError("MFLES requires scikit-learn.") from None
        self.season_length = season_length
        self.fourier_order = fourier_order
        self.max_rounds = max_rounds        
        self.ma = ma
        self.alpha = alpha
        self.decay = decay
        self.changepoints = changepoints
        self.n_changepoints = n_changepoints
        self.seasonal_lr = seasonal_lr
        self.trend_lr = trend_lr
        self.exogenous_lr = exogenous_lr
        self.residuals_lr = residuals_lr
        self.cov_threshold = cov_threshold
        self.moving_medians = moving_medians
        self.min_alpha = min_alpha
        self.max_alpha = max_alpha
        self.trend_penalty = trend_penalty
        self.multiplicative = multiplicative
        self.smoother = smoother
        self.robust = robust
        self.verbose = verbose
        self.prediction_intervals = prediction_intervals
        self.alias = alias

    def _fit(self, y: np.ndarray, X: Optional[np.ndarray]) -> Dict[str, Any]:
        model = _MFLES(verbose=self.verbose, robust=self.robust)
        fitted = model.fit(
            y=y,
            X=X,
            seasonal_period=self.season_length,
            fourier_order=self.fourier_order,
            ma=self.ma,
            alpha=self.alpha,
            decay=self.decay,
            n_changepoints=self.n_changepoints,
            seasonal_lr=self.seasonal_lr,
            linear_lr=self.trend_lr, 
            exogenous_lr=self.exogenous_lr,            
            rs_lr=self.residuals_lr,
            cov_threshold=self.cov_threshold,
            moving_medians=self.moving_medians,
            max_rounds=self.max_rounds,
            min_alpha=self.min_alpha,
            max_alpha=self.max_alpha,
            trend_penalty=self.trend_penalty,
            multiplicative=self.multiplicative,
            changepoints=self.changepoints,
            smoother=self.smoother,
        )
        return {'model': model, 'fitted': fitted}
    
    def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'MFLES':
        r"""Fit the model

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        X : array-like, optional (default=None)
            Exogenous of shape (t, n_x).

        Returns
        -------
        self : MFLES
            Fitted MFLES object.
        """
        y = _ensure_float(y)
        self.model_ = self._fit(y=y, X=X)
        self._store_cs(y=y, X=X)
        residuals = y - self.model_['fitted']
        self.model_['sigma'] = _calculate_sigma(residuals, y.size)
        return self

    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ) -> Dict[str, Any]:
        r"""Predict with fitted MFLES.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like, optional (default=None)
            Exogenous of shape (h, n_x).
        level: List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res

    def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
        r"""Access fitted MFLES insample predictions.

        Parameters
        ----------
        level : List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {'fitted': self.model_['fitted']}
        if level is not None:
            level = sorted(level)
            res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
        return res

    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ) -> Dict[str, Any]:
        r"""Memory Efficient MFLES predictions.

        This method avoids memory burden due to object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        h : int
            Forecast horizon.
        X : array-like
            Insample exogenous of shape (t, n_x).
        X_future : array-like
            Exogenous of shape (h, n_x).
        level : List[int]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        model = self._fit(y=y, X=X)
        res = {"mean": model['model'].predict(forecast_horizon=h, X=X_future)}
        if fitted:
            res["fitted"] = model['fitted']
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                raise Exception("You must pass `prediction_intervals` to compute them.")
            if fitted:
                residuals = y - res["fitted"]
                sigma = _calculate_sigma(residuals, y.size)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
show_doc(MFLES)
show_doc(MFLES.fit)
show_doc(MFLES.predict)
show_doc(MFLES.predict_in_sample)
show_doc(MFLES.forecast)
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)

mfles = MFLES()
test_class(mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)

mfles = MFLES(prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_mfles = mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_mfles)
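
A minimal usage sketch for MFLES without exogenous regressors; season_length=12 and h=4 are illustrative.

from statsforecast.models import MFLES
from statsforecast.utils import AirPassengers as ap
# Usage example of MFLES
model = MFLES(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict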

AutoMFLES

class AutoMFLES(_TS):
    r"""AutoMFLES
    
    Parameters
    ----------
    test_size : int
        Forecast horizon used during cross validation.
    season_length : int or list of int, optional (default=None)
        Number of observations per unit of time. Ex: 24 Hourly data.
    n_windows : int (default=2)
        Number of windows used for cross validation.
    config : dict, optional (default=None)
        Mapping from parameter name (from the init arguments of MFLES) to a list of values to try.
        If `None`, will use defaults.
    step_size : int, optional (default=None)
        Step size between each cross validation window. If `None` will be set to test_size.
    metric : str (default='smape')
        Metric used to select the best model. Possible options are: 'smape', 'mape', 'mse' and 'mae'.
    verbose : bool (default=False)
        Print debugging information.
    prediction_intervals : Optional[ConformalIntervals]
        Information to compute conformal prediction intervals.
        This is required for generating future prediction intervals.
    alias : str (default='AutoMFLES')
        Custom name of the model.
    """
    def __init__(
        self,
        test_size: int,
        season_length: Optional[Union[int, List[int]]] = None,
        n_windows: int = 2,
        config: Optional[Dict[str, Any]] = None,
        step_size: Optional[int] = None,
        metric: str = 'smape',
        verbose: bool = False,
        prediction_intervals: Optional[ConformalIntervals] = None,
        alias: str = 'AutoMFLES',
    ):
        try:
            import sklearn  # noqa:F401
        except ImportError:
            raise ImportError("MFLES requires scikit-learn.") from None
        self.season_length = season_length
        self.n_windows = n_windows
        self.test_size = test_size
        self.config = config
        self.step_size = step_size if step_size is not None else test_size
        self.metric = metric
        self.verbose = verbose
        self.prediction_intervals = prediction_intervals
        self.alias = alias

    def _fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> Dict[str, Any]:
        model = _MFLES(verbose=self.verbose)
        optim_params = model.optimize(
            y=y,
            X=X,
            test_size=self.test_size,
            n_steps=self.n_windows,
            step_size=self.step_size,
            seasonal_period=self.season_length,
            metric=self.metric,
            params=self.config,
        )
        # the seasonal_period may have been found during the optimization
        seasonal_period = optim_params.pop('seasonal_period', self.season_length)
        fitted = model.fit(
            y=y,
            X=X,
            seasonal_period=seasonal_period,
            **optim_params,
        )
        return {'model': model, 'fitted': fitted}

    def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'AutoMFLES':
        r"""Fit the model

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        X : array-like, optional (default=None)
            Exogenous of shape (t, n_x).

        Returns
        -------
        self : AutoMFLES
            Fitted AutoMFLES object.
        """
        y = _ensure_float(y)
        self.model_ = self._fit(y=y, X=X)
        self._store_cs(y=y, X=X)
        residuals = y - self.model_["fitted"]
        self.model_["sigma"] = _calculate_sigma(residuals, y.size)
        return self

    def predict(
        self,
        h: int,
        X: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
    ) -> Dict[str, Any]:
        r"""Predict with fitted AutoMFLES.

        Parameters
        ----------
        h : int
            Forecast horizon.
        X : array-like, optional (default=None)
            Exogenous of shape (h, n_x).
        level: List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
        if level is None:
            return res
        level = sorted(level)
        if self.prediction_intervals is not None:
            res = self._add_predict_conformal_intervals(res, level)
        else:
            raise Exception("You must pass `prediction_intervals` to compute them.")
        return res

    def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
        r"""Access fitted AutoMFLES insample predictions.

        Parameters
        ----------
        level : List[int]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        res = {"fitted": self.model_["fitted"]}
        if level is not None:
            level = sorted(level)
            res = _add_fitted_pi(res=res, se=self.model_["sigma"], level=level)
        return res

    def forecast(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ) -> Dict[str, Any]:
        r"""Memory Efficient AutoMFLES predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ).
        h : int
            Forecast horizon.
        X : array-like
            Insample exogenous of shape (t, n_x).
        X_future : array-like
            Exogenous of shape (h, n_x).
        level : List[int]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        model = self._fit(y=y, X=X)
        res = {"mean": model["model"].predict(forecast_horizon=h, X=X_future)}
        if fitted:
            res["fitted"] = model["fitted"]
        if level is not None:
            level = sorted(level)
            if self.prediction_intervals is not None:
                res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
            else:
                raise Exception("You must pass `prediction_intervals` to compute them.")
            if fitted:
                residuals = y - res["fitted"]
                sigma = _calculate_sigma(residuals, y.size)
                res = _add_fitted_pi(res=res, se=sigma, level=level)
        return res
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)

auto_mfles = AutoMFLES(test_size=h, season_length=12)
test_class(auto_mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)

auto_mfles = AutoMFLES(test_size=h, season_length=12, prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(auto_mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_auto_mfles = auto_mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_auto_mfles)
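
Mirroring the usage examples elsewhere in this notebook, a minimal AutoMFLES sketch; it assumes `AutoMFLES` is exported from `statsforecast.models` and reuses the monthly setup (`test_size=12`, `season_length=12`) from the tests above:

from statsforecast.models import AutoMFLES
from statsforecast.utils import AirPassengers as ap
# AutoMFLES usage example (illustrative sketch)
model = AutoMFLES(test_size=12, season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=12)
y_hat_dict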

Fallback models

Constant Model

class ConstantModel(_TS):
    
    def __init__(self, constant: float, alias: str = 'ConstantModel'):
        r"""Constant Model.
        
        Returns Constant values.
         
        Parameters 
        ----------
        constant: float
            Custom value to return as forecast.
        alias: str
            Custom name of the model.  
        """
        self.constant = constant
        self.alias = alias
    
    def fit(
        self, 
        y: np.ndarray,
        X: Optional[np.ndarray] = None,
    ):
        r"""Fit the Constant model.

        Fit a Constant Model to a time series (numpy.array) `y`.

        Parameters
        ----------
        y : numpy.array
            Clean time series of shape (t, ). 
        X : array-like
            Optional exogenous of shape (t, n_x). 

        Returns
        -------
        self:
            Constant fitted model.
        """
        y = _ensure_float(y)
        self.n_y = len(y)
        self._dtype = y.dtype
        return self
    
    def predict(
        self, 
        h: int, # forecast horizon
        X: Optional[np.ndarray] = None, # exogenous regressors
        level: Optional[List[int]] = None # confidence levels
    ):
        r"""Predict with fitted ConstantModel.

        Parameters
        ----------
        h : int 
            Forecast horizon.
        X : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float] 
            Confidence levels (0-100) for prediction intervals. 

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        mean = np.full(h, self.constant, dtype=self._dtype)
        res = {'mean': mean}
        
        if level is not None: 
            for lv in sorted(level):
                res[f'lo-{lv}'] = mean
                res[f'hi-{lv}'] = mean
        
        return res
    
    def predict_in_sample(self, level: Optional[List[int]] = None):
        r"""Access fitted Constant Model insample predictions.

        Parameters
        ----------
        level : List[float]
            Confidence levels (0-100) for prediction intervals.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
        """
        fitted = np.full(self.n_y, self.constant, dtype=self._dtype)
        res = {'fitted': fitted}
        if level is not None:
            for lv in sorted(level):
                res[f'fitted-lo-{lv}'] = fitted
                res[f'fitted-hi-{lv}'] = fitted
                
        return res
    
    def forecast(
        self, 
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Memory Efficient Constant Model predictions.

        This method avoids the memory burden of object storage.
        It is analogous to `fit_predict` without storing information.
        It assumes you know the forecast horizon in advance.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n,). 
        h: int
            Forecast horizon.
        X : array-like
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels (0-100) for prediction intervals.
        fitted : bool
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        y = _ensure_float(y)
        mean = np.full(h, self.constant, dtype=y.dtype)
        res = {'mean': mean}
        
        if fitted:
            fitted_vals = np.full_like(y, self.constant)
            res['fitted'] = fitted_vals
        
        if level is not None: 
            for lv in sorted(level):
                res[f'lo-{lv}'] = mean
                res[f'hi-{lv}'] = mean
                if fitted:
                    res[f'fitted-lo-{lv}'] = fitted_vals
                    res[f'fitted-hi-{lv}'] = fitted_vals
        return res
    
    def forward(
        self,
        y: np.ndarray,
        h: int,
        X: Optional[np.ndarray] = None,
        X_future: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
    ):
        r"""Apply Constant model predictions to a new/updated time series.

        Parameters
        ----------
        y : numpy.array 
            Clean time series of shape (n, ). 
        h : int 
            Forecast horizon.
        X : array-like 
            Optional insample exogenous of shape (t, n_x). 
        X_future : array-like 
            Optional exogenous of shape (h, n_x). 
        level : List[float]
            Confidence levels for prediction intervals.
        fitted : bool 
            Whether or not to return insample predictions.

        Returns
        -------
        forecasts : dict 
            Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
        """
        res = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
        return res
constant_model = ConstantModel(constant=1)
test_class(constant_model, x=ap, h=12, level=[90, 80])
constant_model.forecast(ap, 12, level=[90, 80])
constant_model.forward(ap, 12, level=[90, 80])
# test the alias argument
test_eq(
    repr(ConstantModel(1)),
    'ConstantModel'
)
test_eq(
    repr(ConstantModel(1, alias='ConstantModel_custom')),
    'ConstantModel_custom'
)
show_doc(ConstantModel, title_level=3)
show_doc(ConstantModel.forecast, title_level=3)
show_doc(ConstantModel.fit, title_level=3)
show_doc(ConstantModel.predict, title_level=3)
show_doc(ConstantModel.predict_in_sample, title_level=3)
show_doc(ConstantModel.forward, title_level=3)
from statsforecast.models import ConstantModel
from statsforecast.utils import AirPassengers as ap
# ConstantModel usage example
model = ConstantModel(1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

Zero Model

class ZeroModel(ConstantModel):
    
    def __init__(self, alias: str = 'ZeroModel'):
        r"""Zero Model.

        Returns zero values.

        Parameters
        ----------
        alias: str
            Custom name of the model.
        """
        super().__init__(constant=0, alias=alias)
zero_model = ZeroModel()
test_class(zero_model, x=ap, h=12, level=[90, 80])
zero_model.forecast(ap, 12, level=[90, 80])
zero_model.forward(ap, 12, level=[90, 80])
# test the alias argument
test_eq(
    repr(ZeroModel()),
    'ZeroModel'
)
test_eq(
    repr(ZeroModel(alias='ZeroModel_custom')),
    'ZeroModel_custom'
)
show_doc(ZeroModel, title_level=3)
show_doc(ZeroModel.forecast, title_level=3, name='ZeroModel.forecast')
show_doc(ZeroModel.fit, title_level=3, name='ZeroModel.fit')
show_doc(ZeroModel.predict, title_level=3, name='ZeroModel.predict')
show_doc(ZeroModel.predict_in_sample, title_level=3, name='ZeroModel.predict_in_sample')
show_doc(ZeroModel.forward, title_level=3, name='ZeroModel.forward')
from statsforecast.models import ZeroModel
from statsforecast.utils import AirPassengers as ap
# ZeroModel usage example
model = ZeroModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict

NaN Model

class NaNModel(ConstantModel):
    
    def __init__(self, alias: str = 'NaNModel'):
        r"""NaN Model.

        Returns NaN values.

        Parameters
        ----------
        alias: str
            Custom name of the model.
        """
        super().__init__(constant=np.nan, alias=alias)
nanmodel = NaNModel()
nanmodel.forecast(ap, 12, level=[90, 80])
# test the alias argument
test_eq(
    repr(NaNModel()),
    'NaNModel'
)
test_eq(
    repr(NaNModel(alias='NaN_custom')),
    'NaN_custom'
)
show_doc(NaNModel, title_level=3)
show_doc(NaNModel.forecast, title_level=3, name='NaNModel.forecast')
show_doc(NaNModel.fit, title_level=3, name='NaNModel.fit')
show_doc(NaNModel.predict, title_level=3, name='NaNModel.predict')
show_doc(NaNModel.predict_in_sample, title_level=3, name='NaNModel.predict_in_sample')
from statsforecast.models import NaNModel
from statsforecast.utils import AirPassengers as ap
# NaNModel usage example
model = NaNModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
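
The three models above are meant to serve as fallbacks. As a hedged sketch (not part of this notebook's tests), one way to wire a fallback into the high-level API is the `fallback_model` argument of `StatsForecast`; the long-format frame with `unique_id`, `ds` and `y` columns is an assumption of this example:

import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, ZeroModel
from statsforecast.utils import AirPassengers as ap
# Illustrative sketch: ZeroModel is used as the fallback if AutoARIMA fails on a series
df = pd.DataFrame({
    'unique_id': 1,
    'ds': pd.date_range('1949-01-01', periods=ap.size, freq='MS'),
    'y': ap,
})
sf = StatsForecast(models=[AutoARIMA(season_length=12)], freq='MS', fallback_model=ZeroModel())
fcst_df = sf.forecast(df=df, h=12)
fcst_df.head()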

References

General information

Automatic forecasting

Exponential smoothing

Simple methods

Sparse or intermittent demand

Multiple seasonalities

Theta family

GARCH model

TBATS model
