%load_ext autoreload
%autoreload 2
Core methods
Methods for fitting, predicting, fast forecasting, cross-validating and plotting.
The core methods of StatsForecast are:
StatsForecast.fit
StatsForecast.predict
StatsForecast.forecast
StatsForecast.cross_validation
StatsForecast.plot
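As a quick sketch of how these methods fit together (illustrative only; it uses the `Naive` model and the `generate_series` utility that appear later in this notebook):

from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series

series = generate_series(n_series=3, freq='D')            # columns: unique_id, ds, y
sf = StatsForecast(models=[Naive()], freq='D')
fcst_df = sf.forecast(df=series, h=7)                     # memory-efficient fit + predict
sf.fit(df=series)                                         # or the sklearn-style pair
preds_df = sf.predict(h=7)
cv_df = sf.cross_validation(df=series, h=7, n_windows=2)  # rolling-origin evaluation
StatsForecast.plot(series, fcst_df)                       # visualize results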
import warnings
from nbdev.showdoc import add_docs, show_doc
from statsforecast.models import Naive
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('always', category=UserWarning)
import datetime as dt
import errno
import inspect
import logging
import os
import pickle
import re
import reprlib
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine
from threadpoolctl import ThreadpoolController
from tqdm.auto import tqdm
from triad import conditional_dispatcher
from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series
from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray
from utilsforecast.validation import ensure_time_dtype, validate_freq
from statsforecast.utils import ConformalIntervals
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s: %(message)s',
='%Y-%m-%d %H:%M:%S',
datefmt
)= logging.getLogger(__name__)
logger = ThreadpoolController() _controller
logger.setLevel(logging.ERROR)
from fastcore.test import test_eq, test_fail, test_warns
from statsforecast.models import _TS
from statsforecast.utils import generate_series
class GroupedArray(BaseGroupedArray):

    def __eq__(self, other):
        if not hasattr(other, 'data') or not hasattr(other, 'indptr'):
            return False
        return np.allclose(self.data, other.data) and np.array_equal(self.indptr, other.indptr)

    def fit(self, models, fallback_model=None):
        fm = np.full((self.n_groups, len(models)), np.nan, dtype=object)
        for i, grp in enumerate(self):
            y = grp[:, 0] if grp.ndim == 2 else grp
            X = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            for i_model, model in enumerate(models):
                try:
                    new_model = model.new()
                    fm[i, i_model] = new_model.fit(y=y, X=X)
                except Exception as error:
                    if fallback_model is not None:
                        new_fallback_model = fallback_model.new()
                        new_fallback_model.alias = model.alias
                        fm[i, i_model] = new_fallback_model.fit(y=y, X=X)
                    else:
                        raise error
        return fm
    def _get_cols(self, models, attr, h, X, level=tuple()):
        n_models = len(models)
        cuts = np.full(n_models + 1, fill_value=0, dtype=np.int32)
        has_level_models = np.full(n_models, fill_value=False, dtype=bool)
        cuts[0] = 0
        for i_model, model in enumerate(models):
            len_cols = 1  # mean
            has_level = 'level' in inspect.signature(getattr(model, attr)).parameters and len(level) > 0
            has_level_models[i_model] = has_level
            if has_level:
                len_cols += 2 * len(level)  # levels
            cuts[i_model + 1] = len_cols + cuts[i_model]
        return cuts, has_level_models
    def _output_fcst(self, models, attr, h, X, level=tuple()):
        # returns empty output according to method
        cuts, has_level_models = self._get_cols(models=models, attr=attr, h=h, X=X, level=level)
        out = np.full((self.n_groups * h, cuts[-1]), fill_value=np.nan, dtype=self.data.dtype)
        return out, cuts, has_level_models
    def predict(self, fm, h, X=None, level=tuple()):
        # fm stands for fitted models
        # and fm should have fitted models
        fcsts, cuts, has_level_models = self._output_fcst(
            models=fm[0], attr='predict',
            h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        cols = []
        for i_model in range(fm.shape[1]):
            has_level = has_level_models[i_model]
            kwargs = {}
            if has_level:
                kwargs['level'] = level
            for i, _ in enumerate(self):
                if X is not None:
                    X_ = X[i]
                else:
                    X_ = None
                res_i = fm[i, i_model].predict(h=h, X=X_, **kwargs)
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                model_name = repr(fm[i, i_model])
                cols_m = [f'{model_name}' if col == 'mean' else f'{model_name}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
            cols += cols_m
        return fcsts, cols
    def fit_predict(self, models, h, X=None, level=tuple()):
        # fit models
        fm = self.fit(models=models)
        # forecasts
        fcsts, cols = self.predict(fm=fm, h=h, X=X, level=level)
        return fm, fcsts, cols
    def forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        fcsts, cuts, has_level_models = self._output_fcst(
            models=models, attr='forecast', h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        matches_fitted = ['fitted', 'fitted-lo', 'fitted-hi']
        if fitted:
            # for the moment we don't return levels for fitted values in forecast mode
            fitted_vals = np.full((self.data.shape[0], 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
            if self.data.ndim == 1:
                fitted_vals[:, 0] = self.data
            else:
                fitted_vals[:, 0] = self.data[:, 0]
        iterable = tqdm(
            enumerate(self),
            disable=(not verbose),
            total=len(self),
            desc='Forecast',
        )
        times = {repr(m): 0.0 for m in models}
        for i, grp in iterable:
            y_train = grp[:, 0] if grp.ndim == 2 else grp
            X_train = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            if X is not None:
                X_f = X[i]
            else:
                X_f = None
            cols = []
            cols_fitted = []
            for i_model, model in enumerate(models):
                has_level = has_level_models[i_model]
                kwargs = {}
                if has_level:
                    kwargs['level'] = level
                start = time.perf_counter()
                try:
                    res_i = model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                except Exception as error:
                    if fallback_model is not None:
                        res_i = fallback_model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                    else:
                        raise error
                times[repr(model)] += time.perf_counter() - start
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
                cols += cols_m
                if fitted:
                    cols_m_fitted = [key for key in res_i.keys() if any(key.startswith(m) for m in matches_fitted)]
                    fitted_i = np.vstack([res_i[key] for key in cols_m_fitted]).T
                    cols_m_fitted = [f'{repr(model)}'
                                     if col == 'fitted' else f"{repr(model)}-{col.replace('fitted-', '')}"
                                     for col in cols_m_fitted]
                    fitted_vals[self.indptr[i] : self.indptr[i + 1], (cuts[i_model] + 1):(cuts[i_model + 1] + 1)] = fitted_i
                    cols_fitted += cols_m_fitted
        result = {'forecasts': fcsts, 'cols': cols, 'times': times}
        if fitted:
            result['fitted'] = {'values': fitted_vals}
            result['fitted']['cols'] = [target_col] + cols_fitted
        return result
    def cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        # output of size: (ts, window, h)
        if (test_size - h) % step_size:
            raise Exception('`test_size - h` should be a multiple of `step_size`')
        n_windows = int((test_size - h) / step_size) + 1
        n_models = len(models)
        cuts, has_level_models = self._get_cols(models=models, attr='forecast', h=h, X=None, level=level)
        # first column of out is the actual y
        out = np.full((self.n_groups, n_windows, h, 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
        if fitted:
            fitted_vals = np.full((self.data.shape[0], n_windows, n_models + 1), np.nan, dtype=self.data.dtype)
            fitted_idxs = np.full((self.data.shape[0], n_windows), False, dtype=bool)
            last_fitted_idxs = np.full_like(fitted_idxs, False, dtype=bool)
        matches = ['mean', 'lo', 'hi']
        steps = list(range(-test_size, -h + 1, step_size))
        for i_ts, grp in enumerate(self):
            iterable = tqdm(
                enumerate(steps, start=0),
                desc=f"Cross Validation Time Series {i_ts + 1}",
                disable=(not verbose),
                total=len(steps),
            )
            fitted_models = [None for _ in range(n_models)]
            for i_window, cutoff in iterable:
                should_fit = i_window == 0 or (refit > 0 and i_window % refit == 0)
                end_cutoff = cutoff + h
                in_size_disp = cutoff if input_size is None else input_size
                y = grp[(cutoff - in_size_disp):cutoff]
                y_train = y[:, 0] if y.ndim == 2 else y
                X_train = y[:, 1:] if (y.ndim == 2 and y.shape[1] > 1) else None
                y_test = grp[cutoff:] if end_cutoff == 0 else grp[cutoff:end_cutoff]
                X_future = y_test[:, 1:] if (y_test.ndim == 2 and y_test.shape[1] > 1) else None
                out[i_ts, i_window, :, 0] = y_test[:, 0] if y.ndim == 2 else y_test
                if fitted:
                    fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, 0][
                        (cutoff - in_size_disp):cutoff
                    ] = y_train
                    fitted_idxs[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window][
                        (cutoff - in_size_disp):cutoff
                    ] = True
                    last_fitted_idxs[
                        self.indptr[i_ts] : self.indptr[i_ts + 1], i_window
                    ][cutoff - 1] = True
                cols = [target_col]
                for i_model, model in enumerate(models):
                    has_level = has_level_models[i_model]
                    kwargs = {}
                    if has_level:
                        kwargs['level'] = level
                    # implemented this way because not all models have the forward method,
                    # so we can't do fit + forward
                    if refit is True:
                        forecast_kwargs = dict(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                        try:
                            res_i = model.forecast(**forecast_kwargs)
                        except Exception as error:
                            if fallback_model is None:
                                raise error
                            res_i = fallback_model.forecast(**forecast_kwargs)
                    else:
                        if should_fit:
                            try:
                                fitted_models[i_model] = model.fit(y=y_train, X=X_train)
                            except Exception as error:
                                if fallback_model is None:
                                    raise error
                                fitted_models[i_model] = fallback_model.new().fit(y=y_train, X=X_train)
                        res_i = fitted_models[i_model].forward(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                    cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                    fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                    cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                    out[i_ts, i_window, :, (1 + cuts[i_model]):(1 + cuts[i_model + 1])] = fcsts_i
                    if fitted:
                        fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, i_model + 1][
                            (cutoff - in_size_disp):cutoff
                        ] = res_i['fitted']
                    cols += cols_m
        result = {'forecasts': out.reshape(-1, 1 + cuts[-1]), 'cols': cols}
        if fitted:
            result['fitted'] = {
                'values': fitted_vals,
                'idxs': fitted_idxs,
                'last_idxs': last_fitted_idxs,
                'cols': [target_col] + [repr(model) for model in models],
            }
        return result
    def take(self, idxs):
        data, indptr = super().take(idxs)
        return GroupedArray(data, indptr)

    def split(self, n_chunks):
        n_chunks = min(n_chunks, self.n_groups)
        return [self.take(idxs) for idxs in np.array_split(range(self.n_groups), n_chunks)]

    def split_fm(self, fm, n_chunks):
        return [fm[idxs] for idxs in np.array_split(range(self.n_groups), n_chunks) if idxs.size]
    @_controller.wrap(limits=1)
    def _single_threaded_fit(self, models, fallback_model=None):
        return self.fit(models=models, fallback_model=fallback_model)

    @_controller.wrap(limits=1)
    def _single_threaded_predict(self, fm, h, X=None, level=tuple()):
        return self.predict(fm=fm, h=h, X=X, level=level)

    @_controller.wrap(limits=1)
    def _single_threaded_fit_predict(self, models, h, X=None, level=tuple()):
        return self.fit_predict(models=models, h=h, X=X, level=level)

    @_controller.wrap(limits=1)
    def _single_threaded_forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        return self.forecast(
            models=models,
            h=h,
            fallback_model=fallback_model,
            fitted=fitted,
            X=X,
            level=level,
            verbose=verbose,
            target_col=target_col,
        )

    @_controller.wrap(limits=1)
    def _single_threaded_cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        return self.cross_validation(
            models=models,
            h=h,
            test_size=test_size,
            fallback_model=fallback_model,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            refit=refit,
            verbose=verbose,
            target_col=target_col,
        )
# SumAhead just returns the last value
# plus the h future steps
class SumAhead:
    def __init__(self):
        pass

    def fit(self, y, X):
        self.last_value = y[-1]
        self.fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
        self.fitted_values[1:] = y[:1]
        return self

    def predict(self, h, X=None, level=None):
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def __repr__(self):
        return 'SumAhead'

    def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
        mean = y[-1] + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def forward(self, y, h, X=None, X_future=None, fitted=False, level=None):
        # uses the stored self.last_value for testing purposes
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=mean.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
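A quick illustrative check of the behavior described above; with y ending at 4, the next three steps are 5, 6 and 7:

y = np.arange(5.0)  # 0, 1, 2, 3, 4
model = SumAhead().fit(y=y, X=None)
np.testing.assert_equal(model.predict(h=3)['mean'], np.array([5.0, 6.0, 7.0]))
# forecast is stateless: it uses the last value of the y it receives
np.testing.assert_equal(SumAhead().forecast(y=y, h=3)['mean'], np.array([5.0, 6.0, 7.0]))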
# data for the tests
data = np.arange(12).reshape(-1, 1)
indptr = np.array([0, 4, 8, 12])
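For reference, this mirrors a CSR layout: `indptr` delimits the groups, so series `i` lives in `data[indptr[i]:indptr[i + 1]]`:

# three series of four observations each
for i in range(len(indptr) - 1):
    assert data[indptr[i]:indptr[i + 1]].shape == (4, 1)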
# test we can recover the number of series
ga = GroupedArray(data, indptr)
test_eq(len(ga), 3)

# test splits
splits = ga.split(2)
test_eq(splits[0], GroupedArray(data[:8], indptr[:3]))
test_eq(splits[1], GroupedArray(data[8:], np.array([0, 4])))
# fit models for each time series
models = [Naive(), Naive()]
fm = ga.fit(models)
test_eq(fm.shape, (3, 2))
test_eq(len(ga.split_fm(fm, 2)), 2)

# test predictions
exp_fcsts = np.vstack([2 * [data[i]] for i in indptr[1:] - 1])
fcsts, cols = ga.predict(fm=fm, h=2)
np.testing.assert_equal(
    fcsts,
    np.hstack([exp_fcsts, exp_fcsts]),
)
# test fit and predict pipeline
fm_fp, fcsts_fp, cols_fp = ga.fit_predict(models=models, h=2)
test_eq(fm_fp.shape, (3, 2))
np.testing.assert_equal(fcsts_fp, fcsts)
np.testing.assert_equal(cols_fp, cols)

# test levels
fm_lv, fcsts_lv, cols_lv = ga.fit_predict(models=models, h=2, level=(50, 90))
test_eq(fcsts_lv.shape, (2 * len(ga), 10))

# test forecast
fcst_f = ga.forecast(models=models, h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], cols_fp)
class NullModel(_TS):
    def __init__(self):
        pass

    def forecast(self):
        pass

    def __repr__(self):
        return "NullModel"

# test fallback model
fcst_f = ga.forecast(models=[NullModel(), NullModel()], fallback_model=Naive(), h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], ['NullModel', 'NullModel'])
test_fail(ga.forecast, kwargs={'models': [NullModel()]})
# test levels
lv = (50, 60)
h = 2
# test forecast
fcsts_lv = ga.forecast(models=[SumAhead()], h=h, fitted=True, level=lv)
test_eq(
    fcsts_lv['forecasts'].shape,
    (len(ga) * h, 1 + 2 * len(lv))
)
test_eq(
    fcsts_lv['cols'],
    ['SumAhead',
     'SumAhead-lo-50',
     'SumAhead-hi-50',
     'SumAhead-lo-60',
     'SumAhead-hi-60']
)
# fit and predict pipeline
fm_lv_fp, fcsts_lv_fp, cols_lv_fp = ga.fit_predict(models=[SumAhead()], h=h, level=lv)
test_eq(
    fcsts_lv['forecasts'],
    fcsts_lv_fp
)
test_eq(
    fcsts_lv['cols'],
    cols_lv_fp
)
# cross validation tests
data = np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)])
indptr = np.array([0, 10, 110, 130])
ga = GroupedArray(data, indptr)

res_cv = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, fitted=True)
fcsts_cv = res_cv['forecasts']
cols_cv = res_cv['cols']
test_eq(
    fcsts_cv[:, cols_cv.index('y')],
    fcsts_cv[:, cols_cv.index('SumAhead')]
)
# levels
res_cv_lv = ga.cross_validation(models=[SumAhead(), Naive()], h=2, test_size=5, level=(50, 60))
actual_step_size = np.unique(np.diff(fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, 2)), axis=1))
test_eq(actual_step_size, 1)
horizons = [1, 2, 3, 2]
test_sizes = [3, 4, 6, 6]
step_sizes = [2, 2, 3, 4]
for h, test_size, step_size in zip(horizons, test_sizes, step_sizes):
    res_cv = ga.cross_validation(
        models=[SumAhead()], h=h,
        test_size=test_size,
        step_size=step_size,
        fitted=True
    )
    fcsts_cv = res_cv['forecasts']
    cols_cv = res_cv['cols']
    test_eq(
        fcsts_cv[:, cols_cv.index('y')],
        fcsts_cv[:, cols_cv.index('SumAhead')]
    )
    fcsts_cv = fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, h))
    actual_step_size = np.unique(
        np.diff(fcsts_cv, axis=1)
    )
    test_eq(actual_step_size, step_size)
    actual_n_windows = res_cv['forecasts'].shape[1]
    test_eq(actual_n_windows, int((test_size - h) / step_size) + 1)

# test_size - h must be a multiple of step_size, otherwise an error is raised
def fail_cv(h, test_size, step_size):
    return ga.cross_validation(models=[SumAhead()], h=h, test_size=test_size, step_size=step_size)

test_fail(fail_cv, contains='multiple', kwargs=dict(h=2, test_size=5, step_size=2))
# test fallback model in cross validation
fcst_cv_f = ga.cross_validation(
    models=[NullModel(), NullModel()],
    fallback_model=Naive(), h=2,
    test_size=5,
    fitted=True
)
fcst_cv_naive = ga.cross_validation(
    models=[Naive(), Naive()],
    h=2,
    test_size=5,
    fitted=True
)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(fcst_cv_f['fitted']['values'], fcst_cv_naive['fitted']['values'])
# test fallback model on a failed fit in cross validation
class FailedFit:
    def __init__(self):
        pass

    def forecast(self):
        pass

    def fit(self, y, X):
        raise Exception('Failed fit')

    def __repr__(self):
        return "FailedFit"

fcst_cv_f = ga.cross_validation(
    models=[FailedFit()],
    fallback_model=Naive(), h=2,
    test_size=5,
    refit=False,
    fitted=True,
)
fcst_cv_naive = ga.cross_validation(
    models=[Naive()],
    h=2,
    test_size=5,
    refit=False,
    fitted=True,
)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(fcst_cv_f['fitted']['values'], fcst_cv_naive['fitted']['values'])
# cross validation tests without refit
cv_starts = np.array([0, 8, 16])
res_cv_wo_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# test first forecasts are equal
test_eq(
    res_cv_wo_refit['forecasts'][cv_starts],
    res_cv_refit['forecasts'][cv_starts]
)
# with refit=2 the first two windows should be the same
res_cv_refit2 = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=2)
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 2, cv_starts + 3]), 1],
)
# and the next two windows should be the same as well
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 6, cv_starts + 7]), 1],
)
# but they differ from each other
test_fail(lambda: test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    )
)

from statsforecast.models import AutoCES

res_cv_wo_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# test first forecasts are equal
test_eq(
    res_cv_wo_refit['forecasts'][[0, 8, 16]],
    res_cv_refit['forecasts'][[0, 8, 16]]
)
def _get_n_jobs(n_groups, n_jobs):
    if n_jobs == -1 or (n_jobs is None):
        actual_n_jobs = os.cpu_count()
    else:
        actual_n_jobs = n_jobs
    return min(n_groups, actual_n_jobs)

# test more series than resources
test_eq(_get_n_jobs(100, -1), os.cpu_count())
test_eq(_get_n_jobs(100, None), os.cpu_count())
test_eq(_get_n_jobs(100, 2), 2)

# test more resources than series
test_eq(_get_n_jobs(1, -1), 1)
test_eq(_get_n_jobs(1, None), 1)
test_eq(_get_n_jobs(2, 10), 2)
def _warn_df_constructor():
    warnings.warn(
        "The `df` argument of the StatsForecast constructor as well as reusing stored "
        "dfs from other methods is deprecated and will raise an error in a future version. "
        "Please provide the `df` argument to the corresponding method instead, e.g. fit/forecast.",
        category=FutureWarning,
    )

def _maybe_warn_sort_df(sort_df):
    if not sort_df:
        warnings.warn(
            "The `sort_df` argument is deprecated and will be removed in a future version. "
            "You can leave it to its default value (True) to suppress this warning.",
            category=FutureWarning,
        )
def _warn_id_as_idx():
    warnings.warn(
        "In a future version the predictions will have the id as a column. "
        "You can set the `NIXTLA_ID_AS_COL` environment variable "
        "to adopt the new behavior and to suppress this warning.",
        category=FutureWarning,
    )

def _id_as_idx() -> bool:
    return not bool(os.getenv('NIXTLA_ID_AS_COL', ''))
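A small sanity check of this toggle; any non-empty value of the environment variable activates the new behavior:

os.environ.pop('NIXTLA_ID_AS_COL', None)
assert _id_as_idx()                 # unset: ids are kept as the index (legacy behavior)
os.environ['NIXTLA_ID_AS_COL'] = '1'
assert not _id_as_idx()             # set: ids are returned as a regular column
del os.environ['NIXTLA_ID_AS_COL']  # restore the default for the tests below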
_param_descriptions = {
    'freq': """freq : str or int
        Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.""",
    'df': """df : pandas or polars DataFrame, optional (default=None)
        DataFrame with ids, times, targets and exogenous.""",
    'sort_df': """sort_df : bool (default=True)
        Sort `df` by ids and times.""",
    'fallback_model': """fallback_model : Any, optional (default=None)
        Model to be used if a model fails.
        Only works with the `forecast` and `cross_validation` methods.""",
    'id_col': """id_col : str (default='unique_id')
        Column that identifies each series.""",
    'time_col': """time_col : str (default='ds')
        Column that identifies each timestep, its values can be timestamps or integers.""",
    'target_col': """target_col : str (default='y')
        Column that contains the target.""",
    'h': """h : int
        Forecast horizon.""",
    'X_df': """X_df : pandas or polars DataFrame, optional (default=None)
        DataFrame with ids, times and future exogenous.""",
    'level': """level : List[float], optional (default=None)
        Confidence levels between 0 and 100 for prediction intervals.""",
    'prediction_intervals': """prediction_intervals : ConformalIntervals, optional (default=None)
        Configuration to calibrate prediction intervals (Conformal Prediction).""",
    'fitted': """fitted : bool (default=False)
        Store in-sample predictions.""",
    'n_jobs': """n_jobs : int (default=1)
        Number of jobs used in the parallel processing, use -1 for all cores.""",
    'verbose': """verbose : bool (default=True)
        Prints TQDM progress bar when `n_jobs=1`.""",
    'models': """models : List[Any]
        List of instantiated objects from `statsforecast.models`.""",
    'n_windows': """n_windows : int (default=1)
        Number of windows used for cross validation.""",
    'step_size': """step_size : int (default=1)
        Step size between each window.""",
    'test_size': """test_size : int, optional (default=None)
        Length of test size. If passed, set `n_windows=None`.""",
    'input_size': """input_size : int, optional (default=None)
        Input size for each window. If not None, rolling windows of this size are used.""",
    'refit': """refit : bool or int (default=True)
        Whether or not to refit the model for each window.
        If int, train the models every `refit` windows.""",
}
class _StatsForecast:
    """The `StatsForecast` class allows you to efficiently fit multiple `StatsForecast` models
    for large sets of time series. It operates on a DataFrame `df` with at least three columns:
    ids, times and targets.

    The class offers a memory-efficient `StatsForecast.forecast` method that avoids storing partial
    model outputs, while the `StatsForecast.fit` and `StatsForecast.predict` methods with a
    scikit-learn interface store the fitted models.

    The `StatsForecast` class offers parallelization utilities with Dask, Spark and Ray back-ends.
    See the distributed computing example [here](https://github.com/Nixtla/statsforecast/tree/main/experiments/ray).
    """
    def __init__(
        self,
        models: List[Any],
        freq: Union[str, int],
        n_jobs: int = 1,
        df: Optional[DataFrame] = None,
        sort_df: bool = True,
        fallback_model: Optional[Any] = None,
        verbose: bool = False,
    ):
        """Train statistical models.

        Parameters
        ----------
        {models}
        {freq}
        {n_jobs}
        {df}
        {sort_df}
        {fallback_model}
        {verbose}
        """
        # TODO @fede: needed for residuals, think about it later
        self.models = models
        self._validate_model_names()
        self.freq = freq
        self.n_jobs = n_jobs
        self.fallback_model = fallback_model
        self.verbose = verbose
        if df is not None:
            _warn_df_constructor()
            self._prepare_fit(df=df, sort_df=sort_df)
        else:
            _maybe_warn_sort_df(sort_df)

    __init__.__doc__ = __init__.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
def _validate_model_names(self):
# Some test models don't have alias
= [getattr(model, 'alias', lambda: None) for model in self.models]
names = [x for x in names if x is not None]
names if len(names) != len(set(names)):
raise ValueError('Model names must be unique. You can use `alias` to set a unique name for each model.')
    def _prepare_fit(
        self,
        df: Optional[DataFrame],
        sort_df: bool = True,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ) -> None:
        if df is None:
            if not hasattr(self, 'ga'):
                raise ValueError('You must provide the `df` argument.')
            _warn_df_constructor()
            return
        df = ensure_time_dtype(df, time_col)
        validate_freq(df[time_col], self.freq)
        if isinstance(df, pd.DataFrame) and df.index.name == id_col:
            warnings.warn(
                "Passing unique_id as the index is deprecated. "
                "Please provide it as a column instead.",
                category=FutureWarning,
            )
            df = df.reset_index()
        _maybe_warn_sort_df(sort_df)
        self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(
            df, id_col, time_col, target_col
        )
        if isinstance(df, pd.DataFrame):
            self.last_dates = pd.Index(last_times, name=time_col)
        else:
            self.last_dates = pl_Series(last_times)
        self.ga = GroupedArray(data, indptr)
        self.og_dates = df[time_col].to_numpy()
        if sort_idxs is not None:
            self.og_dates = self.og_dates[sort_idxs]
        self.n_jobs = _get_n_jobs(len(self.ga), self.n_jobs)
        self.df_constructor = type(df)
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        self._exog = [c for c in df.columns if c not in (id_col, time_col, target_col)]
    def _validate_sizes_for_prediction_intervals(
        self,
        prediction_intervals: Optional[ConformalIntervals],
        offset: int = 0,
    ) -> None:
        if prediction_intervals is None:
            return
        sizes = np.diff(self.ga.indptr) - offset
        # the absolute minimum requires two windows
        min_samples = 2 * prediction_intervals.h + 1
        if np.any(sizes < min_samples):
            raise ValueError(
                f'Minimum samples for computing prediction intervals are {min_samples + offset:,}, '
                'some series have fewer. Please remove them or adjust the horizon.'
            )
        # required samples for the current configuration
        required_samples = prediction_intervals.n_windows * prediction_intervals.h + 1
        if np.any(sizes < required_samples):
            warnings.warn(
                f'Prediction intervals settings require at least {required_samples + offset:,} samples, '
                'some series have fewer and will use fewer windows.'
            )

    def _set_prediction_intervals(
        self, prediction_intervals: Optional[ConformalIntervals]
    ) -> None:
        for model in self.models:
            interval = getattr(model, "prediction_intervals", None)
            if interval is None:
                setattr(model, "prediction_intervals", prediction_intervals)
    def fit(
        self,
        df: Optional[DataFrame] = None,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        """Fit statistical models.

        Fit `models` to a large set of time series from DataFrame `df`
        and store fitted models for later inspection.

        Parameters
        ----------
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        self : StatsForecast
            Returns with stored `StatsForecast` fitted `models`.
        """
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        if self.n_jobs == 1:
            self.fitted_ = self.ga.fit(models=self.models, fallback_model=self.fallback_model)
        else:
            self.fitted_ = self._fit_parallel()
        return self

    fit.__doc__ = fit.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def _make_future_df(self, h: int):
        start_dates = ufp.offset_times(self.last_dates, freq=self.freq, n=1)
        dates = ufp.time_ranges(start_dates, freq=self.freq, periods=h)
        uids = ufp.repeat(self.uids, n=h)
        df = self.df_constructor({self.id_col: uids, self.time_col: dates})
        if isinstance(df, pd.DataFrame):
            if _id_as_idx():
                _warn_id_as_idx()
                df = df.set_index(self.id_col)
            else:
                df = df.reset_index(drop=True)
        return df

    def _parse_X_level(self, h: int, X: Optional[DataFrame], level: Optional[List[int]]):
        if level is None:
            level = []
        if X is None:
            return X, level
        expected_shape = (h * len(self.ga), self.ga.data.shape[1] + 1)
        if X.shape != expected_shape:
            raise ValueError(f'Expected X to have shape {expected_shape}, but got {X.shape}')
        _, _, data, indptr, _ = ufp.process_df(X, self.id_col, self.time_col, None)
        return GroupedArray(data, indptr), level

    def _validate_exog(self, X_df: Optional[DataFrame] = None) -> None:
        if not any(m.uses_exog for m in self.models) or not self._exog:
            return
        err_msg = (
            f'Models require the following exogenous features {self._exog} '
            'for the forecasting step. Please provide them through `X_df`.'
        )
        if X_df is None:
            raise ValueError(err_msg)
        missing_exog = [c for c in self._exog if c not in X_df.columns]
        if missing_exog:
            raise ValueError(err_msg)
    def predict(
        self,
        h: int,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
    ):
        """Predict statistical models.

        Use stored fitted `models` to predict a large set of time series from DataFrame `df`.

        Parameters
        ----------
        {h}
        {X_df}
        {level}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        if not hasattr(self, 'fitted_'):
            raise ValueError('You must call the fit method before calling predict.')
        if any(getattr(m, 'prediction_intervals', None) is not None for m in self.models) and level is None:
            warnings.warn(
                "Prediction intervals are set but `level` was not provided. "
                "Predictions won't have intervals."
            )
        self._validate_exog(X_df)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        if self.n_jobs == 1:
            fcsts, cols = self.ga.predict(fm=self.fitted_, h=h, X=X, level=level)
        else:
            fcsts, cols = self._predict_parallel(h=h, X=X, level=level)
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[cols] = fcsts
        return fcsts_df

    predict.__doc__ = predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def fit_predict(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ) -> DataFrame:
        """Fit and predict with statistical models.

        It is analogous to the scikit-learn `fit_predict` method and requires
        the forecast horizon `h` in advance. In contrast to `StatsForecast.forecast`,
        this method stores the fitted models.

        Parameters
        ----------
        {h}
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {X_df}
        {level}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_exog(X_df)
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        if self.n_jobs == 1:
            self.fitted_, fcsts, cols = self.ga.fit_predict(models=self.models, h=h, X=X, level=level)
        else:
            self.fitted_, fcsts, cols = self._fit_predict_parallel(h=h, X=X, level=level)
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[cols] = fcsts
        return fcsts_df

    fit_predict.__doc__ = fit_predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def forecast(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ) -> DataFrame:
        """Memory efficient predictions.

        This method avoids the memory burden due to object storage.
        It is analogous to the scikit-learn `fit_predict` method, without storing information.
        It requires the forecast horizon `h` in advance.

        Parameters
        ----------
        {h}
        {df}
        {X_df}
        {level}
        {fitted}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        self.__dict__.pop('fcst_fitted_values_', None)
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        self._validate_exog(X_df)
        self._validate_sizes_for_prediction_intervals(prediction_intervals)
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        X, level = self._parse_X_level(h=h, X=X_df, level=level)
        if self.n_jobs == 1:
            res_fcsts = self.ga.forecast(
                models=self.models,
                h=h,
                fallback_model=self.fallback_model,
                fitted=fitted,
                X=X,
                level=level,
                verbose=self.verbose,
                target_col=target_col,
            )
        else:
            res_fcsts = self._forecast_parallel(
                h=h,
                fitted=fitted,
                X=X,
                level=level,
                target_col=target_col,
            )
        if fitted:
            self.fcst_fitted_values_ = res_fcsts['fitted']
        fcsts = res_fcsts['forecasts']
        cols = res_fcsts['cols']
        fcsts_df = self._make_future_df(h=h)
        fcsts_df[cols] = fcsts
        self.forecast_times_ = res_fcsts['times']
        return fcsts_df

    forecast.__doc__ = forecast.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def forecast_fitted_values(self):
        """Access insample predictions.

        After executing `StatsForecast.forecast`, you can access the insample
        prediction values for each model. To get them, you need to pass `fitted=True`
        to the `StatsForecast.forecast` method and then use the
        `StatsForecast.forecast_fitted_values` method.

        Returns
        -------
        fcsts_df : pandas.DataFrame | polars.DataFrame
            DataFrame with insample `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        if not hasattr(self, "fcst_fitted_values_"):
            raise Exception("Please run `forecast` method using `fitted=True`")
        cols = self.fcst_fitted_values_["cols"]
        df = self.df_constructor({
            self.id_col: ufp.repeat(self.uids, np.diff(self.ga.indptr)),
            self.time_col: self.og_dates,
        })
        df[cols] = self.fcst_fitted_values_['values']
        if isinstance(df, pd.DataFrame):
            if _id_as_idx():
                _warn_id_as_idx()
                df = df.set_index(self.id_col)
            else:
                df = df.reset_index(drop=True)
        return df
    def cross_validation(
        self,
        h: int,
        df: Optional[DataFrame] = None,
        n_windows: int = 1,
        step_size: int = 1,
        test_size: Optional[int] = None,
        input_size: Optional[int] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        refit: Union[bool, int] = True,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ) -> DataFrame:
        """Temporal Cross-Validation.

        Efficiently fits a list of `StatsForecast` models through multiple training windows,
        in either chained or rolled manner. The speed of `StatsForecast.models` makes it
        possible to overcome the high computational cost of this evaluation technique.
        Temporal cross-validation provides a better measurement of a model's generalization
        by increasing the length and diversity of the test set.

        Parameters
        ----------
        {h}
        {df}
            If None, the `StatsForecast` class should have been instantiated using `df`.
        {n_windows}
        {step_size}
        {test_size}
        {input_size}
        {level}
        {fitted}
        {refit}
        {sort_df}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with insample `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.
        """
        if n_windows is None and test_size is None:
            raise ValueError('you must define `n_windows` or `test_size`')
        if test_size is None:
            test_size = h + step_size * (n_windows - 1)
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        if refit != True:
            no_forward = [m for m in self.models if not hasattr(m, 'forward')]
            if no_forward:
                raise ValueError(
                    'Can only use integer refit or refit=False with models that implement the forward method. '
                    f'The following models do not implement the forward method: {no_forward}.'
                )
            if self.fallback_model is not None and not hasattr(self.fallback_model, 'forward'):
                raise ValueError(
                    'Can only use integer refit or refit=False with a fallback model that implements the forward method.'
                )
        self.__dict__.pop('cv_fitted_values_', None)
        self._prepare_fit(
            df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        series_sizes = np.diff(self.ga.indptr)
        short_series = series_sizes <= test_size
        if short_series.any():
            short_ids = self.uids[short_series].to_numpy().tolist()
            raise ValueError(
                f"The following series are too short for the cross validation settings: {reprlib.repr(short_ids)}\n"
                "Please remove these series or change the settings, e.g. reducing the horizon or the number of windows."
            )
        self._validate_sizes_for_prediction_intervals(
            prediction_intervals=prediction_intervals, offset=test_size
        )
        self._set_prediction_intervals(prediction_intervals=prediction_intervals)
        _, level = self._parse_X_level(h=h, X=None, level=level)
        if self.n_jobs == 1:
            res_fcsts = self.ga.cross_validation(
                models=self.models, h=h, test_size=test_size,
                fallback_model=self.fallback_model,
                step_size=step_size,
                input_size=input_size,
                fitted=fitted,
                level=level,
                verbose=self.verbose,
                refit=refit,
                target_col=target_col,
            )
        else:
            res_fcsts = self._cross_validation_parallel(
                h=h,
                test_size=test_size,
                step_size=step_size,
                input_size=input_size,
                fitted=fitted,
                level=level,
                refit=refit,
                target_col=target_col,
            )
        if fitted:
            self.cv_fitted_values_ = res_fcsts['fitted']
            self.n_cv_ = n_windows
        fcsts_df = ufp.cv_times(
            times=self.og_dates,
            uids=self.uids,
            indptr=self.ga.indptr,
            h=h,
            test_size=test_size,
            step_size=step_size,
            id_col=id_col,
            time_col=time_col,
        )
        # cv_times is sorted by window and then id, so we sort by id, cutoff and time instead
        fcsts_df = ufp.sort(fcsts_df, [id_col, "cutoff", time_col])
        fcsts_df = ufp.assign_columns(fcsts_df, res_fcsts["cols"], res_fcsts["forecasts"])
        if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            fcsts_df = fcsts_df.set_index(id_col)
        return fcsts_df

    cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def cross_validation_fitted_values(self) -> DataFrame:
        """Access insample cross validated predictions.

        After executing `StatsForecast.cross_validation`, you can access the insample
        prediction values for each model and window. To get them, you need to pass `fitted=True`
        to the `StatsForecast.cross_validation` method and then use the
        `StatsForecast.cross_validation_fitted_values` method.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with insample `models` columns for point predictions
            and probabilistic predictions for all fitted `models`.
        """
        if not hasattr(self, 'cv_fitted_values_'):
            raise Exception('Please run `cross_validation` method using `fitted=True`')
        idxs = self.cv_fitted_values_['idxs'].flatten(order='F')
        train_uids = ufp.repeat(self.uids, np.diff(self.ga.indptr))
        cv_uids = ufp.vertical_concat([train_uids for _ in range(self.n_cv_)])
        used_uids = ufp.take_rows(cv_uids, idxs)
        dates = np.tile(self.og_dates, self.n_cv_)[idxs]
        cutoffs_mask = self.cv_fitted_values_['last_idxs'].flatten(order='F')[idxs]
        cutoffs_sizes = np.diff(np.append(0, np.where(cutoffs_mask)[0] + 1))
        cutoffs = np.repeat(dates[cutoffs_mask], cutoffs_sizes)
        df = self.df_constructor({
            self.id_col: used_uids,
            self.time_col: dates,
            'cutoff': cutoffs,
        })
        fitted_vals = np.reshape(
            self.cv_fitted_values_['values'],
            (-1, len(self.models) + 1),
            order='F',
        )
        df = ufp.assign_columns(df, self.cv_fitted_values_['cols'], fitted_vals[idxs])
        df = ufp.drop_index_if_pandas(df)
        if isinstance(df, pd.DataFrame):
            if _id_as_idx():
                _warn_id_as_idx()
                df = df.set_index(self.id_col)
            else:
                df = df.reset_index(drop=True)
        return df
    def _get_pool(self):
        from multiprocessing import Pool

        pool_kwargs = dict()
        return Pool, pool_kwargs

    def _fit_parallel(self):
        gas = self.ga.split(self.n_jobs)
        Pool, pool_kwargs = self._get_pool()
        with Pool(self.n_jobs, **pool_kwargs) as executor:
            futures = []
            for ga in gas:
                future = executor.apply_async(
                    ga._single_threaded_fit,
                    (self.models, self.fallback_model),
                )
                futures.append(future)
            fm = np.vstack([f.get() for f in futures])
        return fm

    def _get_gas_Xs(self, X, tasks_per_job=1):
        n_chunks = min(tasks_per_job * self.n_jobs, self.ga.n_groups)
        gas = self.ga.split(n_chunks)
        if X is not None:
            Xs = X.split(n_chunks)
        else:
            from itertools import repeat

            Xs = repeat(None)
        return gas, Xs
def _predict_parallel(self, h, X, level):
#create elements for each core
= self._get_gas_Xs(X=X)
gas, Xs = self.ga.split_fm(self.fitted_, self.n_jobs)
fms = self._get_pool()
Pool, pool_kwargs #compute parallel forecasts
with Pool(self.n_jobs, **pool_kwargs) as executor:
= []
futures for ga, fm, X_ in zip(gas, fms, Xs):
= executor.apply_async(
future
ga._single_threaded_predict,
(fm, h, X_, level),
)
futures.append(future)= [f.get() for f in futures]
out = list(zip(*out))
fcsts, cols = np.vstack(fcsts)
fcsts = cols[0]
cols return fcsts, cols
def _fit_predict_parallel(self, h, X, level):
#create elements for each core
= self._get_gas_Xs(X=X)
gas, Xs = self._get_pool()
Pool, pool_kwargs #compute parallel forecasts
with Pool(self.n_jobs, **pool_kwargs) as executor:
= []
futures for ga, X_ in zip(gas, Xs):
= executor.apply_async(
future
ga._single_threaded_fit_predict,self.models, h, X_, level),
(
)
futures.append(future)= [f.get() for f in futures]
out = list(zip(*out))
fm, fcsts, cols = np.vstack(fm)
fm = np.vstack(fcsts)
fcsts = cols[0]
cols return fm, fcsts, cols
    def _forecast_parallel(self, h, fitted, X, level, target_col):
        gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=100)
        results = [None] * len(gas)
        with ProcessPoolExecutor(self.n_jobs) as executor:
            future2pos = {
                executor.submit(
                    ga._single_threaded_forecast,
                    h=h,
                    models=self.models,
                    fallback_model=self.fallback_model,
                    fitted=fitted,
                    X=X,
                    level=level,
                    verbose=False,
                    target_col=target_col,
                ): i
                for i, (ga, X) in enumerate(zip(gas, Xs))
            }
            iterable = tqdm(
                as_completed(future2pos),
                disable=not self.verbose,
                total=len(future2pos),
                desc="Forecast",
                bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]",
            )
            for future in iterable:
                i = future2pos[future]
                results[i] = future.result()
        result = {
            'cols': results[0]['cols'],
            'forecasts': np.vstack([r['forecasts'] for r in results]),
            'times': {
                m: sum(r['times'][m] for r in results)
                for m in [repr(m) for m in self.models]
            },
        }
        if fitted:
            result['fitted'] = {
                'cols': results[0]['fitted']['cols'],
                'values': np.vstack([r['fitted']['values'] for r in results]),
            }
        return result
def _cross_validation_parallel(self, h, test_size, step_size, input_size, fitted, level, refit, target_col):
#create elements for each core
= self.ga.split(self.n_jobs)
gas = self._get_pool()
Pool, pool_kwargs #compute parallel forecasts
= {}
result with Pool(self.n_jobs, **pool_kwargs) as executor:
= []
futures for ga in gas:
= executor.apply_async(
future
ga._single_threaded_cross_validation,tuple(),
dict(
=self.models,
models=h,
h=test_size,
test_size=self.fallback_model,
fallback_model=step_size,
step_size=input_size,
input_size=fitted,
fitted=level,
level=refit,
refit=self.verbose,
verbose=target_col,
target_col
),
)
futures.append(future)= [f.get() for f in futures]
out = [d['forecasts'] for d in out]
fcsts = np.vstack(fcsts)
fcsts = out[0]['cols']
cols 'forecasts'] = fcsts
result['cols'] = cols
result[if fitted:
'fitted'] = {}
result['fitted']['values'] = np.concatenate([d['fitted']['values'] for d in out])
result[for key in ['last_idxs', 'idxs']:
'fitted'][key] = np.concatenate([d['fitted'][key] for d in out])
result['fitted']['cols'] = out[0]['fitted']['cols']
result[return result
    @staticmethod
    def plot(
        df: DataFrame,
        forecasts_df: Optional[DataFrame] = None,
        unique_ids: Union[Optional[List[str]], np.ndarray] = None,
        plot_random: bool = True,
        models: Optional[List[str]] = None,
        level: Optional[List[float]] = None,
        max_insample_length: Optional[int] = None,
        plot_anomalies: bool = False,
        engine: str = 'matplotlib',
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        resampler_kwargs: Optional[Dict] = None,
    ):
        """Plot forecasts and insample values.

        Parameters
        ----------
        {df}
        forecasts_df : pandas or polars DataFrame, optional (default=None)
            DataFrame with ids, times and models.
        unique_ids : list of str, optional (default=None)
            ids to plot. If None, they're selected randomly.
        plot_random : bool (default=True)
            Select time series to plot randomly.
        models : List[str], optional (default=None)
            List of models to plot.
        level : List[float], optional (default=None)
            List of prediction intervals to plot if passed.
        max_insample_length : int, optional (default=None)
            Max number of train/insample observations to be plotted.
        plot_anomalies : bool (default=False)
            Plot anomalies for each prediction interval.
        engine : str (default='matplotlib')
            Library used to plot. 'plotly', 'plotly-resampler' or 'matplotlib'.
        {id_col}
        {time_col}
        {target_col}
        resampler_kwargs : dict
            Kwargs to be passed to the plotly-resampler constructor.
            For further customization ("show_dash") call the method,
            store the plotting object and add the extra arguments to
            its `show_dash` method.
        """
        from utilsforecast.plotting import plot_series

        df = ensure_time_dtype(df, time_col)
        if isinstance(df, pd.DataFrame) and df.index.name == id_col:
            warnings.warn(
                "Passing the ids as the index is deprecated. "
                "Please provide them as a column instead.",
                category=FutureWarning,
            )
            df = df.reset_index()
        if forecasts_df is not None:
            forecasts_df = ensure_time_dtype(forecasts_df, time_col)
            if isinstance(forecasts_df, pd.DataFrame) and forecasts_df.index.name == id_col:
                warnings.warn(
                    "Passing the ids as the index is deprecated. "
                    "Please provide them as a column instead.",
                    category=FutureWarning,
                )
                forecasts_df = forecasts_df.reset_index()
        return plot_series(
            df=df,
            forecasts_df=forecasts_df,
            ids=unique_ids,
            plot_random=plot_random,
            models=models,
            level=level,
            max_insample_length=max_insample_length,
            plot_anomalies=plot_anomalies,
            engine=engine,
            resampler_kwargs=resampler_kwargs,
            palette='tab20b',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
    def save(
        self,
        path: Optional[Union[Path, str]] = None,
        max_size: Optional[str] = None,
        trim: bool = False,
    ):
        """Function that will save the StatsForecast class with certain settings
        to make it reproducible.

        Parameters
        ----------
        path : str or pathlib.Path, optional (default=None)
            Path of the file to be saved. If `None` will create one in the current
            directory using the current UTC timestamp.
        max_size : str, optional (default=None)
            StatsForecast object should not exceed this size.
            Available byte naming: ['B', 'KB', 'MB', 'GB']
        trim : bool (default=False)
            Delete any attributes not needed for inference.
        """
        # Will be used to find the size of the fitted models
        # Never expecting anything higher than GB (even that's a lot)
        bytes_hmap = {
            "B": 1,
            "KB": 2**10,
            "MB": 2**20,
            "GB": 2**30,
        }

        # Removing unnecessary attributes
        # @jmoralez decide future implementation
        trim_attr: list = ["fcst_fitted_values_", "cv_fitted_values_"]
        if trim:
            for attr in trim_attr:
                # remove unnecessary attributes here
                self.__dict__.pop(attr, None)

        sf_size = len(pickle.dumps(self))

        if max_size is not None:
            cap_size = self._get_cap_size(max_size, bytes_hmap)
            if sf_size >= cap_size:
                err_messg = "StatsForecast is larger than the specified max_size"
                raise OSError(errno.EFBIG, err_messg)

        converted_size, sf_byte = None, None
        for key in reversed(list(bytes_hmap.keys())):
            x_byte = bytes_hmap[key]
            if sf_size >= x_byte:
                converted_size = sf_size / x_byte
                sf_byte = key
                break

        if converted_size is None or sf_byte is None:
            err_messg = "Internal Error, this shouldn't happen, please open an issue"
            raise RuntimeError(err_messg)

        print(f"Saving StatsForecast object of size {converted_size:.2f}{sf_byte}.")

        if path is None:
            datetime_record = dt.datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
            path = f"StatsForecast_{datetime_record}.pkl"

        with open(path, "wb") as m_file:
            pickle.dump(self, m_file)
        print("StatsForecast object saved")
    def _get_cap_size(self, max_size, bytes_hmap):
        max_size = max_size.upper().replace(" ", "")
        match = re.match(r'(\d+\.\d+|\d+)(\w+)', max_size)
        if match is None or len(match.groups()) < 2 or match[2] not in bytes_hmap.keys():
            parsing_error = (
                "Couldn't parse `max_size`, it should be `None` "
                "or a number followed by one of the following units: ['B', 'KB', 'MB', 'GB']"
            )
            raise ValueError(parsing_error)
        else:
            m_size = float(match[1])
            key_ = match[2]
            cap_size = m_size * bytes_hmap[key_]
        return cap_size
    @staticmethod
    def load(path: Union[Path, str]):
        """
        Automatically loads the model into a ready StatsForecast.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to saved StatsForecast file.

        Returns
        -------
        sf : StatsForecast
            Previously saved StatsForecast
        """
        if not Path(path).exists():
            raise ValueError("Specified path does not exist, check again and retry.")
        with open(path, "rb") as f:
            return pickle.load(f)

    def __repr__(self):
        return f"StatsForecast(models=[{','.join(map(repr, self.models))}])"
_StatsForecast.plot.__doc__ = _StatsForecast.plot.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
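A quick illustrative check of the `max_size` parser defined above; `self` is unused by `_get_cap_size`, so `None` can be passed for this sketch:

units = {"B": 1, "KB": 2**10, "MB": 2**20, "GB": 2**30}
assert _StatsForecast._get_cap_size(None, "1.5MB", units) == 1.5 * 2**20
assert _StatsForecast._get_cap_size(None, "10 kb", units) == 10 * 2**10
test_fail(lambda: _StatsForecast._get_cap_size(None, "ten MB", units), contains="Couldn't parse")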
class ParallelBackend:
    def forecast(
        self,
        *,
        models,
        fallback_model,
        freq,
        h,
        df,
        X_df,
        level,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        model = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
        )
        return model.forecast(
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def cross_validation(
        self,
        *,
        df,
        models,
        freq,
        fallback_model,
        h,
        n_windows,
        step_size,
        test_size,
        input_size,
        level,
        refit,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        model = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
        )
        return model.cross_validation(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

@conditional_dispatcher
def make_backend(obj: Any, *args: Any, **kwargs: Any) -> ParallelBackend:
    return ParallelBackend()
class StatsForecast(_StatsForecast):
    def forecast(
        self,
        h: int,
        df: Any = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        if self._is_native(df=df):
            return super().forecast(
                df=df,
                h=h,
                X_df=X_df,
                level=level,
                fitted=fitted,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        self._backend = make_backend(engine)
        return self._backend.forecast(
            models=self.models,
            fallback_model=self.fallback_model,
            freq=self.freq,
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def forecast_fitted_values(self):
        if hasattr(self, '_backend'):
            res = self._backend.forecast_fitted_values()
        else:
            res = super().forecast_fitted_values()
        return res

    def cross_validation(
        self,
        h: int,
        df: Any = None,
        n_windows: int = 1,
        step_size: int = 1,
        test_size: Optional[int] = None,
        input_size: Optional[int] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        refit: Union[bool, int] = True,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        if self._is_native(df=df):
            return super().cross_validation(
                h=h,
                df=df,
                n_windows=n_windows,
                step_size=step_size,
                test_size=test_size,
                input_size=input_size,
                level=level,
                fitted=fitted,
                refit=refit,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        backend = make_backend(engine)
        return backend.cross_validation(
            df=df,
            models=self.models,
            freq=self.freq,
            fallback_model=self.fallback_model,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def _is_native(self, df) -> bool:
        engine = try_get_context_execution_engine()
        return engine is None and (df is None or isinstance(df, pd.DataFrame) or isinstance(df, pl_DataFrame))
show_doc(StatsForecast, title_level=2, name='StatsForecast')
# StatsForecast's class usage example
# from statsforecast.core import StatsForecast
from statsforecast.models import (
    ADIDA,
    AutoARIMA,
    CrostonClassic,
    CrostonOptimized,
    CrostonSBA,
    HistoricAverage,
    IMAPA,
    Naive,
    RandomWalkWithDrift,
    SeasonalExponentialSmoothing,
    SeasonalNaive,
    SeasonalWindowAverage,
    SimpleExponentialSmoothing,
    TSB,
    WindowAverage,
    DynamicOptimizedTheta,
    AutoETS,
    AutoCES,
)

# Generate a synthetic panel DataFrame for the example
panel_df = generate_series(n_series=9, equal_ends=False, engine='pandas')
panel_df.groupby('unique_id').tail(4)
if 'NIXTLA_ID_AS_COL' in os.environ:
    del os.environ['NIXTLA_ID_AS_COL']

# warnings when using the id as the index
fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=panel_df)
with warnings.catch_warnings(record=True) as issued_warnings:
    warnings.simplefilter('always', category=FutureWarning)
    std_preds = fcst.predict(h=1)
    std_preds2 = fcst.fit_predict(df=panel_df, h=1)
    std_fcst = fcst.forecast(df=panel_df, h=1, fitted=True)
    std_fitted = fcst.forecast_fitted_values()
    std_cv = fcst.cross_validation(df=panel_df, h=1, fitted=True)
    std_fitted_cv = fcst.cross_validation_fitted_values()
assert len(issued_warnings) == 6
assert all('the predictions will have the id as a column' in str(w.message) for w in issued_warnings)
os.environ['NIXTLA_ID_AS_COL'] = '1'
# if we train with models that use exogenous features, they must be provided through X_df
# when forecasting, otherwise an error pointing to this is raised
panel_with_exog = panel_df[panel_df['unique_id'] == 0].copy()
panel_with_exog['month'] = panel_df['ds'].dt.month
sf = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='M',
)
sf.fit(panel_with_exog)
expected_msg = "['month'] for the forecasting step. Please provide them through `X_df`"
test_fail(
    lambda: sf.predict(h=12),
    contains=expected_msg,
)
test_fail(
    lambda: sf.forecast(df=panel_with_exog, h=12),
    contains=expected_msg,
)
test_fail(
    lambda: sf.fit_predict(df=panel_with_exog, h=12),
    contains=expected_msg,
)
# if the models don't use exog then it continues
sf = StatsForecast(
    models=[SeasonalNaive(season_length=10), Naive()],
    freq='M',
)
sf.fit(panel_with_exog)
_ = sf.predict(h=12)
# check sizes with prediction intervals
fcst = StatsForecast(models=[Naive()], freq='D')
intervals = ConformalIntervals(n_windows=4, h=10)
# the intervals need n_windows * h + 1 = 41 samples; with 30 we can only use two windows,
# so a warning should be raised
with warnings.catch_warnings(record=True) as issued_warnings:
    fcst.fit(df=panel_df.head(30), prediction_intervals=intervals)
assert 'will use fewer windows' in str(issued_warnings[0].message)
assert fcst.fitted_[0, 0]._cs.shape[0] == 2
# with fewer than 2 * h + 1 = 21 samples (two windows of h=10 plus one for training) it should fail
test_fail(
    lambda: fcst.fit(df=panel_df.head(20), prediction_intervals=intervals),
    contains='Please remove them or adjust the horizon',
)
# for CV the test size should be taken into account (20 for CV plus 21 for the intervals)
test_fail(
    lambda: fcst.cross_validation(
        df=panel_df.head(40),
        n_windows=2,
        step_size=10,
        h=10,
        prediction_intervals=intervals,
        level=[80],
    ),
    contains='Minimum samples for computing prediction intervals are 41',
)
# integer refit or refit=False raises an error with unsupported models
fcst = StatsForecast(models=[Naive(), SeasonalNaive(season_length=7)], freq='D')
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=2),
    contains='implement the forward method: [SeasonalNaive]',
)
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=False),
    contains='implement the forward method: [SeasonalNaive]',
)
fcst = StatsForecast(models=[Naive()], freq='D', fallback_model=SeasonalNaive(season_length=7))
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=2),
    contains='a fallback model that implements the forward method.',
)
test_fail(
    lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=False),
    contains='a fallback model that implements the forward method.',
)
# non-standard column names
renamer = {'unique_id': 'uid', 'ds': 'time', 'y': 'target'}
kwargs = dict(id_col='uid', time_col='time', target_col='target')
inverse_renamer = {v: k for k, v in renamer.items()}
non_std_df = panel_df.rename(columns=renamer)

def assert_equal_results(df1, df2):
    pd.testing.assert_frame_equal(
        df1.reset_index(),
        df2.rename(columns=inverse_renamer),
    )

fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=non_std_df, **kwargs)
non_std_preds = fcst.predict(h=1)
non_std_preds2 = fcst.fit_predict(df=non_std_df, h=1, **kwargs)
non_std_fcst = fcst.forecast(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted = fcst.forecast_fitted_values()
non_std_cv = fcst.cross_validation(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted_cv = fcst.cross_validation_fitted_values()

assert_equal_results(std_preds, non_std_preds)
assert_equal_results(std_preds2, non_std_preds2)
assert_equal_results(std_fcst, non_std_fcst)
assert_equal_results(std_fitted, non_std_fitted)
assert_equal_results(std_cv, non_std_cv)
assert_equal_results(std_fitted_cv, non_std_fitted_cv)
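In user-facing terms, the pattern these tests exercise is simply: keep your own column names and declare them with id_col, time_col and target_col on each call. A minimal sketch:
# Any column names work as long as they are declared on each call.
sf = StatsForecast(models=[Naive()], freq='D')
sf.fit(df=non_std_df, id_col='uid', time_col='time', target_col='target')
preds = sf.predict(h=7)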
# Declare the list of StatsForecast estimator instances to fit
# You can try other estimators' hyperparameters
# You can try other methods from the `models.StatsForecast` collection
# Check them here: https://nixtla.github.io/statsforecast/models.html
models = [AutoARIMA(), Naive(),
          AutoETS(), AutoARIMA(allowmean=True, alias='MeanAutoARIMA')]

# Instantiate the StatsForecast class
fcst = StatsForecast(models=models,
                     freq='D',
                     n_jobs=1,
                     verbose=True)

# Efficiently predict
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)
# test save and load
import tempfile
from polars.testing import assert_frame_equal
with tempfile.TemporaryDirectory() as td:
    f_path = Path(td).joinpath("sf_test.pickle")

    test_df = generate_series(n_series=9, equal_ends=False, engine='polars')
    test_frcs = StatsForecast(
        models=models,
        freq='1d',
        n_jobs=1,
        verbose=True
    )

    origin_df = test_frcs.forecast(df=test_df, h=4, fitted=True)

    test_frcs.save(f_path)
    sf_test = StatsForecast.load(f_path)
    load_df = sf_test.forecast(df=test_df, h=4, fitted=True)

    assert_frame_equal(origin_df, load_df)
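Outside of tests, the same round trip is two calls; whether a plain string path is accepted in addition to pathlib.Path is an assumption here.
# Persist a StatsForecast object and restore it later (pickle-based).
fcst.save('sf_checkpoint.pickle')
restored = StatsForecast.load('sf_checkpoint.pickle')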
# test custom alias
test_eq(
    fcsts_df.columns[-1],
    'MeanAutoARIMA',
)
# test no duplicate names
test_fail(lambda: StatsForecast(models=[Naive(), Naive()], freq="D"))
StatsForecast(models=[Naive(), Naive(alias="Naive2")], freq="D")
fig = StatsForecast.plot(panel_df, max_insample_length=10)
fig
test_fail(
    StatsForecast.plot,
    contains='Please use a list',
    kwargs={'df': panel_df, 'level': 90},
)
fcst.plot(panel_df, fcsts_df, engine='matplotlib')
# test plot with ds as object
panel_df['ds'] = panel_df['ds'].astype(str)
fcst.plot(panel_df, fcsts_df)
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90, 80, 30])
fcsts_df.groupby('unique_id').tail(4)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'AutoETS'], level=[90, 80], max_insample_length=28)
fcst.plot(fcst.forecast_fitted_values(),
          forecasts_df=fcsts_df,
          models=['AutoARIMA', 'AutoETS'], level=[80],
          max_insample_length=20,
          plot_anomalies=True)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'])
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'], max_insample_length=28)
fcst.plot(panel_df.query('unique_id in [0, 1]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id == 0'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df.query('unique_id == 0'), models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id in [0, 1, 3]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2], level=[90])
fig = fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2, 3, 4], models=['AutoARIMA', 'Naive'], level=[90])
fig
fig = fcst.plot(
    panel_df, fcsts_df, unique_ids=[0, 1, 2, 3, 4],
    models=['AutoARIMA', 'Naive'],
    level=[90],
    engine='matplotlib'
)
fig
# test that a model's own prediction intervals take precedence
models = [SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=24, n_windows=2))]
fcst = StatsForecast(models=models, freq='D', n_jobs=1)
fcst._set_prediction_intervals(None)
assert models[0].prediction_intervals is not None
fcst = StatsForecast(models=[AutoARIMA(season_length=7)],
                     freq='D',
                     n_jobs=1,
                     verbose=True)
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90])
fcsts_df.groupby('unique_id').tail(4)
fitted_vals = fcst.forecast_fitted_values()
fcst.plot(panel_df, fitted_vals.drop(columns='y'), level=[90])
show_doc(_StatsForecast.fit,
         title_level=2,
         name='StatsForecast.fit')
show_doc(_StatsForecast.predict,
         title_level=2,
         name='StatsForecast.predict')
show_doc(_StatsForecast.fit_predict,
         title_level=2,
         name='StatsForecast.fit_predict')
show_doc(_StatsForecast.forecast, title_level=2, name='StatsForecast.forecast')
# StatsForecast.forecast method usage example
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import AutoARIMA, Naive

# Instantiate the StatsForecast class
fcst = StatsForecast(models=[AutoARIMA(), Naive()],
                     freq='D', n_jobs=1)

# Efficiently predict without storing memory
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)
series = generate_series(100, n_static_features=2, equal_ends=False)

models = [
    ADIDA(), CrostonClassic(), CrostonOptimized(),
    CrostonSBA(), HistoricAverage(),
    IMAPA(), Naive(),
    RandomWalkWithDrift(),
    SeasonalExponentialSmoothing(season_length=7, alpha=0.1),
    SeasonalNaive(season_length=7),
    SeasonalWindowAverage(season_length=7, window_size=4),
    SimpleExponentialSmoothing(alpha=0.1),
    TSB(alpha_d=0.1, alpha_p=0.3),
    WindowAverage(window_size=4)
]
fcst = StatsForecast(
    models=models,
    freq='D',
    n_jobs=1,
    verbose=True
)
res = fcst.forecast(df=series, h=14)
# test series whose ds column is not a datetime
series_wo_dt = series.copy()
series_wo_dt['ds'] = series_wo_dt['ds'].astype(str)
fcst = StatsForecast(models=models, freq='D')
fcsts_wo_dt = fcst.forecast(df=series_wo_dt, h=14)
test_eq(res, fcsts_wo_dt)
test_eq(res['unique_id'].unique(), fcst.uids.values)
last_dates = series.groupby('unique_id')['ds'].max()
test_eq(res.groupby('unique_id')['ds'].min().values, last_dates + pd.offsets.Day())
test_eq(res.groupby('unique_id')['ds'].max().values, last_dates + 14 * pd.offsets.Day())
# monthly data tests
monthly_series = generate_series(10_000, freq='M', min_length=10, max_length=20, equal_ends=True)
monthly_series

fcst = StatsForecast(
    models=[Naive()],
    freq='M'
)
monthly_res = fcst.forecast(df=monthly_series, h=4)
monthly_res

last_dates = monthly_series.groupby('unique_id')['ds'].max()
test_eq(monthly_res.groupby('unique_id')['ds'].min().values, pd.Series(fcst.last_dates) + pd.offsets.MonthEnd())
test_eq(monthly_res.groupby('unique_id')['ds'].max().values, pd.Series(fcst.last_dates) + 4 * pd.offsets.MonthEnd())
show_doc(_StatsForecast.forecast_fitted_values,
         title_level=2,
         name='StatsForecast.forecast_fitted_values')
# StatsForecast.forecast_fitted_values method usage example
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive

# Instantiate the StatsForecast class
fcst = StatsForecast(models=[AutoARIMA()], freq='D', n_jobs=1)

# Access insample predictions
fcsts_df = fcst.forecast(df=panel_df, h=12, fitted=True, level=(90, 10))
insample_fcsts_df = fcst.forecast_fitted_values()
insample_fcsts_df.tail(4)
# tests for fitted values
def test_fcst_fitted(series, n_jobs=1, str_ds=False):
    if str_ds:
        series = series.copy()
        series['ds'] = series['ds'].astype(str)
    fitted_fcst = StatsForecast(
        models=[Naive()],
        freq='D',
        n_jobs=n_jobs,
    )
    fitted_res = fitted_fcst.forecast(df=series, h=14, fitted=True)
    fitted = fitted_fcst.forecast_fitted_values()
    if str_ds:
        test_eq(pd.to_datetime(series['ds']), fitted['ds'])
    else:
        test_eq(series['ds'], fitted['ds'])
    test_eq(series['y'], fitted['y'])

test_fcst_fitted(series)
test_fcst_fitted(series, str_ds=True)
# tests for the fallback model
def test_fcst_fallback_model(n_jobs=1):
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
        fallback_model=Naive()
    )
    fitted_res = fitted_fcst.forecast(df=series, h=14, fitted=True)
    fitted = fitted_fcst.forecast_fitted_values()
    test_eq(series['ds'], fitted['ds'])
    test_eq(series['y'], fitted['y'])
    # test that the null model actually fails
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
    )
    test_fail(lambda: fitted_fcst.forecast(df=series, h=14))

test_fcst_fallback_model()
show_doc(_StatsForecast.cross_validation,
         title_level=2,
         name='StatsForecast.cross_validation')
# StatsForecast.cross_validation method usage example
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive

# Instantiate the StatsForecast class
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1, verbose=True)

# Access insample predictions
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=14, n_windows=2)
rolled_fcsts_df.head(4)
# cross validation tests
series_cv = pd.DataFrame({
    'unique_id': np.array(10 * ['id_0'] + 100 * ['id_1'] + 20 * ['id_2']),
    'ds': np.hstack([
        pd.date_range(end='2021-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2020-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10.), np.arange(100, 200), np.arange(20, 40)]),
})
fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=True,
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None, level=(50, 60))
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3, fitted=True).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

test_fail(lambda: fcst.cross_validation(df=series_cv, h=10), contains="The following series are too short for the cross validation settings: ['id_0']")
test_fail(lambda: fcst.cross_validation(df=series_cv, h=20), contains="The following series are too short for the cross validation settings: ['id_0', 'id_2']")
# test cross validation without refit
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
test_fail(test_eq, args=(res_cv_wo_refit, res_cv))
cols_wo_refit = res_cv_wo_refit.columns
test_eq(res_cv_wo_refit.groupby('unique_id').head(1), res_cv[cols_wo_refit].groupby('unique_id').head(1))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)

n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3,
    step_size=3,
    fitted=True,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
# test cross validation without refitting the models
fcst = StatsForecast(
    models=[DynamicOptimizedTheta(), AutoCES(),
            DynamicOptimizedTheta(season_length=7, alias='test')],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
res_cv_w_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=True,
)
test_fail(test_eq, args=(res_cv_wo_refit, res_cv_w_refit))
test_eq(
    res_cv_wo_refit.groupby('unique_id').head(1),
    res_cv_w_refit.groupby('unique_id').head(1)
)
# test series whose ds column is not a datetime
series_cv_wo_dt = series_cv.copy()
series_cv_wo_dt['ds'] = series_cv_wo_dt['ds'].astype(str)
fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=False
)
res_cv_wo_dt = fcst.cross_validation(
    df=series_cv_wo_dt,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
)
test_eq(res_cv, res_cv_wo_dt)
# test cross validation with equal ends
series_cv = pd.DataFrame({
    'unique_id': np.hstack([np.zeros(10), np.zeros(100) + 1, np.zeros(20) + 2]).astype('int64'),
    'ds': np.hstack([
        pd.date_range(end='2022-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2022-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)]),
})
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    fitted=True,
)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3,
    step_size=3,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
show_doc(_StatsForecast.cross_validation_fitted_values,
         title_level=2,
         name='StatsForecast.cross_validation_fitted_values')
# StatsForecast.cross_validation_fitted_values method usage example
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive

# Instantiate the StatsForecast class
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1)

# Access insample predictions
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=12, n_windows=2, fitted=True)
insample_rolled_fcsts_df = fcst.cross_validation_fitted_values()
insample_rolled_fcsts_df.tail(4)
# tests for fitted values in cross validation
def test_cv_fitted(series_cv, n_jobs=1, str_ds=False):
    if str_ds:
        series_cv = series_cv.copy()
        series_cv['ds'] = series_cv['ds'].astype(str)
    resids_fcst = StatsForecast(
        models=[SumAhead(), Naive()],
        freq='D',
        n_jobs=n_jobs
    )
    resids_res_cv = resids_fcst.cross_validation(df=series_cv, h=2, n_windows=4, fitted=True)
    resids_cv = resids_fcst.cross_validation_fitted_values()

    np.testing.assert_array_equal(
        resids_cv['cutoff'].unique(),
        resids_res_cv['cutoff'].unique()
    )
    if str_ds:
        series_cv['ds'] = pd.to_datetime(series_cv['ds'])
    for uid in resids_cv['unique_id'].unique():
        resids_uid = resids_cv[resids_cv['unique_id'].eq(uid)]
        for cutoff in resids_uid['cutoff'].unique():
            pd.testing.assert_frame_equal(
                resids_uid.query('cutoff == @cutoff')[['unique_id', 'ds', 'y']].reset_index(drop=True),
                series_cv.query('ds <= @cutoff & unique_id == @uid')[['unique_id', 'ds', 'y']].reset_index(drop=True),
                check_dtype=False
            )

test_cv_fitted(series_cv)
test_cv_fitted(series_cv, str_ds=True)
# tests for the fallback model
def test_cv_fallback_model(n_jobs=1):
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
        fallback_model=Naive()
    )
    fitted_res = fitted_fcst.cross_validation(df=series, h=2, n_windows=4, fitted=True)
    fitted = fitted_fcst.cross_validation_fitted_values()
    # test that the null model actually fails
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
    )
    test_fail(lambda: fitted_fcst.cross_validation(df=series, h=12, n_windows=4),
              contains='got an unexpected keyword argument')

test_cv_fallback_model()
show_doc(_StatsForecast.plot,
         title_level=2,
         name='StatsForecast.plot')
show_doc(StatsForecast.save, title_level=2, name='StatsForecast.save')
show_doc(StatsForecast.load, title_level=2, name='StatsForecast.load')
fcst.fit(df=series)

test_eq(
    fcst.predict(h=12),
    fcst.forecast(df=series, h=12)
)
test_eq(
    fcst.fit_predict(df=series, h=12),
    fcst.forecast(df=series, h=12)
)
# tests for conformal prediction
uids = series.index.unique()[:10]
series_subset = series.query('unique_id in @uids')[['unique_id', 'ds', 'y']]
sf = StatsForecast(
    models=[SeasonalNaive(season_length=7)],
    freq='D',
    n_jobs=1,
)
sf = sf.fit(df=series_subset, prediction_intervals=ConformalIntervals(h=12))
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.fit_predict(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.forecast(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)
# test that an error/warning is raised when no level is specified
intervals = ConformalIntervals(h=12)
sf2 = StatsForecast(
    models=[ADIDA()],
    freq='D',
    n_jobs=1,
)
sf2.fit(df=series_subset, prediction_intervals=intervals)
test_warns(lambda: sf2.predict(h=12))
test_fail(lambda: sf2.forecast(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.fit_predict(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.cross_validation(df=series_subset, h=12, prediction_intervals=intervals))
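The passing counterpart to these failures is simply to request explicit levels whenever conformal intervals are configured, mirroring the predict call tested further above:
# With conformal intervals configured, pass `level` to obtain the intervals.
preds = sf2.predict(h=12, level=[80, 95])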
# test conformal cross validation
cv_conformal = sf.cross_validation(
    df=series_subset,
    h=12,
    n_windows=2,
    level=[80, 90],
    prediction_intervals=ConformalIntervals(h=12),
)
cv_no_conformal = sf.cross_validation(
    df=series_subset,
    h=12,
    n_windows=2,
    level=[80, 90],
)
test_eq(
    cv_conformal.columns,
    cv_no_conformal.columns,
)
test_eq(
    cv_conformal.filter(regex='ds|cutoff|y|AutoARIMA$'),
    cv_no_conformal.filter(regex='ds|cutoff|y|AutoARIMA$')
)
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1),
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=1
)
res = fcst.forecast(df=series, h=14)
#| eval: false
# tests for parallel processing
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1),
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=-1
)
res = fcst.forecast(df=series, h=14)
res_cv = fcst.cross_validation(df=series, h=3, test_size=10, n_windows=None)
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))

test_fcst_fitted(series, n_jobs=-1)
test_cv_fitted(series_cv, n_jobs=-1)
test_fcst_fitted(series, n_jobs=-1, str_ds=True)
test_cv_fitted(series_cv, n_jobs=-1, str_ds=True)
# check the n_windows argument
n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# check the step_size argument
n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
Miscellaneous
Integer datestamps
The
StatsForecast
class can also receive integers as datestamps; the following example shows how to do it.
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengers as ap
from statsforecast.models import HistoricAverage

int_ds_df = pd.DataFrame({'ds': np.arange(1, len(ap) + 1), 'y': ap})
int_ds_df.insert(0, 'unique_id', 'AirPassengers')
int_ds_df.head()
int_ds_df.tail()
int_ds_df
fcst = StatsForecast(models=[HistoricAverage()], freq=1)
horizon = 7
forecast = fcst.forecast(df=int_ds_df, h=horizon)
forecast.head()
last_date = int_ds_df['ds'].max()
test_eq(forecast['ds'].values, np.arange(last_date + 1, last_date + 1 + horizon))
int_ds_cv = fcst.cross_validation(df=int_ds_df, h=7, test_size=8, n_windows=None)
int_ds_cv
External regressors
Every column after y is considered an external regressor and will be passed to the models that allow them. If you use them, you must provide their future values to the
StatsForecast.forecast
method.
class LinearRegression(_TS):
    def __init__(self):
        pass

    def fit(self, y, X):
        self.coefs_, *_ = np.linalg.lstsq(X, y, rcond=None)
        return self

    def predict(self, h, X):
        # use the coefficients estimated in fit
        mean = X @ self.coefs_
        return mean

    def __repr__(self):
        return 'LinearRegression()'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        coefs, *_ = np.linalg.lstsq(X, y, rcond=None)
        return {'mean': X_future @ coefs}

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
series_xreg = series = generate_series(10_000, equal_ends=True)
series_xreg['intercept'] = 1
series_xreg['dayofweek'] = series_xreg['ds'].dt.dayofweek
series_xreg = pd.get_dummies(series_xreg, columns=['dayofweek'], drop_first=True)
series_xreg
dates = sorted(series_xreg['ds'].unique())
valid_start = dates[-14]
train_mask = series_xreg['ds'] < valid_start
series_train = series_xreg[train_mask]
series_valid = series_xreg[~train_mask]
X_valid = series_valid.drop(columns=['y'])
fcst = StatsForecast(
    models=[LinearRegression()],
    freq='D',
)
xreg_res = fcst.forecast(df=series_train, h=14, X_df=X_valid)
xreg_res['y'] = series_valid['y'].values
xreg_res.drop(columns='unique_id').groupby('ds').mean().plot()
# in cross validation the future exogenous values are taken from df itself
xreg_res_cv = fcst.cross_validation(df=series_train, h=3, test_size=5, n_windows=None)
# The following cells contain tests for external regressors
class ReturnX(_TS):
    def __init__(self):
        pass

    def fit(self, y, X):
        return self

    def predict(self, h, X):
        mean = X
        return mean

    def __repr__(self):
        return 'ReturnX'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        return {'mean': X_future.flatten()}

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

df = pd.DataFrame(
    {
        'unique_id': [0] * 10 + [1] * 10,
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.rand(20),
        'x': np.arange(20, dtype=np.float64),
    }
)
train_mask = df['ds'] < 6
train_df = df[train_mask]
test_df = df[~train_mask]
def test_x_vars(n_jobs=1):
    fcst = StatsForecast(
        models=[ReturnX()],
        freq=1,
        n_jobs=n_jobs,
    )
    xreg = test_df.drop(columns='y')
    res = fcst.forecast(df=train_df, h=4, X_df=xreg)
    expected_res = xreg.rename(columns={'x': 'ReturnX'})
    pd.testing.assert_frame_equal(
        res,
        expected_res.reset_index(drop=True),
        check_dtype=False,
    )

test_x_vars(n_jobs=1)
#| eval: false
test_x_vars(n_jobs=2)
Prediction intervals
You can pass the argument
level
to the
StatsForecast.forecast
method to compute prediction intervals. Not all models can compute prediction intervals at the moment, so we will only obtain intervals for the models that have implemented them.
ap_df = pd.DataFrame({'ds': np.arange(ap.size), 'y': ap})
ap_df['unique_id'] = 0
sf = StatsForecast(
    models=[
        SeasonalNaive(season_length=12),
        AutoARIMA(season_length=12)
    ],
    freq=1,
    n_jobs=1
)
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
fcst.plot(ap_df, ap_ci, level=[80], engine="matplotlib")
Conformal prediction intervals
You can also add conformal intervals using the following code.
from statsforecast.utils import ConformalIntervals

sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
        AutoARIMA(
            season_length=12,
            prediction_intervals=ConformalIntervals(n_windows=2, h=12),
            alias='ConformalAutoARIMA'
        ),
    ],
    freq=1,
    n_jobs=1
)
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
fcst.plot(ap_df, ap_ci, level=[80], engine="plotly")
You can also compute conformal intervals for all the models that support them, as follows,
sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
    ],
    freq=1,
    n_jobs=1
)
ap_ci = sf.forecast(
    df=ap_df,
    h=12,
    level=(50, 80, 95),
    prediction_intervals=ConformalIntervals(h=12),
)
fcst.plot(ap_df, ap_ci, level=[80], engine="matplotlib")
def test_conf_intervals(n_jobs=1):
    ap_df = pd.DataFrame(
        {
            'unique_id': [0] * ap.size,
            'ds': np.arange(ap.size),
            'y': ap
        }
    )
    fcst = StatsForecast(
        models=[
            SeasonalNaive(season_length=12),
            AutoARIMA(season_length=12)
        ],
        freq=1,
        n_jobs=n_jobs
    )
    ap_ci = fcst.forecast(df=ap_df, h=12, level=(80, 95))
    ap_ci.drop(columns='unique_id').set_index('ds').plot(marker='.', figsize=(10, 6))

test_conf_intervals(n_jobs=1)
#| eval: false
# test a number of jobs greater than the available cores
test_conf_intervals(n_jobs=101)
Give us a ⭐ on GitHub