核心方法

%load_ext autoreload
%autoreload 2

拟合、预测、快速预测、交叉验证和绘图的方法

StatsForecast 的核心方法包括:

import warnings

from nbdev.showdoc import add_docs, show_doc
from statsforecast.models import Naive
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('always', category=UserWarning)
import datetime as dt
import errno
import inspect
import logging
import os
import pickle
import re
import reprlib
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine
from threadpoolctl import ThreadpoolController
from tqdm.auto import tqdm
from triad import conditional_dispatcher
from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series
from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray
from utilsforecast.validation import ensure_time_dtype, validate_freq

from statsforecast.utils import ConformalIntervals
# Configure the root logging handler only when this module is executed as a script.
if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
# Module-level logger; its level is set to ERROR below.
logger = logging.getLogger(__name__)
# Shared controller used as `@_controller.wrap(limits=1)` by the
# `_single_threaded_*` methods of GroupedArray.
_controller = ThreadpoolController()
logger.setLevel(logging.ERROR)
from fastcore.test import test_eq, test_fail, test_warns
from statsforecast.models import _TS
from statsforecast.utils import generate_series
class GroupedArray(BaseGroupedArray):
    """Grouped array that fits, predicts, forecasts and cross-validates models per group.

    Builds on `BaseGroupedArray`, relying on its `data`, `indptr` and `n_groups`
    attributes, its iteration over groups and its `take` method. When a group is
    2-D its first column is used as the target `y` and any remaining columns as
    exogenous features `X`; a 1-D group is used as `y` directly.
    """

    def __eq__(self, other):
        """Return True when `other` has matching `data` (within tolerance) and `indptr`.

        Objects lacking a `data` or `indptr` attribute, or whose `data` has a
        different shape, compare unequal.
        """
        if not hasattr(other, 'data') or not hasattr(other, 'indptr'):
            return False
        # np.allclose broadcasts its operands: mismatched shapes would either
        # raise ValueError or compare broadcast values, so treat them as unequal.
        if np.shape(self.data) != np.shape(other.data):
            return False
        return np.allclose(self.data, other.data) and np.array_equal(self.indptr, other.indptr)

    def fit(self, models, fallback_model=None):
        """Fit a fresh copy (`model.new()`) of every model on every group.

        Parameters
        ----------
        models : list
            Model instances exposing `new()` and `fit(y=..., X=...)`.
        fallback_model : optional (default=None)
            Model fitted instead of any model whose `new()`/`fit` raises. The
            fallback copy receives the failing model's `alias`.

        Returns
        -------
        numpy.ndarray
            Object array of shape `(n_groups, len(models))` holding the value
            returned by each `fit` call.

        Raises
        ------
        Exception
            The model's own error when it fails and no `fallback_model` is given.
        """
        fm = np.full((self.n_groups, len(models)), np.nan, dtype=object)
        for i, grp in enumerate(self):
            y = grp[:, 0] if grp.ndim == 2 else grp
            X = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            for i_model, model in enumerate(models):
                try:
                    new_model = model.new()
                    fm[i, i_model] = new_model.fit(y=y, X=X)
                except Exception as error:
                    if fallback_model is not None:
                        new_fallback_model = fallback_model.new()
                        new_fallback_model.alias = model.alias
                        fm[i, i_model] = new_fallback_model.fit(y=y, X=X)
                    else:
                        raise error
        return fm

    def _get_cols(self, models, attr, h, X, level=tuple()):
        """Compute the output column layout for `models`.

        Each model gets one column (the mean) plus two per level (lo/hi) when
        its `attr` method accepts a `level` argument and `level` is non-empty.
        `h` and `X` are accepted but not used here.

        Returns
        -------
        cuts : numpy.ndarray of int32, length `len(models) + 1`
            Cumulative column offsets; model `i` owns columns `cuts[i]:cuts[i + 1]`.
        has_level_models : numpy.ndarray of bool
            Whether each model will be called with `level`.
        """
        n_models = len(models)
        cuts = np.full(n_models + 1, fill_value=0, dtype=np.int32)
        has_level_models = np.full(n_models, fill_value=False, dtype=bool)
        cuts[0] = 0
        for i_model, model in enumerate(models):
            len_cols = 1  # mean
            has_level = 'level' in inspect.signature(getattr(model, attr)).parameters and len(level) > 0
            has_level_models[i_model] = has_level
            if has_level:
                len_cols += 2 * len(level)  # levels
            cuts[i_model + 1] = len_cols + cuts[i_model]
        return cuts, has_level_models

    def _output_fcst(self, models, attr, h, X, level=tuple()):
        """Allocate a NaN-filled forecast buffer of shape `(n_groups * h, n_cols)`.

        Returns the buffer together with the `cuts` and `has_level_models`
        arrays produced by `_get_cols`.
        """
        # returns empty output according to method
        cuts, has_level_models = self._get_cols(models=models, attr=attr, h=h, X=X, level=level)
        out = np.full((self.n_groups * h, cuts[-1]), fill_value=np.nan, dtype=self.data.dtype)
        return out, cuts, has_level_models

    def predict(self, fm, h, X=None, level=tuple()):
        """Predict `h` steps ahead with already fitted models.

        Parameters
        ----------
        fm : numpy.ndarray
            Fitted models as returned by `fit`, indexed as `fm[group, model]`.
        h : int
            Forecast horizon.
        X : optional (default=None)
            Future exogenous values; `X[i]` is passed to the models of group `i`.
        level : sequence (default=())
            Levels forwarded to models whose `predict` accepts `level`.

        Returns
        -------
        fcsts : numpy.ndarray
            Forecasts of shape `(n_groups * h, n_cols)`.
        cols : list of str
            Column names built from `repr` of the fitted models.
        """
        # fm stands for fitted_models
        # and fm should have fitted_model
        fcsts, cuts, has_level_models = self._output_fcst(
            models=fm[0], attr='predict',
            h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        cols = []
        for i_model in range(fm.shape[1]):
            has_level = has_level_models[i_model]
            kwargs = {}
            if has_level:
                kwargs['level'] = level
            for i, _ in enumerate(self):
                if X is not None:
                    X_ = X[i]
                else:
                    X_ = None
                res_i = fm[i, i_model].predict(h=h, X=X_, **kwargs)
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                model_name = repr(fm[i, i_model])
                cols_m = [f'{model_name}' if col == 'mean' else f'{model_name}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
            # column names are taken from the last group processed for this model
            cols += cols_m
        return fcsts, cols

    def fit_predict(self, models, h, X=None, level=tuple()):
        """Fit `models` on every group and predict `h` steps ahead.

        Returns the tuple `(fm, fcsts, cols)`: the fitted models from `fit`
        and the forecasts and column names from `predict`.
        """
        # fitted models
        fm = self.fit(models=models)
        # forecasts
        fcsts, cols = self.predict(fm=fm, h=h, X=X, level=level)
        return fm, fcsts, cols

    def forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        """Forecast `h` steps ahead by calling each model's `forecast` on each group.

        Parameters
        ----------
        models : list
            Model instances exposing `forecast(h, y, X, X_future, fitted, ...)`.
        h : int
            Forecast horizon.
        fallback_model : optional (default=None)
            Model whose `forecast` is used when a model's `forecast` raises.
            Output columns keep the name of the original model.
        fitted : bool (default=False)
            Also collect in-sample fitted values.
        X : optional (default=None)
            Future exogenous values; `X[i]` is passed as `X_future` for group `i`.
        level : sequence (default=())
            Levels forwarded to models whose `forecast` accepts `level`.
        verbose : bool (default=False)
            Show a tqdm progress bar.
        target_col : str (default='y')
            Name given to the first column of the fitted values.

        Returns
        -------
        dict
            `'forecasts'` (array of shape `(n_groups * h, n_cols)`), `'cols'`
            (column names), `'times'` (seconds spent per model, keyed by `repr`)
            and, when `fitted` is True, `'fitted'` with `'values'` and `'cols'`.

        Raises
        ------
        Exception
            The model's own error when it fails and no `fallback_model` is given.
        """
        fcsts, cuts, has_level_models = self._output_fcst(
            models=models, attr='forecast', h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        matches_fitted = ['fitted', 'fitted-lo', 'fitted-hi']
        if fitted:
            # Fitted values buffer: column 0 holds the target, the remaining
            # columns follow the forecast layout (`cuts`) shifted by one.
            fitted_vals = np.full((self.data.shape[0], 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
            if self.data.ndim == 1:
                fitted_vals[:, 0] = self.data
            else:
                fitted_vals[:, 0] = self.data[:, 0]
        iterable = tqdm(enumerate(self),
                        disable=(not verbose),
                        total=len(self),
                        desc='Forecast')
        times = {repr(m): 0.0 for m in models}
        for i, grp in iterable:
            y_train = grp[:, 0] if grp.ndim == 2 else grp
            X_train = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            if X is not None:
                X_f = X[i]
            else:
                X_f = None
            # column names are rebuilt for every group; the last group's are returned
            cols = []
            cols_fitted = []
            for i_model, model in enumerate(models):
                has_level = has_level_models[i_model]
                kwargs = {}
                if has_level:
                    kwargs['level'] = level
                start = time.perf_counter()
                try:
                    res_i = model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                except Exception as error:
                    if fallback_model is not None:
                        res_i = fallback_model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                    else:
                        raise error
                times[repr(model)] += time.perf_counter() - start
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
                cols += cols_m
                if fitted:
                    cols_m_fitted = [key for key in res_i.keys() if any(key.startswith(m) for m in matches_fitted)]
                    fitted_i = np.vstack([res_i[key] for key in cols_m_fitted]).T
                    cols_m_fitted = [f'{repr(model)}' \
                                     if col == 'fitted' else f"{repr(model)}-{col.replace('fitted-', '')}" \
                                     for col in cols_m_fitted]
                    fitted_vals[self.indptr[i] : self.indptr[i + 1], (cuts[i_model] + 1):(cuts[i_model + 1] + 1)] = fitted_i
                    cols_fitted += cols_m_fitted
        result = {'forecasts': fcsts, 'cols': cols, 'times': times}
        if fitted:
            result['fitted'] = {'values': fitted_vals}
            result['fitted']['cols'] = [target_col] + cols_fitted
        return result

    def cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        """Evaluate `models` over rolling windows taken from the end of each group.

        Window cutoffs are `range(-test_size, -h + 1, step_size)`, counted from
        the end of every group; each window forecasts the `h` values after its
        cutoff.

        Parameters
        ----------
        models : list
            Model instances.
        h : int
            Forecast horizon of every window.
        test_size : int
            Number of trailing observations covered by the windows.
        fallback_model : optional (default=None)
            Model used when a model's `forecast` (or `fit`, when not refitting
            every window) raises.
        step_size : int (default=1)
            Distance between consecutive cutoffs.
        input_size : int, optional (default=None)
            Number of observations before the cutoff used for training. If
            None, every observation before the cutoff is used.
        fitted : bool (default=False)
            Also collect in-sample fitted values for every window.
        level : sequence (default=())
            Levels forwarded to models whose `forecast` accepts `level`.
        refit : bool or int (default=True)
            If exactly True, call `model.forecast` on every window. Otherwise
            fit on the first window (and on every `refit`-th window when `refit`
            is a positive int) and use the fitted model's `forward` in between.
        verbose : bool (default=False)
            Show a tqdm progress bar per series.
        target_col : str (default='y')
            Name of the first output column, which holds the actual values.

        Returns
        -------
        dict
            `'forecasts'` (2-D array with the actual values in column 0 followed
            by the model columns), `'cols'` (column names) and, when `fitted` is
            True, `'fitted'` with `'values'`, `'idxs'`, `'last_idxs'` and `'cols'`.

        Raises
        ------
        Exception
            If `test_size - h` is not a multiple of `step_size`, or the model's
            own error when it fails and no `fallback_model` is given.
        """
        # output of size: (ts, window, h)
        if (test_size - h) % step_size:
            raise Exception('`test_size - h` should be module `step_size`')
        n_windows = int((test_size - h) / step_size) + 1
        n_models = len(models)
        cuts, has_level_models = self._get_cols(models=models, attr='forecast', h=h, X=None, level=level)
        # first column of out is the actual y
        out = np.full((self.n_groups, n_windows, h, 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
        if fitted:
            fitted_vals = np.full((self.data.shape[0], n_windows, n_models + 1), np.nan, dtype=self.data.dtype)
            fitted_idxs = np.full((self.data.shape[0], n_windows), False, dtype=bool)
            last_fitted_idxs = np.full_like(fitted_idxs, False, dtype=bool)
        matches = ['mean', 'lo', 'hi']
        steps = list(range(-test_size, -h + 1, step_size))
        for i_ts, grp in enumerate(self):
            iterable = tqdm(
                enumerate(steps, start=0),
                desc=f"Cross Validation Time Series {i_ts + 1}",
                disable=(not verbose),
                total=len(steps),
            )
            fitted_models = [None for _ in range(n_models)]
            for i_window, cutoff in iterable:
                should_fit = i_window == 0 or (refit > 0 and i_window % refit == 0)
                end_cutoff = cutoff + h
                # cutoff is negative, so `cutoff - cutoff` selects from the start of the group
                in_size_disp = cutoff if input_size is None else input_size
                y = grp[(cutoff - in_size_disp):cutoff]
                y_train = y[:, 0] if y.ndim == 2 else y
                X_train = y[:, 1:] if (y.ndim == 2 and y.shape[1] > 1) else None
                # an end index of 0 would yield an empty slice, so slice to the end instead
                y_test = grp[cutoff:] if end_cutoff == 0 else grp[cutoff:end_cutoff]
                X_future = y_test[:, 1:] if (y_test.ndim == 2 and y_test.shape[1] > 1) else None
                out[i_ts, i_window, :, 0] = y_test[:, 0] if y.ndim == 2 else y_test
                if fitted:
                    fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, 0][
                        (cutoff - in_size_disp):cutoff
                    ] = y_train
                    fitted_idxs[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window][
                        (cutoff - in_size_disp):cutoff
                    ] = True
                    last_fitted_idxs[
                        self.indptr[i_ts] : self.indptr[i_ts + 1], i_window
                    ][cutoff-1] = True
                cols = [target_col]
                for i_model, model in enumerate(models):
                    has_level = has_level_models[i_model]
                    kwargs = {}
                    if has_level:
                        kwargs['level'] = level
                    # this is implemented like this because not all models have a forward method
                    # so we can't do fit + forward
                    if refit is True:
                        forecast_kwargs = dict(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                        try:
                            res_i = model.forecast(**forecast_kwargs)
                        except Exception as error:
                            if fallback_model is None:
                                raise error
                            res_i = fallback_model.forecast(**forecast_kwargs)
                    else:
                        if should_fit:
                            try:
                                fitted_models[i_model] = model.fit(y=y_train, X=X_train)
                            except Exception as error:
                                if fallback_model is None:
                                    raise error
                                fitted_models[i_model] = fallback_model.new().fit(y=y_train, X=X_train)
                        res_i = fitted_models[i_model].forward(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                    cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                    fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                    cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                    out[i_ts, i_window, :, (1 + cuts[i_model]):(1 + cuts[i_model + 1])] = fcsts_i
                    if fitted:
                        fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, i_model + 1][
                            (cutoff - in_size_disp):cutoff
                        ] = res_i['fitted']
                    cols += cols_m
        result = {'forecasts': out.reshape(-1, 1 + cuts[-1]), 'cols': cols}
        if fitted:
            result['fitted'] = {
                'values': fitted_vals,
                'idxs': fitted_idxs,
                'last_idxs': last_fitted_idxs,
                'cols': [target_col] + [repr(model) for model in models]
            }
        return result

    def take(self, idxs):
        """Return a new `GroupedArray` built from the base class `take(idxs)` result."""
        data, indptr = super().take(idxs)
        return GroupedArray(data, indptr)

    def split(self, n_chunks):
        """Split the groups into at most `n_chunks` `GroupedArray` chunks."""
        n_chunks = min(n_chunks, self.n_groups)
        return [self.take(idxs) for idxs in np.array_split(range(self.n_groups), n_chunks)]

    def split_fm(self, fm, n_chunks):
        """Split the fitted models `fm` by group into `n_chunks` parts, dropping empty ones."""
        return [fm[idxs] for idxs in np.array_split(range(self.n_groups), n_chunks) if idxs.size]

    @_controller.wrap(limits=1)
    def _single_threaded_fit(self, models, fallback_model=None):
        """Call `fit` wrapped by `_controller.wrap(limits=1)`."""
        return self.fit(models=models, fallback_model=fallback_model)

    @_controller.wrap(limits=1)
    def _single_threaded_predict(self, fm, h, X=None, level=tuple()):
        """Call `predict` wrapped by `_controller.wrap(limits=1)`."""
        return self.predict(fm=fm, h=h, X=X, level=level)

    @_controller.wrap(limits=1)
    def _single_threaded_fit_predict(self, models, h, X=None, level=tuple()):
        """Call `fit_predict` wrapped by `_controller.wrap(limits=1)`."""
        return self.fit_predict(models=models, h=h, X=X, level=level)

    @_controller.wrap(limits=1)
    def _single_threaded_forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        """Call `forecast` wrapped by `_controller.wrap(limits=1)`."""
        return self.forecast(
            models=models,
            h=h,
            fallback_model=fallback_model,
            fitted=fitted,
            X=X,
            level=level,
            verbose=verbose,
            target_col=target_col,
        )

    @_controller.wrap(limits=1)
    def _single_threaded_cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        """Call `cross_validation` wrapped by `_controller.wrap(limits=1)`."""
        return self.cross_validation(
            models=models,
            h=h,
            test_size=test_size,
            fallback_model=fallback_model,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            refit=refit,
            verbose=verbose,
            target_col=target_col,
        )
# SumAhead just returns the last value
# added with h future values
class SumAhead:
    """Deterministic test model: forecasts the last value plus 1, 2, ..., h.

    Prediction intervals, when `level` is given, are the mean minus/plus 1.0
    for every level. Result dictionaries are filled in the order
    `mean`, `fitted` (if requested), then `lo-<lv>`/`hi-<lv>` per level.
    """

    def __init__(self):
        pass

    @staticmethod
    def _add_intervals(res, mean, level):
        """Add `lo-<lv>`/`hi-<lv>` entries (mean -/+ 1.0) to `res` for each level."""
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def fit(self, y, X):
        """Store the last value of `y` and the in-sample fitted values; return self.

        `X` is accepted for interface compatibility and ignored.
        """
        self.last_value = y[-1]
        self.fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
        # Same fitted values as `forecast`/`forward` (the original wrote `y[:1]`,
        # which broadcast the first observation over the whole series).
        self.fitted_values[1:] = y[1:]
        return self

    def predict(self, h, X=None, level=None):
        """Return `{'mean': last_value + [1..h]}` plus intervals when `level` is given."""
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        return self._add_intervals(res, mean, level)

    def __repr__(self):
        return 'SumAhead'

    def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
        """Forecast from the last value of `y` without storing any state.

        When `fitted` is True the result also has a `'fitted'` array equal to
        `y` with its first entry replaced by NaN.
        """
        mean = y[-1] + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        return self._add_intervals(res, mean, level)

    def forward(self, y, h, X=None, X_future=None, fitted=False, level=None):
        """Forecast from the `last_value` stored by `fit`, ignoring the last value of `y`."""
        # fix self.last_value for test purposes
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=mean.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        return self._add_intervals(res, mean, level)

    def new(self):
        """Return a new instance of the same type sharing a copy of this instance's `__dict__`."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
#用于测试的数据
data = np.arange(12).reshape(-1, 1)
indptr = np.array([0, 4, 8, 12])

# test we can recover the
# number of series
ga = GroupedArray(data, indptr)
test_eq(len(ga), 3)

# test splits of data
splits = ga.split(2)
test_eq(splits[0], GroupedArray(data[:8], indptr[:3]))
test_eq(splits[1], GroupedArray(data[8:], np.array([0, 4])))

# fitting models for each ts
models = [Naive(), Naive()]
fm = ga.fit(models)
test_eq(fm.shape, (3, 2))
test_eq(len(ga.split_fm(fm, 2)), 2)

# test predict
exp_fcsts = np.vstack([2 * [data[i]] for i in indptr[1:] - 1])
fcsts, cols = ga.predict(fm=fm, h=2)
np.testing.assert_equal(
    fcsts,
    np.hstack([exp_fcsts, exp_fcsts]),
)

# test fit_predict pipeline
fm_fp, fcsts_fp, cols_fp = ga.fit_predict(models=models, h=2) 
test_eq(fm_fp.shape, (3, 2))
np.testing.assert_equal(fcsts_fp, fcsts)
np.testing.assert_equal(cols_fp, cols)

# test levels
fm_lv, fcsts_lv, cols_lv = ga.fit_predict(models=models, h=2, level=(50, 90))
test_eq(fcsts_lv.shape, (2 * len(ga), 10)) 

# test forecast
fcst_f = ga.forecast(models=models, h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], cols_fp)
class NullModel(_TS):
    """Stub model used to exercise the fallback model.

    Its `forecast` takes no arguments, so calling it with the keyword
    arguments that `GroupedArray.forecast` passes raises an error.
    """

    def __init__(self):
        """Skip any initialisation."""

    def forecast(self):
        """Do nothing and return None."""
        return None

    def __repr__(self):
        return 'NullModel'

# test fallback model
fcst_f = ga.forecast(models=[NullModel(), NullModel()], fallback_model=Naive(), h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], ['NullModel', 'NullModel'])
test_fail(ga.forecast, kwargs={'models': [NullModel()]})
# test levels
lv = (50, 60)
h = 2
# test forecast
fcsts_lv = ga.forecast(models=[SumAhead()], h=h, fitted=True, level=lv)
test_eq(
    fcsts_lv['forecasts'].shape,
    (len(ga) * h, 1 + 2 * len(lv))
)
test_eq(
    fcsts_lv['cols'],
    ['SumAhead', 
     'SumAhead-lo-50', 
     'SumAhead-hi-50',
     'SumAhead-lo-60',
     'SumAhead-hi-60']
)
# fit_predict pipeline
fm_lv_fp, fcsts_lv_fp, cols_lv_fp = ga.fit_predict(models=[SumAhead()], h=h, level=lv)
test_eq(
    fcsts_lv['forecasts'],
    fcsts_lv_fp
)
test_eq(
    fcsts_lv['cols'],
    cols_lv_fp
)
# tests for cross validation
data = np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)])
indptr = np.array([0, 10, 110, 130])
ga = GroupedArray(data, indptr)
    
res_cv = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, fitted=True)
fcsts_cv = res_cv['forecasts']
cols_cv = res_cv['cols']
# every series increases by 1 per step, so SumAhead reproduces the actual values
test_eq(
    fcsts_cv[:, cols_cv.index('y')], 
    fcsts_cv[:, cols_cv.index('SumAhead')]
)

# levels
res_cv_lv = ga.cross_validation(models=[SumAhead(), Naive()], h=2, test_size=5, level=(50, 60))
actual_step_size = np.unique(np.diff(fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, 2)), axis=1))
test_eq(actual_step_size, 1)
horizons = [1, 2, 3, 2]
test_sizes = [3, 4, 6, 6]
step_sizes = [2, 2, 3, 4]
for h, test_size, step_size in zip(horizons, test_sizes, step_sizes):
    res_cv = ga.cross_validation(
        models=[SumAhead()], h=h, 
        test_size=test_size, 
        step_size=step_size,
        fitted=True
    )
    fcsts_cv = res_cv['forecasts']
    cols_cv = res_cv['cols']
    test_eq(
        fcsts_cv[:, cols_cv.index('y')], 
        fcsts_cv[:, cols_cv.index('SumAhead')]
    )
    fcsts_cv = fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, h))
    actual_step_size = np.unique(
        np.diff(fcsts_cv, axis=1)
    )
    test_eq(actual_step_size, step_size)
    # NOTE(review): cross_validation returns 'forecasts' reshaped to 2-D, so
    # shape[1] is the number of columns (y + SumAhead), not the number of windows;
    # the check passes because every case here has 2 windows -- confirm the intent.
    actual_n_windows = res_cv['forecasts'].shape[1]
    test_eq(actual_n_windows, int((test_size - h)/step_size) + 1)
def fail_cv(h, test_size, step_size):
    """Cross-validate a single `SumAhead` model on the module-level `ga` with the given sizes."""
    cv_kwargs = dict(
        models=[SumAhead()],
        h=h,
        test_size=test_size,
        step_size=step_size,
    )
    return ga.cross_validation(**cv_kwargs)
# `test_size - h` (3) is not a multiple of `step_size` (2), so this must raise
test_fail(fail_cv, contains='module', kwargs=dict(h=2, test_size=5, step_size=2))
# test fallback model
# under cross validation
fcst_cv_f = ga.cross_validation(
    models=[NullModel(), NullModel()], 
    fallback_model=Naive(), h=2, 
    test_size=5,
    fitted=True
)
fcst_cv_naive = ga.cross_validation(
    models=[Naive(), Naive()], 
    h=2, 
    test_size=5,
    fitted=True
)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(fcst_cv_f['fitted']['values'], fcst_cv_naive['fitted']['values'])
# test fallback model when fit fails in cross validation
class FailedFit:
    """Stub model whose `fit` always raises, used to trigger the fallback model."""

    def __init__(self):
        """No state to set up."""

    def forecast(self):
        """Do nothing and return None."""
        return None

    def fit(self, y, X):
        """Always raise to simulate a failed fit."""
        raise Exception('Failed fit')

    def __repr__(self):
        return 'FailedFit'

# FailedFit.fit always raises, so with refit=False the fallback model is fitted instead
fcst_cv_f = ga.cross_validation(
    models=[FailedFit()], 
    fallback_model=Naive(), h=2, 
    test_size=5,
    refit=False,
    fitted=True,
)
fcst_cv_naive = ga.cross_validation(
    models=[Naive()], 
    h=2, 
    test_size=5,
    refit=False,
    fitted=True,
)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(fcst_cv_f['fitted']['values'], fcst_cv_naive['fitted']['values'])
# test cross validation without refit
# first forecast row of each of the 3 series (4 windows * h=2 rows per series)
cv_starts = np.array([0, 8, 16])
res_cv_wo_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# test first forecasts are equal
test_eq(
    res_cv_wo_refit['forecasts'][cv_starts],
    res_cv_refit['forecasts'][cv_starts]
)
# for refit=2 the first two windows should be the same
res_cv_refit2 = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=2)
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 2, cv_starts + 3]), 1],
)
# and the next two windows should be the same
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 6, cv_starts + 7]), 1],
)
# but different between them
test_fail(
    lambda: test_eq(
        res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
        res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    )
)
from statsforecast.models import AutoCES
res_cv_wo_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# test first forecasts are equal
test_eq(
    res_cv_wo_refit['forecasts'][[0, 8, 16]],
    res_cv_refit['forecasts'][[0, 8, 16]]
)
def _get_n_jobs(n_groups, n_jobs):
    if n_jobs == -1 or (n_jobs is None):
        actual_n_jobs = os.cpu_count()
    else:
        actual_n_jobs = n_jobs
    return min(n_groups, actual_n_jobs)
# tests for more series than resources
test_eq(_get_n_jobs(100, -1), os.cpu_count()) 
test_eq(_get_n_jobs(100, None), os.cpu_count())
test_eq(_get_n_jobs(100, 2), 2)
# tests for more resources than series
test_eq(_get_n_jobs(1, -1), 1) 
test_eq(_get_n_jobs(1, None), 1)
test_eq(_get_n_jobs(2, 10), 2)
def _warn_df_constructor():
    warnings.warn(
        "The `df` argument of the StatsForecast constructor as well as reusing stored "
        "dfs from other methods is deprecated and will raise an error in a future version. "
        "Please provide the `df` argument to the corresponding method instead, e.g. fit/forecast.",
        category=FutureWarning,
    )

def _maybe_warn_sort_df(sort_df):
    if not sort_df:  
        warnings.warn(
            "The `sort_df` argument is deprecated and will be removed in a future version. "
            "You can leave it to its default value (True) to supress this warning",
            category=FutureWarning,
        )

def _warn_id_as_idx():
    warnings.warn(
        "In a future version the predictions will have the id as a column. "
        "You can set the `NIXTLA_ID_AS_COL` environment variable "
        "to adopt the new behavior and to suppress this warning.",
        category=FutureWarning,
    )

def _id_as_idx() -> bool:
    return not bool(os.getenv('NIXTLA_ID_AS_COL', ''))
_param_descriptions = {
    'freq': """freq : str or int
            Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.""",
    'df': """df : pandas or polars DataFrame, optional (default=None)
            DataFrame with ids, times, targets and exogenous.""",
    'sort_df': """sort_df : bool (default=True)
            Sort `df` by ids and times.""",
    'fallback_model': """fallback_model : Any, optional (default=None)
            Any, optional (default=None)
            Model to be used if a model fails.
            Only works with the `forecast` and `cross_validation` methods.""",
    'id_col': """id_col : str (default='unique_id')
            Column that identifies each serie.""",
    'time_col': """time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.""",
    'target_col': """target_col : str (default='y')
            Column that contains the target.""",
    'h': """h : int
            Forecast horizon.""",
    'X_df': """X_df : pandas or polars DataFrame, optional (default=None)
            DataFrame with ids, times and future exogenous.""",
    'level': """level : List[float], optional (default=None)
            Confidence levels between 0 and 100 for prediction intervals.""",
    'prediction_intervals': """prediction_intervals : ConformalIntervals, optional (default=None)
            Configuration to calibrate prediction intervals (Conformal Prediction).""",
    'fitted': """fitted : bool (default=False)
            Store in-sample predictions.""",
    'n_jobs': """n_jobs : int (default=1)
            Number of jobs used in the parallel processing, use -1 for all cores.""",
    'verbose': """verbose : bool (default=True)
            Prints TQDM progress bar when `n_jobs=1`.""",
    'models': """models : List[Any]
            List of instantiated objects models.StatsForecast.""",
    'n_windows': """n_windows : int (default=1)
            Number of windows used for cross validation.""",
    'step_size': """step_size : int (default=1)
            Step size between each window.""",
    'test_size': """test_size : int, optional (default=None)
            Length of test size. If passed, set `n_windows=None`.""",
    'input_size': """input_size : int, optional (default=None)
            Input size for each window, if not none rolled windows.""",
    'refit': """refit : bool or int (default=True)
            Wether or not refit the model for each window.
            If int, train the models every `refit` windows.""",
}
class _StatsForecast:
    """The `StatsForecast` class allows you to efficiently fit multiple `StatsForecast` models 
    for large sets of time series. It operates on a DataFrame `df` with at least three columns
    ids, times and targets.

    The class has memory-efficient `StatsForecast.forecast` method that avoids storing partial 
    model outputs. While the `StatsForecast.fit` and `StatsForecast.predict` methods with 
    Scikit-learn interface store the fitted models.

    The `StatsForecast` class offers parallelization utilities with Dask, Spark and Ray back-ends.
    See distributed computing example [here](https://github.com/Nixtla/statsforecast/tree/main/experiments/ray).
    """
    
    def __init__(
        self, 
        models: List[Any],
        freq: Union[str, int],
        n_jobs: int = 1,
        df: Optional[DataFrame] = None,
        sort_df: bool = True,
        fallback_model: Optional[Any] = None,
        verbose: bool = False,
    ):
        """Train statistical models.

        Stores the configuration on the instance and, when `df` is given,
        preprocesses it right away through `_prepare_fit`.

        Parameters
        ----------
        {models}
        {freq}
        {n_jobs}
        {df}
        {sort_df}
        {fallback_model}
        {verbose}
        """
        # TODO @fede: needed for residuals, think about it later
        self.models = models
        # fail early on duplicated model names, before anything else is stored
        self._validate_model_names()
        self.freq = freq
        self.n_jobs = n_jobs
        self.fallback_model = fallback_model
        self.verbose = verbose
        if df is None:
            # no data to preprocess: only the `sort_df` warning check is left
            _maybe_warn_sort_df(sort_df)
            return
        _warn_df_constructor()
        self._prepare_fit(df=df, sort_df=sort_df)

    __init__.__doc__ = __init__.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    
    def _validate_model_names(self):
        # Some test models don't have alias
        names = [getattr(model, 'alias', lambda: None) for model in self.models]
        names = [x for x in names if x is not None]
        if len(names) != len(set(names)):
            raise ValueError('Model names must be unique. You can use `alias` to set a unique name for each model.')

    def _prepare_fit(
        self,
        df: Optional[DataFrame],
        sort_df: bool = True,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',        
    ) -> None:
        """Preprocess `df` and store everything the other methods need.

        Sets `uids`, `last_dates`, `ga`, `og_dates`, `n_jobs`, `df_constructor`,
        `id_col`, `time_col`, `target_col` and `_exog` on the instance.

        Raises
        ------
        ValueError
            If `df` is None and no data was stored by a previous call.
        """
        if df is None:
            # nothing new to process: only valid when series were stored before
            if hasattr(self, 'ga'):
                _warn_df_constructor()
                return
            raise ValueError('You must provide the `df` argument.')
        df = ensure_time_dtype(df, time_col)
        validate_freq(df[time_col], self.freq)
        is_pandas = isinstance(df, pd.DataFrame)
        if is_pandas and df.index.name == id_col:
            warnings.warn(
                "Passing unique_id as the index is deprecated. "
                "Please provide it as a column instead.",
                category=FutureWarning
            )
            df = df.reset_index()
        _maybe_warn_sort_df(sort_df)
        self.uids, final_times, values, offsets, order = ufp.process_df(
            df, id_col, time_col, target_col
        )
        # keep the container type in line with the input dataframe library
        self.last_dates = (
            pd.Index(final_times, name=time_col) if is_pandas else pl_Series(final_times)
        )
        self.ga = GroupedArray(values, offsets)
        times = df[time_col].to_numpy()
        # `order` is None when process_df did not have to reorder the rows
        self.og_dates = times if order is None else times[order]
        self.n_jobs = _get_n_jobs(len(self.ga), self.n_jobs)
        self.df_constructor = type(df)
        self.id_col, self.time_col, self.target_col = id_col, time_col, target_col
        reserved = (id_col, time_col, target_col)
        # every remaining column is treated as an exogenous feature
        self._exog = [col for col in df.columns if col not in reserved]

    def _validate_sizes_for_prediction_intervals(
        self,
        prediction_intervals: Optional[ConformalIntervals],
        offset: int = 0,
    ) -> None:
        if prediction_intervals is None:
            return
        sizes = np.diff(self.ga.indptr) - offset
        # the absolute minimum requires two windows
        min_samples = 2 * prediction_intervals.h + 1
        if np.any(sizes < min_samples):
            raise ValueError(
                f'Minimum samples for computing prediction intervals are {min_samples + offset:,}, '
                'some series have less. Please remove them or adjust the horizon.'
            )
        # required samples for current configuration
        required_samples = prediction_intervals.n_windows * prediction_intervals.h + 1
        if np.any(sizes < required_samples):
            warnings.warn(
                f'Prediction intervals settings require at least {required_samples + offset:,} samples, '
                'some series have less and will use less windows.'
            )

    def _set_prediction_intervals(
        self, prediction_intervals: Optional[ConformalIntervals]
    ) -> None:
        for model in self.models:
            interval = getattr(model, "prediction_intervals", None)
            if interval is None:
                setattr(model, "prediction_intervals", prediction_intervals)

    def fit(
        self,
        df: Optional[DataFrame] = None, 
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        """Fit statistical models.

        Trains every model in `models` on each series of `df` and keeps the
        fitted models in the `fitted_` attribute for later use.

        Parameters
        ----------
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        self : StatsForecast
            Returns with stored `StatsForecast` fitted `models`.
        """
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        # a single job runs in-process, anything else goes through the worker pool
        self.fitted_ = (
            self.ga.fit(models=self.models, fallback_model=self.fallback_model)
            if self.n_jobs == 1
            else self._fit_parallel()
        )
        return self

    fit.__doc__ = fit.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    
    def _make_future_df(self, h: int):
        """Build the frame of ids and future timestamps for an `h`-step forecast.

        The times start one `freq` step after each series' last stored date.
        For pandas, the ids become the index only when `_id_as_idx()` says so.
        """
        first_future = ufp.offset_times(self.last_dates, freq=self.freq, n=1)
        future_times = ufp.time_ranges(first_future, freq=self.freq, periods=h)
        repeated_ids = ufp.repeat(self.uids, n=h)
        out = self.df_constructor({self.id_col: repeated_ids, self.time_col: future_times})
        if not isinstance(out, pd.DataFrame):
            return out
        if _id_as_idx():
            _warn_id_as_idx()
            return out.set_index(self.id_col)
        return out.reset_index(drop=True)

    def _parse_X_level(self, h: int, X: Optional[DataFrame], level: Optional[List[int]]):
        if level is None:
            level = []
        if X is None:
            return X, level
        expected_shape = (h * len(self.ga), self.ga.data.shape[1] + 1)
        if X.shape != expected_shape:
            raise ValueError(f'Expected X to have shape {expected_shape}, but got {X.shape}')
        _, _, data, indptr, _ = ufp.process_df(X, self.id_col, self.time_col, None)
        return GroupedArray(data, indptr), level

    def _validate_exog(self, X_df: Optional[DataFrame] = None) -> None:
        if not any(m.uses_exog for m in self.models) or not self._exog:
            return
        err_msg = (
            f'Models require the following exogenous features {self._exog} '
            'for the forecasting step. Please provide them through `X_df`.'
        )
        if X_df is None:
            raise ValueError(err_msg)
        missing_exog = [c for c in self._exog if c not in X_df.columns]
        if missing_exog:
            raise ValueError(err_msg)

    def predict(
        self,
        h: int,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
    ):
        """Predict statistical models.

        Use stored fitted `models` to predict large set of time series from DataFrame `df`.        

        Parameters
        ----------
        {h}
        {X_df}
        {level}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        if not hasattr(self, 'fitted_'):
            raise ValueError('You must call the fit method before calling predict.')
        if any(getattr(m, 'prediction_intervals', None) is not None for m in self.models) and level is None:
            warnings.warn(
                "Prediction intervals are set but `level` was not provided. "
                "Predictions won't have intervals."
            )
        self._validate_exog(X_df)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        if self.n_jobs == 1:
            fcsts, cols = self.ga.predict(fm=self.fitted_, h=h, X=X, level=level)
        else:
            fcsts, cols = self._predict_parallel(h=h, X=X, level=level)
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[cols] = fcsts
        return fcsts_df

    predict.__doc__ = predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    
    def fit_predict(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',        
    ) -> DataFrame:
        """Fit and Predict with statistical models.

        Fits the models and forecasts `h` steps ahead in a single call.
        It requires the forecast horizon `h` in advance.

        In contrast to `StatsForecast.forecast` this method stores partial models outputs
        (the fitted models are kept in the `fitted_` attribute).

        Parameters
        ----------
        {h}
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {X_df}
        {level}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_exog(X_df)
        # intervals without a level would silently produce nothing, so refuse early
        if level is None and prediction_intervals is not None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        if self.n_jobs != 1:
            outputs = self._fit_predict_parallel(h=h, X=X, level=level)
        else:
            outputs = self.ga.fit_predict(models=self.models, h=h, X=X, level=level)
        self.fitted_, fcsts, cols = outputs
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[cols] = fcsts
        return fcsts_df

    fit_predict.__doc__ = fit_predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    
    def forecast(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ) -> DataFrame:
        """Memory Efficient predictions.

        Fits the models and forecasts `h` steps ahead without keeping the
        fitted models on the instance.
        It requires the forecast horizon `h` in advance.

        Parameters
        ----------
        {h}
        {df}
        {X_df}
        {level}
        {fitted}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        # drop in-sample values left over from a previous call
        self.__dict__.pop('fcst_fitted_values_', None)
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_exog(X_df)
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        # arguments shared by the in-process and the parallel code paths
        shared_kwargs = dict(
            h=h,
            fitted=fitted,
            X=X,
            level=level,
            target_col=target_col,
        )
        if self.n_jobs == 1:
            res_fcsts = self.ga.forecast(
                models=self.models,
                fallback_model=self.fallback_model,
                verbose=self.verbose,
                **shared_kwargs,
            )
        else:
            res_fcsts = self._forecast_parallel(**shared_kwargs)
        if fitted:
            self.fcst_fitted_values_ = res_fcsts['fitted']
        point_and_intervals = res_fcsts['forecasts']
        out_cols = res_fcsts['cols']
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[out_cols] = point_and_intervals
        self.forecast_times_ = res_fcsts['times']
        return fcsts_df

    forecast.__doc__ = forecast.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    
    def forecast_fitted_values(self):
        """Access insample predictions.

        After executing `StatsForecast.forecast`, you can access the insample 
        prediction values for each model. To get them, you need to pass `fitted=True` 
        to the `StatsForecast.forecast` method and then use the 
        `StatsForecast.forecast_fitted_values` method.

        Returns
        -------
        fcsts_df : pandas.DataFrame | polars.DataFrame
            DataFrame with insample `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        if not hasattr(self, "fcst_fitted_values_"):
            raise Exception("Please run `forecast` method using `fitted=True`")
        cols = self.fcst_fitted_values_["cols"]
        df = self.df_constructor({
            self.id_col: ufp.repeat(self.uids, np.diff(self.ga.indptr)),
            self.time_col: self.og_dates
        })
        df[cols] = self.fcst_fitted_values_['values']
        if isinstance(df, pd.DataFrame):
            if _id_as_idx():
                _warn_id_as_idx()
                df = df.set_index(self.id_col)
            else:
                df = df.reset_index(drop=True)
        return df

    def cross_validation(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        n_windows: int = 1,
        step_size: int = 1,
        test_size: Optional[int] = None,
        input_size: Optional[int] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        refit: Union[bool, int] = True,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',        
    ) -> DataFrame:
        """Temporal Cross-Validation.

        Efficiently fits a list of `StatsForecast` 
        models through multiple training windows, in either chained or rolled manner.
        
        `StatsForecast.models`' speed allows to overcome this evaluation technique 
        high computational costs. Temporal cross-validation provides better model's 
        generalization measurements by increasing the test's length and diversity.

        Parameters
        ----------
        {h}
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {n_windows}
        {step_size}
        {test_size}
        {input_size}            
        {level}
        {fitted}
        {refit}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame sorted by id, cutoff and time with `models` columns for
            point predictions and probabilistic predictions of every window.

        Raises
        ------
        ValueError
            If neither `n_windows` nor `test_size` is given, if
            `prediction_intervals` is used without `level`, if `refit` is not
            True and some model (or the fallback model) lacks a `forward`
            method, or if some series is not longer than `test_size`.
        """
        if n_windows is None and test_size is None:
            raise ValueError('you must define `n_windows` or `test_size`')
        if test_size is None:
            test_size = h + step_size * (n_windows - 1)
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        # equality (not identity) on purpose: `refit=1` compares equal to True
        # and therefore also means "refit on every window"
        if refit != True:
            no_forward = [m for m in self.models if not hasattr(m, 'forward')]
            if no_forward:
                raise ValueError(
                    'Can only use integer refit or refit=False with models that implement the forward method. '
                    f'The following models do not implement the forward method: {no_forward}.'
                )
            if self.fallback_model is not None and not hasattr(self.fallback_model, 'forward'):
                raise ValueError(
                    'Can only use integer refit or refit=False with a fallback model that implements the forward method.'
                )
        # drop fitted values left over from a previous call
        self.__dict__.pop('cv_fitted_values_', None)
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        series_sizes = np.diff(self.ga.indptr)
        # at least one training observation must remain before the test span
        short_series = series_sizes <= test_size
        if short_series.any():
            short_ids = self.uids[short_series].to_numpy().tolist()
            raise ValueError(
                f"The following series are too short for the cross validation settings: {reprlib.repr(short_ids)}\n"
                "Please remove these series or change the settings, e.g. reducing the horizon or the number of windows."
            )        
        self._validate_sizes_for_prediction_intervals(
            prediction_intervals=prediction_intervals, offset=test_size
        )
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        _, level = self._parse_X_level(h=h, X=None, level=level)
        if self.n_jobs == 1:
            res_fcsts = self.ga.cross_validation(
                models=self.models, h=h, test_size=test_size, 
                fallback_model=self.fallback_model, 
                step_size=step_size, 
                input_size=input_size, 
                fitted=fitted,
                level=level,
                verbose=self.verbose,
                refit=refit,
                target_col=target_col,
            )
        else:
            res_fcsts = self._cross_validation_parallel(
                h=h, 
                test_size=test_size,
                step_size=step_size,
                input_size=input_size,
                fitted=fitted,
                level=level,
                refit=refit,
                target_col=target_col,
            )
        if fitted:
            self.cv_fitted_values_ = res_fcsts['fitted']
            # `n_windows` is None (or a stale default) when the caller sizes the
            # evaluation through `test_size`, so count the windows from the
            # values that were actually used. When only `n_windows` was given
            # this evaluates to `n_windows` itself.
            # NOTE(review): assumes the grouped-array backend requires
            # `test_size - h` to be a multiple of `step_size` -- confirm.
            self.n_cv_ = (test_size - h) // step_size + 1
        fcsts_df = ufp.cv_times(
            times=self.og_dates,
            uids=self.uids,
            indptr=self.ga.indptr,
            h=h,
            test_size=test_size,
            step_size=step_size,
            id_col=id_col,
            time_col=time_col,
        )
        # the cv_times is sorted by window and then id
        fcsts_df = ufp.sort(fcsts_df, [id_col, "cutoff", time_col])
        fcsts_df = ufp.assign_columns(fcsts_df, res_fcsts["cols"], res_fcsts["forecasts"])
        if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            fcsts_df = fcsts_df.set_index(id_col)
        return fcsts_df

    cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]

    def cross_validation_fitted_values(self) -> DataFrame:
        """Access insample cross validated predictions.

        After executing `StatsForecast.cross_validation`, you can access the insample 
        prediction values for each model and window. To get them, you need to pass `fitted=True` 
        to the `StatsForecast.cross_validation` method and then use the 
        `StatsForecast.cross_validation_fitted_values` method.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with insample `models` columns for point predictions 
            and probabilistic predictions for all fitted `models`.

        Raises
        ------
        Exception
            If `cross_validation` has not been run with `fitted=True`.
        """        
        if not hasattr(self, 'cv_fitted_values_'):
            raise Exception('Please run `cross_validation` method using `fitted=True`')
        # column-major flattening; presumably one column per window -- TODO confirm
        # against the producer of `cv_fitted_values_`
        idxs = self.cv_fitted_values_['idxs'].flatten(order='F')            
        # one id per stored observation, then the whole block stacked `n_cv_` times
        train_uids = ufp.repeat(self.uids, np.diff(self.ga.indptr))
        cv_uids = ufp.vertical_concat([train_uids for _ in range(self.n_cv_)])
        # keep only the rows selected by `idxs`
        used_uids = ufp.take_rows(cv_uids, idxs)
        dates = np.tile(self.og_dates, self.n_cv_)[idxs]
        # marks, among the selected rows, the ones flagged in `last_idxs`;
        # their dates are used as the cutoffs below
        cutoffs_mask = self.cv_fitted_values_['last_idxs'].flatten(order='F')[idxs]
        # length of each run of rows that ends at a flagged row
        cutoffs_sizes = np.diff(np.append(0, np.where(cutoffs_mask)[0] + 1))
        # broadcast every cutoff date over the run it closes
        cutoffs = np.repeat(dates[cutoffs_mask], cutoffs_sizes)        
        df = self.df_constructor({
            self.id_col: used_uids,
            self.time_col: dates,
            'cutoff': cutoffs,
        })
        # two-dimensional view with `len(models) + 1` columns; presumably the
        # target plus one column per model -- TODO confirm
        fitted_vals = np.reshape(
            self.cv_fitted_values_['values'],
            (-1, len(self.models) + 1),
            order='F',
        )
        df = ufp.assign_columns(df, self.cv_fitted_values_['cols'], fitted_vals[idxs])
        df = ufp.drop_index_if_pandas(df)
        if isinstance(df, pd.DataFrame):
            if _id_as_idx():
                _warn_id_as_idx()
                df = df.set_index(self.id_col)
            else:
                df = df.reset_index(drop=True)
        return df

    def _get_pool(self):
        from multiprocessing import Pool

        pool_kwargs = dict()
        return Pool, pool_kwargs
    
    def _fit_parallel(self):
        """Fit the models on `n_jobs` chunks of the data using a worker pool.

        Returns the per-chunk results stacked vertically, in chunk order.
        """
        chunks = self.ga.split(self.n_jobs)
        Pool, pool_kwargs = self._get_pool()
        with Pool(self.n_jobs, **pool_kwargs) as executor:
            # submit everything first so the chunks are processed concurrently
            pending = [
                executor.apply_async(
                    chunk._single_threaded_fit,
                    (self.models, self.fallback_model)
                )
                for chunk in chunks
            ]
            fm = np.vstack([task.get() for task in pending])
        return fm
    
    def _get_gas_Xs(self, X, tasks_per_job=1):
        n_chunks = min(tasks_per_job * self.n_jobs, self.ga.n_groups)
        gas = self.ga.split(n_chunks)
        if X is not None:
            Xs = X.split(n_chunks)
        else:
            from itertools import repeat

            Xs = repeat(None)
        return gas, Xs
    
    def _predict_parallel(self, h, X, level):
        """Predict with the stored fitted models using a worker pool.

        Returns the stacked forecasts and the column names reported by the
        first chunk.
        """
        # one chunk of series, fitted models and exogenous data per task
        gas, Xs = self._get_gas_Xs(X=X)
        fms = self.ga.split_fm(self.fitted_, self.n_jobs)
        Pool, pool_kwargs = self._get_pool()
        with Pool(self.n_jobs, **pool_kwargs) as executor:
            pending = [
                executor.apply_async(
                    ga._single_threaded_predict,
                    (fm, h, X_, level),
                )
                for ga, fm, X_ in zip(gas, fms, Xs)
            ]
            outputs = [task.get() for task in pending]
            chunk_fcsts, chunk_cols = list(zip(*outputs))
            fcsts = np.vstack(chunk_fcsts)
            cols = chunk_cols[0]
        return fcsts, cols
    
    def _fit_predict_parallel(self, h, X, level):
        """Fit and predict on chunks of the data using a worker pool.

        Returns the stacked fitted models, the stacked forecasts and the
        column names reported by the first chunk.
        """
        # one chunk of series and exogenous data per task
        gas, Xs = self._get_gas_Xs(X=X)
        Pool, pool_kwargs = self._get_pool()
        with Pool(self.n_jobs, **pool_kwargs) as executor:
            pending = [
                executor.apply_async(
                    ga._single_threaded_fit_predict,
                    (self.models, h, X_, level),
                )
                for ga, X_ in zip(gas, Xs)
            ]
            outputs = [task.get() for task in pending]
            chunk_fms, chunk_fcsts, chunk_cols = list(zip(*outputs))
            fm = np.vstack(chunk_fms)
            fcsts = np.vstack(chunk_fcsts)
            cols = chunk_cols[0]
        return fm, fcsts, cols

    def _forecast_parallel(self, h, fitted, X, level, target_col):
        """Run `forecast` on many small chunks with a process pool.

        Chunks finish in arbitrary order, so each result is stored at the
        position of its chunk before everything is stacked. The per-model
        times are summed across chunks.
        """
        gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=100)
        results = [None] * len(gas)
        with ProcessPoolExecutor(self.n_jobs) as executor:
            # remember which chunk each future belongs to
            future2pos = {}
            for pos, (ga, X_chunk) in enumerate(zip(gas, Xs)):
                future = executor.submit(
                    ga._single_threaded_forecast,
                    h=h,
                    models=self.models,
                    fallback_model=self.fallback_model,
                    fitted=fitted,
                    X=X_chunk,
                    level=level,
                    verbose=False,
                    target_col=target_col,
                )
                future2pos[future] = pos
            progress = tqdm(
                as_completed(future2pos),
                disable=not self.verbose,
                total=len(future2pos),
                desc="Forecast",
                bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]",
            )
            for done in progress:
                results[future2pos[done]] = done.result()
        model_names = [repr(m) for m in self.models]
        result = {
            'cols': results[0]['cols'],
            'forecasts': np.vstack([r['forecasts'] for r in results]),
            'times': {
                name: sum(r['times'][name] for r in results)
                for name in model_names
            },
        }
        if fitted:
            result['fitted'] = {
                'cols': results[0]['fitted']['cols'],
                'values': np.vstack([r['fitted']['values'] for r in results]),
            }
        return result

    def _cross_validation_parallel(self, h, test_size, step_size, input_size, fitted, level, refit, target_col):
        """Run cross validation on `n_jobs` chunks of the data using a worker pool.

        Returns a dict with the stacked `forecasts`, the `cols` of the first
        chunk and, when `fitted` is set, a `fitted` dict with the concatenated
        `values`, `last_idxs` and `idxs` plus the first chunk's `cols`.
        """
        gas = self.ga.split(self.n_jobs)
        Pool, pool_kwargs = self._get_pool()
        # every chunk receives exactly the same keyword arguments
        task_kwargs = dict(
            models=self.models,
            h=h,
            test_size=test_size,
            fallback_model=self.fallback_model,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            refit=refit,
            verbose=self.verbose,
            target_col=target_col,
        )
        result = {}
        with Pool(self.n_jobs, **pool_kwargs) as executor:
            pending = [
                executor.apply_async(
                    ga._single_threaded_cross_validation,
                    tuple(),
                    task_kwargs,
                )
                for ga in gas
            ]
            out = [task.get() for task in pending]
            result['forecasts'] = np.vstack([d['forecasts'] for d in out])
            result['cols'] = out[0]['cols']
            if fitted:
                fitted_parts = [d['fitted'] for d in out]
                result['fitted'] = {
                    'values': np.concatenate([part['values'] for part in fitted_parts]),
                    'last_idxs': np.concatenate([part['last_idxs'] for part in fitted_parts]),
                    'idxs': np.concatenate([part['idxs'] for part in fitted_parts]),
                    'cols': out[0]['fitted']['cols'],
                }
        return result
    
    @staticmethod
    def plot(
        df: DataFrame,
        forecasts_df: Optional[DataFrame] = None,
        unique_ids: Union[Optional[List[str]], np.ndarray] = None,
        plot_random: bool = True, 
        models: Optional[List[str]] = None, 
        level: Optional[List[float]] = None,
        max_insample_length: Optional[int] = None,
        plot_anomalies: bool = False,
        engine: str = 'matplotlib',
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        resampler_kwargs: Optional[Dict] = None
    ):
        """Plot forecasts and insample values.

        Thin wrapper around `utilsforecast.plotting.plot_series` that first
        normalises the time column and moves ids stored in a pandas index
        back to a regular column.

        Parameters
        ----------
        {df}
        forecasts_df : pandas or polars DataFrame, optional (default=None)
            DataFrame ids, times and models.
        unique_ids : list of str, optional (default=None)
            ids to plot. If None, they're selected randomly.
        plot_random : bool (default=True)
            Select time series to plot randomly.
        models : List[str], optional (default=None)
            List of models to plot.
        level : List[float], optional (default=None)
            List of prediction intervals to plot if passed.
        max_insample_length : int, optional (default=None)
            Max number of train/insample observations to be plotted.
        plot_anomalies : bool (default=False)
            Plot anomalies for each prediction interval.
        engine : str (default='matplotlib')
            Library used to plot. 'plotly', 'plotly-resampler' or 'matplotlib'.
        {id_col}
        {time_col}
        {target_col}
        resampler_kwargs : dict
            Kwargs to be passed to plotly-resampler constructor.
            For further customization ("show_dash") call the method,
            store the plotting object and add the extra arguments to
            its `show_dash` method.
        """
        from utilsforecast.plotting import plot_series

        def _ids_to_column(frame):
            # ids passed as the pandas index are deprecated: warn and move them back
            if isinstance(frame, pd.DataFrame) and frame.index.name == id_col:
                warnings.warn(
                    "Passing the ids as the index is deprecated. "
                    "Please provide them as a column instead.",
                    category=FutureWarning
                )
                frame = frame.reset_index()
            return frame

        df = _ids_to_column(ensure_time_dtype(df, time_col))
        if forecasts_df is not None:
            forecasts_df = _ids_to_column(ensure_time_dtype(forecasts_df, time_col))
        plot_kwargs = dict(
            df=df,
            forecasts_df=forecasts_df,
            ids=unique_ids,
            plot_random=plot_random,
            models=models,
            level=level,
            max_insample_length=max_insample_length,
            plot_anomalies=plot_anomalies,
            engine=engine,
            resampler_kwargs=resampler_kwargs,
            palette='tab20b',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return plot_series(**plot_kwargs)
    
    def save(
        self, 
        path: Optional[Union[Path, str]] = None,
        max_size: Optional[str] = None,
        trim: bool = False,
    ):
        """Function that will save StatsForecast class with certain settings to make it 
        reproducible.
        
        Parameters
        ----------
        path : str or pathlib.Path, optional (default=None)
            Path of the file to be saved. If `None` will create one in the current 
            directory using the current UTC timestamp.
        max_size : str, optional (default = None)
            StatsForecast object should not exceed this size.
            Available byte naming: ['B', 'KB', 'MB', 'GB']
        trim : bool (default = False)
            Delete any attributes not needed for inference.
        """
        # Will be used to find the size of the fitted models
        # Never expecting anything higher than GB (even that's a lot')
        bytes_hmap = {
            "B": 1,
            "KB": 2**10,
            "MB": 2**20,
            "GB": 2**30,
        }

        # Removing unnecessary attributes
        # @jmoralez decide future implementation
        trim_attr:list = ["fcst_fitted_values_", "cv_fitted_values_"]
        if trim:
            for attr in trim_attr:
                # remove unnecessary attributes here
                self.__dict__.pop(attr, None)

        sf_size = len(pickle.dumps(self))

        if max_size is not None:
            cap_size = self._get_cap_size(max_size, bytes_hmap)
            if sf_size >= cap_size:
                err_messg = "StatsForecast is larger than the specified max_size"
                raise OSError(errno.EFBIG, err_messg) 

        converted_size, sf_byte = None, None
        for key in reversed(list(bytes_hmap.keys())):
            x_byte = bytes_hmap[key]
            if sf_size >= x_byte:
                converted_size = sf_size / x_byte
                sf_byte = key
                break
    
        if converted_size is None or sf_byte is None:
            err_messg = "Internal Error, this shouldn't happen, please open an issue"
            raise RuntimeError(err_messg)
    
        print(f"Saving StatsForecast object of size {converted_size:.2f}{sf_byte}.")
    
        if path is None:
            datetime_record = dt.datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
            path = f"StatsForecast_{datetime_record}.pkl"
    
        with open(path, "wb") as m_file:
            pickle.dump(self, m_file)
        print("StatsForecast object saved")

    def _get_cap_size(self, max_size, bytes_hmap):
        max_size = max_size.upper().replace(" ", "")
        match = re.match(r'(\d+\.\d+|\d+)(\w+)', max_size)
        if match is None or len(match.groups()) < 2 or match[2] not in bytes_hmap.keys():
            parsing_error = "Couldn't parse `max_size`, it should be `None`", \
            " or a number followed by one of the following units: ['B', 'KB', 'MB', 'GB']"
            raise ValueError(parsing_error)
        else:
            m_size = float(match[1])
            key_ = match[2]
            cap_size = m_size * bytes_hmap[key_]
        return cap_size
    
    @staticmethod
    def load(path:Union[Path, str]):
        """
        Automatically loads the model into ready StatsForecast.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to saved StatsForecast file.
        
        Returns
        -------
        sf: StatsForecast
            Previously saved StatsForecast
        """
        if not Path(path).exists():
            raise ValueError("Specified path does not exist, check again and retry.")
        with open(path, "rb") as f:
            return pickle.load(f)
    
    def __repr__(self):
        return f"StatsForecast(models=[{','.join(map(repr, self.models))}])"

_StatsForecast.plot.__doc__ = _StatsForecast.plot.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
class ParallelBackend:
    """Backend that runs each operation through a newly built ``_StatsForecast``.

    Every method creates a ``_StatsForecast`` from the given models, frequency
    and fallback model, then forwards the remaining arguments to the method of
    the same name on that object.
    """

    def forecast(
        self,
        *,
        models,
        fallback_model,
        freq,
        h,
        df,
        X_df,
        level,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        """Build a ``_StatsForecast`` and return the result of its ``forecast``."""
        runner = _StatsForecast(models=models, freq=freq, fallback_model=fallback_model)
        forwarded = dict(
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return runner.forecast(**forwarded)

    def cross_validation(
        self,
        *,
        df,
        models,
        freq,
        fallback_model,
        h,
        n_windows,
        step_size,
        test_size,
        input_size,
        level,
        refit,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        """Build a ``_StatsForecast`` and return the result of its ``cross_validation``."""
        runner = _StatsForecast(models=models, freq=freq, fallback_model=fallback_model)
        forwarded = dict(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return runner.cross_validation(**forwarded)


@conditional_dispatcher
def make_backend(obj:Any, *args:Any, **kwargs:Any) -> ParallelBackend:
    """Default backend factory: ignores its arguments and returns a new ``ParallelBackend``."""
    default_backend = ParallelBackend()
    return default_backend
class StatsForecast(_StatsForecast):
    """``_StatsForecast`` that can hand non-native dataframes to an execution backend.

    When `_is_native` is true (no execution engine in context and `df` is
    ``None``, a pandas or a polars dataframe) the parent implementation is
    used. Otherwise an engine is inferred from `df` and the call is delegated
    to the backend returned by ``make_backend``.
    """

    def forecast(
        self,
        h: int,
        df: Any = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        """Forecast `h` steps ahead, natively or through an execution backend.

        Raises
        ------
        ValueError
            If `prediction_intervals` is given without `level`.
        """
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        if self._is_native(df=df):
            res = super().forecast(
                df=df,
                h=h,
                X_df=X_df,
                level=level,
                fitted=fitted,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
            # A backend kept from an earlier non-native call would make
            # `forecast_fitted_values` query that backend instead of the
            # values produced by this native run, so forget it.
            if hasattr(self, '_backend'):
                del self._backend
            return res
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        # Stored so that `forecast_fitted_values` can reach the same backend.
        self._backend = make_backend(engine)
        # `sort_df` is not forwarded on this path.
        return self._backend.forecast(
            models=self.models,
            fallback_model=self.fallback_model,
            freq=self.freq,
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def forecast_fitted_values(self):
        """Return the fitted values from the stored backend if there is one, else from the parent class."""
        if hasattr(self, '_backend'):
            res = self._backend.forecast_fitted_values()
        else:
            res = super().forecast_fitted_values()
        return res

    def cross_validation(
        self,
        h: int,
        df: Any = None,
        n_windows: int = 1,
        step_size: int = 1,
        test_size: Optional[int] = None,
        input_size: Optional[int] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        refit: Union[bool, int] = True,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        """Run cross validation, natively or through an execution backend."""
        if self._is_native(df=df):
            return super().cross_validation(
                h=h,
                df=df,
                n_windows=n_windows,
                step_size=step_size,
                test_size=test_size,
                input_size=input_size,
                level=level,
                fitted=fitted,
                refit=refit,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        # Unlike `forecast`, the backend is not stored on the instance here.
        backend = make_backend(engine)
        return backend.cross_validation(
            df=df,
            models=self.models,
            freq=self.freq,
            fallback_model=self.fallback_model,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def _is_native(self, df) -> bool:
        """Return True when no execution engine is in context and `df` is None, pandas or polars."""
        engine = try_get_context_execution_engine()
        return engine is None and (df is None or isinstance(df, (pd.DataFrame, pl_DataFrame)))
show_doc(StatsForecast, title_level=2, name='StatsForecast')
# StatsForecast 的类使用示例

# from statsforecast.core import StatsForecast
from statsforecast.models import ( 
    ADIDA,
    AutoARIMA,
    CrostonClassic,
    CrostonOptimized,
    CrostonSBA,
    HistoricAverage,
    IMAPA,
    Naive,
    RandomWalkWithDrift,
    SeasonalExponentialSmoothing,
    SeasonalNaive,
    SeasonalWindowAverage,
    SimpleExponentialSmoothing,
    TSB,
    WindowAverage,
    DynamicOptimizedTheta,
    AutoETS,
    AutoCES
)
# Generate an example synthetic panel dataframe
panel_df = generate_series(n_series=9, equal_ends=False, engine='pandas')
panel_df.groupby('unique_id').tail(4)
if 'NIXTLA_ID_AS_COL' in os.environ:
    del os.environ['NIXTLA_ID_AS_COL']
# Warning about using the id as the index
fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=panel_df)
with warnings.catch_warnings(record=True) as issued_warnings:
    warnings.simplefilter('always', category=FutureWarning)
    std_preds = fcst.predict(h=1)
    std_preds2 = fcst.fit_predict(df=panel_df, h=1)
    std_fcst = fcst.forecast(df=panel_df, h=1, fitted=True)
    std_fitted = fcst.forecast_fitted_values()
    std_cv = fcst.cross_validation(df=panel_df, h=1, fitted=True)
    std_fitted_cv = fcst.cross_validation_fitted_values()
# Six calls were made inside the block, six recorded warnings are expected
assert len(issued_warnings) == 6
assert all('the predictions will have the id as a column' in str(w.message) for w in issued_warnings)
# Set the variable again for the code that follows
os.environ['NIXTLA_ID_AS_COL'] = '1'
# If we train with a model that uses exogenous features, they must be provided through X_df.
# Otherwise an error pointing this out is raised.
panel_with_exog = panel_df[panel_df['unique_id'] == 0].copy()
panel_with_exog['month'] = panel_df['ds'].dt.month
sf = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='M',
)
sf.fit(panel_with_exog)
expected_msg = "['month'] for the forecasting step. Please provide them through `X_df`"
test_fail(
    lambda: sf.predict(h=12),
    contains=expected_msg,
)
test_fail(
    lambda: sf.forecast(df=panel_with_exog, h=12),
    contains=expected_msg,
)
test_fail(
    lambda: sf.fit_predict(df=panel_with_exog, h=12),
    contains=expected_msg,
)
# if the models don't use exog then it continues
sf = StatsForecast(
    models=[SeasonalNaive(season_length=10), Naive()],
    freq='M',
)
sf.fit(panel_with_exog)
_ = sf.predict(h=12)
# Check the size requirements when using prediction intervals
fcst = StatsForecast(models=[Naive()], freq='D')
intervals = ConformalIntervals(n_windows=4, h=10)
# The intervals require 41 samples; with 30 samples only 2 windows can be used, so a warning should be issued.
with warnings.catch_warnings(record=True) as issued_warnings:
    fcst.fit(df=panel_df.head(30), prediction_intervals=intervals)
assert 'will use less windows' in str(issued_warnings[0].message)
assert fcst.fitted_[0, 0]._cs.shape[0] == 2
# With fewer than 21 samples (two windows of h = 10, plus 1 for training) it should fail.
test_fail(
    lambda: fcst.fit(df=panel_df.head(20), prediction_intervals=intervals),
    contains='Please remove them or adjust the horizon',
)
# For CV the test size must be taken into account (20 for CV, 21 for the intervals)
test_fail(
    lambda: fcst.cross_validation(
        df=panel_df.head(40),
        n_windows=2,
        step_size=10,
        h=10,
        prediction_intervals=intervals,
        level=[80],
    ),
    contains='Minimum samples for computing prediction intervals are 41',
)
# An integer refit or refit=False raises an error for models that don't support it
fcst = StatsForecast(models=[Naive(), SeasonalNaive(season_length=7)], freq='D')
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=2),
    contains='implement the forward method: [SeasonalNaive]'
)
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=False),
    contains='implement the forward method: [SeasonalNaive]'
)
# The same restriction is checked for the fallback model
fcst = StatsForecast(models=[Naive()], freq='D', fallback_model=SeasonalNaive(season_length=7))
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=2),
    contains='a fallback model that implements the forward method.'
)
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=False),
    contains='a fallback model that implements the forward method.'
)
# Non-standard column names
renamer = {'unique_id': 'uid', 'ds': 'time', 'y': 'target'}
kwargs = dict(id_col='uid', time_col='time', target_col='target')
inverse_renamer = {v: k for k, v in renamer.items()}
non_std_df = panel_df.rename(columns=renamer)

def assert_equal_results(df1, df2):
    """Assert that `df1` (index reset) equals `df2` with its columns renamed back to the standard names."""
    pd.testing.assert_frame_equal(
        df1.reset_index(),
        df2.rename(columns=inverse_renamer),
    )

# Repeat the calls made with the standard names, now on the renamed frame
fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=non_std_df, **kwargs)
non_std_preds = fcst.predict(h=1)
non_std_preds2 = fcst.fit_predict(df=non_std_df, h=1, **kwargs)
non_std_fcst = fcst.forecast(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted = fcst.forecast_fitted_values()
non_std_cv = fcst.cross_validation(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted_cv = fcst.cross_validation_fitted_values()

# Each result must match its standard-name counterpart
assert_equal_results(std_preds, non_std_preds)
assert_equal_results(std_preds2, non_std_preds2)
assert_equal_results(std_fcst, non_std_fcst)
assert_equal_results(std_fitted, non_std_fitted)
assert_equal_results(std_cv, non_std_cv)
assert_equal_results(std_fitted_cv, non_std_fitted_cv)
# 声明要拟合的StatsForecast估计器实例列表
# You can try other estimator's hyperparameters
# You can try other methods from the `models.StatsForecast` collection
# Check them here: https://nixtla.github.io/statsforecast/models.html
models=[AutoARIMA(), Naive(), 
        AutoETS(), AutoARIMA(allowmean=True, alias='MeanAutoARIMA')] 

# Instantiate StatsForecast class
fcst = StatsForecast(models=models,
                     freq='D',
                     n_jobs=1,
                     verbose=True)

# Efficiently predict
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)
# Test save and load
import tempfile
from polars.testing import assert_frame_equal
with tempfile.TemporaryDirectory() as td:
    f_path = Path(td).joinpath("sf_test.pickle")
    
    test_df = generate_series(n_series=9, equal_ends=False, engine='polars')
    test_frcs = StatsForecast(
        models=models,
        freq='1d', 
        n_jobs=1, 
        verbose=True
    )

    # Forecast before saving
    origin_df = test_frcs.forecast(df=test_df, h=4, fitted=True)

    test_frcs.save(f_path)

    # Forecast again with the reloaded object
    sf_test = StatsForecast.load(f_path)
    load_df = sf_test.forecast(df=test_df, h=4, fitted=True)
    
    # Both forecasts must be identical
    assert_frame_equal(origin_df, load_df)
# 测试自定义名称
test_eq(
    fcsts_df.columns[-1],
    'MeanAutoARIMA'
)
# 测试无重复名称
test_fail(lambda: StatsForecast(models=[Naive(), Naive()], freq="D"))
StatsForecast(models=[Naive(), Naive(alias="Naive2")], freq="D")
fig = StatsForecast.plot(panel_df, max_insample_length=10)
fig
test_fail(
    StatsForecast.plot, 
    contains='Please use a list',
    kwargs={'df': panel_df, 'level': 90}
)
fcst.plot(panel_df, fcsts_df, engine='matplotlib')
# 测试 ds 列为 object(字符串)类型时的绘图
panel_df['ds'] = panel_df['ds'].astype(str)
fcst.plot(panel_df, fcsts_df)
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90, 80, 30])
fcsts_df.groupby('unique_id').tail(4)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'AutoETS'], level=[90, 80], max_insample_length=28)
fcst.plot(fcst.forecast_fitted_values(),
          forecasts_df=fcsts_df,
          models=['AutoARIMA', 'AutoETS'], level=[80], 
          max_insample_length=20,
          plot_anomalies=True)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'])
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'], max_insample_length=28)
fcst.plot(panel_df.query('unique_id in [0, 1]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id == 0'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df.query('unique_id == 0'), models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id in [0, 1, 3]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2], level=[90])
fig = fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2, 3, 4], models=['AutoARIMA', 'Naive'], level=[90])
fig
fig = fcst.plot(
    panel_df, fcsts_df, unique_ids=[0, 1, 2, 3, 4], 
    models=['AutoARIMA', 'Naive'], 
    level=[90],
    engine='matplotlib'
)
fig
# 测试模型预测区间覆盖
models=[SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=24, n_windows=2))]
fcst = StatsForecast(models=models, freq='D', n_jobs=1)
fcst._set_prediction_intervals(None)
assert models[0].prediction_intervals is not None
fcst = StatsForecast(models=[AutoARIMA(season_length=7)],
                     freq='D', 
                     n_jobs=1, 
                     verbose=True)
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90])
fcsts_df.groupby('unique_id').tail(4)
fitted_vals = fcst.forecast_fitted_values()
fcst.plot(panel_df, fitted_vals.drop(columns='y'), level=[90])
# Render the documentation of the core methods under their public names.
show_doc(_StatsForecast.fit, 
         title_level=2,
         name='StatsForecast.fit')
# The displayed name must be spelled `StatsForecast.predict`.
show_doc(_StatsForecast.predict, 
         title_level=2,
         name='StatsForecast.predict')
show_doc(_StatsForecast.fit_predict, 
         title_level=2,
         name='StatsForecast.fit_predict')
show_doc(_StatsForecast.forecast, title_level=2, name='StatsForecast.forecast')
# StatsForecast.forecast 方法使用示例

# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import AutoARIMA, Naive
# 实例化 StatsForecast 类
fcst = StatsForecast(models=[AutoARIMA(), Naive()],
                     freq='D', n_jobs=1)

# 高效预测,无需存储记忆
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)
series = generate_series(100, n_static_features=2, equal_ends=False)

models = [
    ADIDA(), CrostonClassic(), CrostonOptimized(),
    CrostonSBA(), HistoricAverage(), 
    IMAPA(), Naive(), 
    RandomWalkWithDrift(), 
    SeasonalExponentialSmoothing(season_length=7, alpha=0.1),
    SeasonalNaive(season_length=7),
    SeasonalWindowAverage(season_length=7, window_size=4),
    SimpleExponentialSmoothing(alpha=0.1),
    TSB(alpha_d=0.1, alpha_p=0.3),
    WindowAverage(window_size=4)
]

fcst = StatsForecast(
    models=models,
    freq='D',
    n_jobs=1,
    verbose=True
)

res = fcst.forecast(df=series, h=14)
# 测试 ds 列不是 datetime 类型(字符串)的序列
series_wo_dt = series.copy()
series_wo_dt['ds'] = series_wo_dt['ds'].astype(str) 
fcst = StatsForecast(models=models, freq='D')
fcsts_wo_dt = fcst.forecast(df=series_wo_dt, h=14)
test_eq(res, fcsts_wo_dt)
test_eq(res['unique_id'].unique(), fcst.uids.values)
last_dates = series.groupby('unique_id')['ds'].max()
test_eq(res.groupby('unique_id')['ds'].min().values, last_dates + pd.offsets.Day())
test_eq(res.groupby('unique_id')['ds'].max().values, last_dates + 14 * pd.offsets.Day())
#月度数据测试
monthly_series = generate_series(10_000, freq='M', min_length=10, max_length=20, equal_ends=True)
monthly_series

fcst = StatsForecast(
    models=[Naive()],
    freq='M'
)
monthly_res = fcst.forecast(df=monthly_series, h=4)
monthly_res

last_dates = monthly_series.groupby('unique_id')['ds'].max()
test_eq(monthly_res.groupby('unique_id')['ds'].min().values, pd.Series(fcst.last_dates) + pd.offsets.MonthEnd())
test_eq(monthly_res.groupby('unique_id')['ds'].max().values, pd.Series(fcst.last_dates) + 4 * pd.offsets.MonthEnd())
show_doc(_StatsForecast.forecast_fitted_values, 
         title_level=2, 
         name='StatsForecast.forecast_fitted_values')
# StatsForecast.forecast_fitted_values 方法使用示例

# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# 实例化 StatsForecast 类
fcst = StatsForecast(models=[AutoARIMA()], freq='D', n_jobs=1)

# 访问样本预测
fcsts_df = fcst.forecast(df=panel_df, h=12, fitted=True, level=(90, 10))
insample_fcsts_df = fcst.forecast_fitted_values()
insample_fcsts_df.tail(4)
#拟合值的测试
def test_fcst_fitted(series, n_jobs=1, str_ds=False):
    """Check that the fitted values of `forecast` carry the input's `ds` and `y`.

    With ``str_ds=True`` the timestamps are passed as strings and the fitted
    `ds` is compared against their datetime conversion.
    """
    if str_ds:
        series = series.assign(ds=series['ds'].astype(str))
    sf = StatsForecast(models=[Naive()], freq='D', n_jobs=n_jobs)
    # The forecast result is not inspected; the call is what produces the fitted values.
    sf.forecast(df=series, h=14, fitted=True)
    insample = sf.forecast_fitted_values()
    expected_ds = pd.to_datetime(series['ds']) if str_ds else series['ds']
    test_eq(expected_ds, insample['ds'])
    test_eq(series['y'], insample['y'])
test_fcst_fitted(series)
test_fcst_fitted(series, str_ds=True)
#备用模型的测试
def test_fcst_fallback_model(n_jobs=1):
    """Check `forecast` with a fallback model, and that it fails without one."""
    with_fallback = StatsForecast(
        models=[NullModel()],
        fallback_model=Naive(),
        freq='D',
        n_jobs=n_jobs,
    )
    # The forecast result is not inspected; only the fitted values are checked.
    with_fallback.forecast(df=series, h=14, fitted=True)
    insample = with_fallback.forecast_fitted_values()
    for col in ('ds', 'y'):
        test_eq(series[col], insample[col])
    # Without a fallback the null model must actually fail
    without_fallback = StatsForecast(models=[NullModel()], freq='D', n_jobs=n_jobs)
    test_fail(lambda: without_fallback.forecast(df=series, h=14))
test_fcst_fallback_model()
show_doc(_StatsForecast.cross_validation, 
         title_level=2, 
         name='StatsForecast.cross_validation')
# StatsForecast.cross_validation 方法使用示例

# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# 实例化 StatsForecast 类
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1, verbose=True)

# 访问样本预测
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=14, n_windows=2)
rolled_fcsts_df.head(4)
#交叉验证测试
series_cv = pd.DataFrame({
    'unique_id': np.array(10 * ['id_0'] + 100 * ['id_1'] + 20 * ['id_2']),    
    'ds': np.hstack([
        pd.date_range(end='2021-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2020-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10.), np.arange(100, 200), np.arange(20, 40)]),
})

fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=True,
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None, level=(50, 60))
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3, fitted=True).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

test_fail(lambda: fcst.cross_validation(df=series_cv, h=10), contains="The following series are too short for the cross validation settings: ['id_0']")
test_fail(lambda: fcst.cross_validation(df=series_cv, h=20), contains="The following series are too short for the cross validation settings: ['id_0', 'id_2']")
# 测试交叉验证,不重新拟合
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
test_fail(test_eq, args=(res_cv_wo_refit, res_cv))
cols_wo_refit = res_cv_wo_refit.columns
test_eq(res_cv_wo_refit.groupby('unique_id').head(1), res_cv[cols_wo_refit].groupby('unique_id').head(1))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)

n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3,
    step_size=3,
    fitted=True,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
# 测试交叉验证,不重新拟合模型。
fcst = StatsForecast(
    models=[DynamicOptimizedTheta(), AutoCES(), 
            DynamicOptimizedTheta(season_length=7, alias='test')],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
res_cv_w_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=True,
)
test_fail(test_eq, args=(res_cv_wo_refit, res_cv_w_refit))
test_eq(
    res_cv_wo_refit.groupby('unique_id').head(1), 
    res_cv_w_refit.groupby('unique_id').head(1)
)
# 测试 ds 列不是 datetime 类型(字符串)的序列
series_cv_wo_dt = series_cv.copy()
series_cv_wo_dt['ds'] = series_cv_wo_dt['ds'].astype(str) 
fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=False
)
res_cv_wo_dt = fcst.cross_validation(
    df=series_cv_wo_dt,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
)
test_eq(res_cv, res_cv_wo_dt)
#测试等端交叉验证
series_cv = pd.DataFrame({
    'unique_id': np.hstack([np.zeros(10), np.zeros(100) + 1, np.zeros(20) + 2]).astype('int64'),
    'ds': np.hstack([
        pd.date_range(end='2022-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2022-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)]),
})
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50,60),
    fitted=True,
)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3, 
    step_size=3,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
show_doc(_StatsForecast.cross_validation_fitted_values, 
         title_level=2, 
         name='StatsForecast.cross_validation_fitted_values')
# StatsForecast.cross_validation_fitted_values 方法使用示例

# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# 实例化 StatsForecast 类
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1)

# 访问样本预测
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=12, n_windows=2, fitted=True)
insample_rolled_fcsts_df = fcst.cross_validation_fitted_values()
insample_rolled_fcsts_df.tail(4)
#拟合值交叉验证测试
def test_cv_fitted(series_cv, n_jobs=1, str_ds=False):
    """Check the cross-validation fitted values against the input series.

    For every id and cutoff, the `unique_id`, `ds` and `y` columns of the
    fitted values must equal the input rows of that id with `ds <= cutoff`.
    With ``str_ds=True`` the timestamps are passed as strings.
    """
    if str_ds:
        series_cv = series_cv.assign(ds=series_cv['ds'].astype(str))
    sf = StatsForecast(models=[SumAhead(), Naive()], freq='D', n_jobs=n_jobs)
    cv_res = sf.cross_validation(df=series_cv, h=2, n_windows=4, fitted=True)
    cv_fitted = sf.cross_validation_fitted_values()
    np.testing.assert_array_equal(
        cv_fitted['cutoff'].unique(),
        cv_res['cutoff'].unique(),
    )
    if str_ds:
        # Back to datetimes so the rows can be compared with the cutoffs below
        series_cv = series_cv.assign(ds=pd.to_datetime(series_cv['ds']))
    cols = ['unique_id', 'ds', 'y']
    for uid in cv_fitted['unique_id'].unique():
        fitted_uid = cv_fitted[cv_fitted['unique_id'] == uid]
        for cutoff in fitted_uid['cutoff'].unique():
            # `uid` and `cutoff` are read by the query strings through `@`
            got = fitted_uid.query('cutoff == @cutoff')[cols].reset_index(drop=True)
            expected = series_cv.query('ds <= @cutoff & unique_id == @uid')[cols].reset_index(drop=True)
            pd.testing.assert_frame_equal(got, expected, check_dtype=False)
test_cv_fitted(series_cv)
test_cv_fitted(series_cv, str_ds=True)
#备用模型的测试
def test_cv_fallback_model(n_jobs=1):
    """Check `cross_validation` with a fallback model, and that it fails without one."""
    with_fallback = StatsForecast(
        models=[NullModel()],
        fallback_model=Naive(),
        freq='D',
        n_jobs=n_jobs,
    )
    # Neither result is inspected; both calls only have to complete.
    with_fallback.cross_validation(df=series, h=2, n_windows=4, fitted=True)
    with_fallback.cross_validation_fitted_values()
    # Without a fallback the null model must actually fail
    without_fallback = StatsForecast(models=[NullModel()], freq='D', n_jobs=n_jobs)
    test_fail(
        lambda: without_fallback.cross_validation(df=series, h=12, n_windows=4),
        contains='got an unexpected keyword argument',
    )
test_cv_fallback_model()
show_doc(_StatsForecast.plot, 
         title_level=2, 
         name='StatsForecast.plot')
show_doc(StatsForecast.save, title_level=2, name='StatsForecast.save')
show_doc(StatsForecast.load, title_level=2, name='StatsForecast.load')
fcst.fit(df=series)
test_eq(
    fcst.predict(h=12),
    fcst.forecast(df=series, h=12)
)
test_eq(
    fcst.fit_predict(df=series, h=12),
    fcst.forecast(df=series, h=12)
)
# Test for conformal prediction
# NOTE(review): the ids are taken from `series.index` but matched against the
# `unique_id` column on the next line; presumably this relies on the row labels
# coinciding with the ids. Confirm, or read them from the column instead.
uids = series.index.unique()[:10]
series_subset = series.query('unique_id in @uids')[['unique_id', 'ds', 'y']]
sf = StatsForecast(
    models=[SeasonalNaive(season_length=7)],
    freq='D', 
    n_jobs=1,
)
sf = sf.fit(df=series_subset, prediction_intervals=ConformalIntervals(h=12))
# predict, fit_predict and forecast must agree
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.fit_predict(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.forecast(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)

# Test that an error/warning is raised when `level` is not specified
intervals = ConformalIntervals(h=12)
sf2 = StatsForecast(
    models=[ADIDA()],
    freq='D', 
    n_jobs=1,
)
sf2.fit(df=series_subset, prediction_intervals=intervals)
test_warns(lambda: sf2.predict(h=12))
test_fail(lambda: sf2.forecast(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.fit_predict(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.cross_validation(df=series_subset, h=12, prediction_intervals=intervals))
# Test conformal cross validation
cv_conformal = sf.cross_validation(
    df=series_subset, 
    h=12, 
    n_windows=2,
    level=[80, 90], 
    prediction_intervals=ConformalIntervals(h=12),
)
cv_no_conformal = sf.cross_validation(
    df=series_subset, 
    h=12, 
    n_windows=2,
    level=[80, 90],  
)
test_eq(
    cv_conformal.columns,
    cv_no_conformal.columns,    
)
# Compare the point forecasts too: `sf` holds a SeasonalNaive model, so the
# forecast column to keep is `SeasonalNaive` (the interval columns are left out).
test_eq(
    cv_conformal.filter(regex='ds|cutoff|y|SeasonalNaive$'),
    cv_no_conformal.filter(regex='ds|cutoff|y|SeasonalNaive$')
)
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1), 
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=1
)
res = fcst.forecast(df=series, h=14)
#| eval: false
#并行处理测试
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1), 
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=-1
)
res = fcst.forecast(df=series, h=14)
res_cv = fcst.cross_validation(df=series, h=3, test_size=10, n_windows=None)
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

test_fcst_fitted(series, n_jobs=-1)
test_cv_fitted(series_cv, n_jobs=-1)
test_fcst_fitted(series, n_jobs=-1, str_ds=True)
test_cv_fitted(series_cv, n_jobs=-1, str_ds=True)
# 检查 n_windows 参数
n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# 检查step_size参数
n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

杂项

整数日期戳

StatsForecast 类还可以接收整数作为日期戳,以下示例展示了如何做到这一点。

# 从statsforecast.core导入StatsForecast
from statsforecast.utils import AirPassengers as ap
from statsforecast.models import HistoricAverage
int_ds_df = pd.DataFrame({'ds': np.arange(1, len(ap) + 1), 'y': ap})
int_ds_df.insert(0, 'unique_id', 'AirPassengers')
int_ds_df.head()
int_ds_df.tail()
int_ds_df
fcst = StatsForecast(models=[HistoricAverage()], freq=1)
horizon = 7
forecast = fcst.forecast(df=int_ds_df, h=horizon)
forecast.head()
last_date = int_ds_df['ds'].max()
test_eq(forecast['ds'].values, np.arange(last_date + 1, last_date + 1 + horizon))
int_ds_cv = fcst.cross_validation(df=int_ds_df, h=7, test_size=8, n_windows=None)
int_ds_cv

外部回归变量

`y` 之后的每一列都被视为外部回归变量,并会传递给支持它们的模型。如果使用了外部回归变量,则必须通过 `StatsForecast.forecast` 方法的 `X_df` 参数提供它们的未来值。

class LinearRegression(_TS):
    """Example model: least-squares regression of the target on the exogenous features."""
    
    def __init__(self):
        pass
    
    def fit(self, y, X):
        """Solve the least-squares problem ``X @ coefs = y`` and store the solution in ``coefs_``."""
        self.coefs_, *_ = np.linalg.lstsq(X, y, rcond=None)
        return self
    
    def predict(self, h, X):
        """Return ``X @ coefs_``; requires a previous call to `fit`. `h` is not used."""
        # Read the coefficients stored by `fit`; there is no local `coefs` in this method.
        return X @ self.coefs_
    
    def __repr__(self):
        return 'LinearRegression()'
    
    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        """Fit on ``(X, y)`` and return ``{'mean': X_future @ coefs}`` without storing anything."""
        coefs, *_ = np.linalg.lstsq(X, y, rcond=None)
        return {'mean': X_future @ coefs}
    
    def new(self):
        """Return a new instance of the same type sharing a shallow copy of the attributes."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
series_xreg = series = generate_series(10_000, equal_ends=True)
series_xreg['intercept'] = 1
series_xreg['dayofweek'] = series_xreg['ds'].dt.dayofweek
series_xreg = pd.get_dummies(series_xreg, columns=['dayofweek'], drop_first=True)
series_xreg
dates = sorted(series_xreg['ds'].unique())
valid_start = dates[-14]
train_mask = series_xreg['ds'] < valid_start
series_train = series_xreg[train_mask]
series_valid = series_xreg[~train_mask]
X_valid = series_valid.drop(columns=['y'])
fcst = StatsForecast(
    models=[LinearRegression()],
    freq='D',
)
xreg_res = fcst.forecast(df=series_train, h=14, X_df=X_valid)
xreg_res['y'] = series_valid['y'].values
xreg_res.drop(columns='unique_id').groupby('ds').mean().plot()
xreg_res_cv = fcst.cross_validation(df=series_train, h=3, test_size=5, n_windows=None)
# 以下单元格包含对外部回归量的测试
class ReturnX(_TS):
    """Test model whose predictions are the exogenous features it receives."""
    
    def __init__(self):
        pass
    
    def fit(self, y, X):
        """Nothing to estimate; return the instance unchanged."""
        return self
    
    def predict(self, h, X):
        """Return `X` as given."""
        return X
    
    def __repr__(self):
        return 'ReturnX'
    
    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        """Return the flattened `X_future` under the ``'mean'`` key."""
        flat_future = X_future.flatten()
        return dict(mean=flat_future)
    
    def new(self):
        """Return a new instance of the same type sharing a shallow copy of the attributes."""
        clone = type(self).__new__(type(self))
        vars(clone).update(vars(self))
        return clone
df = pd.DataFrame(
    {
        'unique_id': [0] * 10 + [1] * 10,
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.rand(20),
        'x': np.arange(20, dtype=np.float64),
    }
)
train_mask = df['ds'] < 6
train_df = df[train_mask]
test_df = df[~train_mask]
def test_x_vars(n_jobs=1):
    """Check that future exogenous values reach the model unchanged.

    Forecasts four steps with `ReturnX` and asserts that the result equals
    the test-period features with column `x` renamed to `ReturnX` and the
    index reset (dtypes are not compared).

    Parameters
    ----------
    n_jobs : value forwarded to `StatsForecast(n_jobs=...)`.
    """
    future_features = test_df.drop(columns='y')
    engine = StatsForecast(models=[ReturnX()], freq=1, n_jobs=n_jobs)
    forecasts = engine.forecast(df=train_df, h=4, X_df=future_features)
    expected = future_features.rename(columns={'x': 'ReturnX'})
    expected = expected.reset_index(drop=True)
    pd.testing.assert_frame_equal(forecasts, expected, check_dtype=False)
test_x_vars(n_jobs=1)
#| eval: false
test_x_vars(n_jobs=2)

预测区间

您可以将参数 level 传递给 StatsForecast.forecast 方法,以计算预测区间。目前并非所有模型都能计算预测区间,因此只有已实现该功能的模型才会返回区间。

# Long-format frame built from `ap`: integer time index, values, and a
# single series id.
ap_df = pd.DataFrame({'ds': np.arange(ap.size), 'y': ap})
ap_df['unique_id'] = 0
sf = StatsForecast(
    models=[
        SeasonalNaive(season_length=12), 
        AutoARIMA(season_length=12)
    ],
    freq=1,
    n_jobs=1
)
# Request intervals at levels 80 and 95 together with the point forecasts.
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
# Plot through the instance built in this cell (`sf`) instead of `fcst`,
# which belongs to an earlier, unrelated cell.
sf.plot(ap_df, ap_ci, level=[80], engine="matplotlib")

保形预测区间

您还可以使用以下代码添加保形(conformal)预测区间。

from statsforecast.utils import ConformalIntervals
# Two AutoARIMA variants side by side: the first with its default settings,
# the second configured with ConformalIntervals and exposed under the alias
# 'ConformalAutoARIMA'.
sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
        AutoARIMA(
            season_length=12, 
            prediction_intervals=ConformalIntervals(n_windows=2, h=12),
            alias='ConformalAutoARIMA'
        ),
    ],
    freq=1,
    n_jobs=1
)
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
# Plot through the instance built in this cell (`sf`) instead of `fcst`,
# which belongs to an earlier, unrelated cell.
sf.plot(ap_df, ap_ci, level=[80], engine="plotly")

您还可以通过以下方式,为所有支持该功能的模型计算保形区间:

sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
    ],
    freq=1,
    n_jobs=1
)
# Here ConformalIntervals is passed to `forecast` itself rather than to an
# individual model.
ap_ci = sf.forecast(
    df=ap_df, 
    h=12, 
    level=(50, 80, 95), 
    prediction_intervals=ConformalIntervals(h=12),
)
# Plot through the instance built in this cell (`sf`) instead of `fcst`,
# which belongs to an earlier, unrelated cell.
sf.plot(ap_df, ap_ci, level=[80], engine="matplotlib")
def test_conf_intervals(n_jobs=1):
    """Forecast `ap` with intervals and plot the result as a smoke test.

    Builds a single-series frame from `ap`, forecasts 12 steps with
    SeasonalNaive and AutoARIMA at levels 80 and 95, and plots every
    output column against `ds`. Nothing is asserted or returned.

    Parameters
    ----------
    n_jobs : value forwarded to `StatsForecast(n_jobs=...)`.
    """
    n_obs = ap.size
    frame = pd.DataFrame(
        {
            'unique_id': [0] * n_obs,
            'ds': np.arange(n_obs),
            'y': ap,
        }
    )
    candidates = [
        SeasonalNaive(season_length=12),
        AutoARIMA(season_length=12),
    ]
    engine = StatsForecast(models=candidates, freq=1, n_jobs=n_jobs)
    intervals = engine.forecast(df=frame, h=12, level=(80, 95))
    by_date = intervals.drop(columns='unique_id').set_index('ds')
    by_date.plot(marker='.', figsize=(10, 6))
test_conf_intervals(n_jobs=1)
#| eval: false
# test with a number of jobs greater than the available cores
test_conf_intervals(n_jobs=101)

Give us a ⭐ on GitHub