%load_ext autoreload
%autoreload 2
核心
HierarchicalForecast包含纯Python实现的层次化调和方法,以及一个core.HierarchicalReconciliation
包装类,能够通过包含层次化时间序列和基础预测的pandas DataFrame轻松与这些方法进行交互。
core.HierarchicalReconciliation
调和类使用层次化时间序列pd.DataFrame Y_df
,基础预测pd.DataFrame Y_hat_df
,以及聚合约束矩阵S
。有关创建聚合约束矩阵的更多信息,请参见utils 聚合方法。
import re
import gc
import time
import copy
from hierarchicalforecast.methods import HReconciler
from inspect import signature
from scipy.stats import norm
from scipy import sparse
from typing import Dict, List, Optional
import warnings
import numpy as np
import pandas as pd
from fastcore.test import test_close, test_eq, test_fail
from nbdev.showdoc import add_docs, show_doc
def _build_fn_name(fn) -> str:
= type(fn).__name__
fn_name = fn.__dict__
func_params
# 将默认参数从名称中移除
= ['insample', 'num_threads']
args_to_remove if not func_params.get('nonnegative', False):
'nonnegative')
args_to_remove.append(
if fn_name == 'MinTrace' and \
'method']=='mint_shrink':
func_params[if func_params['mint_shr_ridge'] == 2e-8:
+= ['mint_shr_ridge']
args_to_remove
= [f'{name}-{value}' for name, value in func_params.items() if name not in args_to_remove]
func_params if func_params:
+= '_' + '_'.join(func_params)
fn_name return fn_name
# 测试函数名称
from hierarchicalforecast.methods import BottomUp, MinTrace
'BottomUp')
test_eq(_build_fn_name(BottomUp()),
test_eq(='ols')),
_build_fn_name(MinTrace(method'MinTrace_method-ols'
)
test_eq(='ols', nonnegative=True)),
_build_fn_name(MinTrace(method'MinTrace_method-ols_nonnegative-True'
)
test_eq(='mint_shr')),
_build_fn_name(MinTrace(method'MinTrace_method-mint_shr'
)
核心.层次重建
def _reverse_engineer_sigmah(Y_hat_df, y_hat, model_name):
"""
This function assumes that the model creates prediction intervals
under a normality with the following the Equation:
$\hat{y}_{t+h} + c \hat{sigma}_{h}$
In the future, we might deprecate this function in favor of a
direct usage of an estimated $\hat{sigma}_{h}$
"""
= ['ds']
drop_cols if 'y' in Y_hat_df.columns:
'y')
drop_cols.append(if model_name+'-median' in Y_hat_df.columns:
+'-median')
drop_cols.append(model_name= Y_hat_df.drop(columns=drop_cols, axis=1).columns.to_list()
model_names = [name for name in model_names if ('-lo' in name or '-hi' in name)]
pi_model_names = [pi_name for pi_name in pi_model_names if model_name in pi_name]
pi_model_name = len(pi_model_name) > 0
pi
= len(Y_hat_df.index.unique())
n_series
if not pi:
raise Exception(f'Please include `{model_name}` prediction intervals in `Y_hat_df`')
= pi_model_name[0]
pi_col = -1 if 'lo' in pi_col else 1
sign = re.findall('[\d]+[.,\d]+|[\d]*[.][\d]+|[\d]+', pi_col)
level_col = float(level_col[-1])
level_col = norm.ppf(0.5 + level_col / 200)
z = Y_hat_df[pi_col].values.reshape(n_series,-1)
sigmah = sign * (sigmah - y_hat) / z
sigmah
return sigmah
class HierarchicalReconciliation:
"""Hierarchical Reconciliation Class.
The `core.HierarchicalReconciliation` class allows you to efficiently fit multiple
HierarchicaForecast methods for a collection of time series and base predictions stored in
pandas DataFrames. The `Y_df` dataframe identifies series and datestamps with the unique_id and ds columns while the
y column denotes the target time series variable. The `Y_h` dataframe stores the base predictions,
example ([AutoARIMA](https://nixtla.github.io/statsforecast/models.html#autoarima), [ETS](https://nixtla.github.io/statsforecast/models.html#autoets), etc.).
**Parameters:**<br>
`reconcilers`: A list of instantiated classes of the [reconciliation methods](https://nixtla.github.io/hierarchicalforecast/methods.html) module .<br>
**References:**<br>
[Rob J. Hyndman and George Athanasopoulos (2018). \"Forecasting principles and practice, Hierarchical and Grouped Series\".](https://otexts.com/fpp3/hierarchical.html)
"""
def __init__(self,
reconcilers: List[HReconciler]):self.reconcilers = reconcilers
self.orig_reconcilers = copy.deepcopy(reconcilers) # 待办事项:优雅的解决方案
self.insample = any([method.insample for method in reconcilers])
def _prepare_fit(self,
Y_hat_df: pd.DataFrame,
S_df: pd.DataFrame,
Y_df: Optional[pd.DataFrame],str, np.ndarray],
tags: Dict[int]] = None,
level: Optional[List[str = 'normality',
intervals_method: bool = True):
sort_df: """
进行初步整理和保护
"""
#-------------------------------- 匹配 Y_hat/Y/S 索引顺序 --------------------------------#
if sort_df:
= Y_hat_df.reset_index()
Y_hat_df = Y_hat_df.unique_id.astype('category')
Y_hat_df.unique_id = Y_hat_df.unique_id.cat.set_categories(S_df.index)
Y_hat_df.unique_id = Y_hat_df.sort_values(by=['unique_id', 'ds'])
Y_hat_df = Y_hat_df.set_index('unique_id')
Y_hat_df
if Y_df is not None:
= Y_df.reset_index()
Y_df = Y_df.unique_id.astype('category')
Y_df.unique_id = Y_df.unique_id.cat.set_categories(S_df.index)
Y_df.unique_id = Y_df.sort_values(by=['unique_id', 'ds'])
Y_df = Y_df.set_index('unique_id')
Y_df
= pd.CategoricalIndex(S_df.index, categories=S_df.index)
S_df.index
#----------------------------------- Check Input's Validity ----------------------------------#
# Check input's validity
if intervals_method not in ['normality', 'bootstrap', 'permbu']:
raise ValueError(f'Unkwon interval method: {intervals_method}')
if self.insample or (intervals_method in ['bootstrap', 'permbu']):
if Y_df is None:
raise Exception('you need to pass `Y_df`')
# 保护级别列表
if (level is not None):
= np.any((np.array(level) < 0)|(np.array(level) >= 100 ))
level_outside_domain if level_outside_domain and (intervals_method in ['normality', 'permbu']):
raise Exception('Level outside domain, send `level` list in [0,100)')
# 声明输出名称
= ['ds', 'y'] if 'y' in Y_hat_df.columns else ['ds']
drop_cols = Y_hat_df.drop(columns=drop_cols, axis=1).columns.to_list()
model_names
# 确保数值列
if not len(Y_hat_df[model_names].select_dtypes(include='number').columns) == len(Y_hat_df[model_names].columns):
raise Exception('`Y_hat_df`s columns contain non numeric types')
#确保无空值
if Y_hat_df[model_names].isnull().values.any():
raise Exception('`Y_hat_df` contains null values')
= [name for name in model_names if ('-lo' in name or '-hi' in name or '-median' in name)]
pi_model_names = [name for name in model_names if name not in pi_model_names]
model_names
# 待办事项:完成y_hat_insample保护
if intervals_method in ['bootstrap', 'permbu'] and Y_df is not None:
if not (set(model_names) <= set(Y_df.columns)):
raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')
= Y_hat_df.index.unique()
uids
# 检查Y_hat_df与S_df序列之间的差异
= len(S_df.index.difference(uids))
S_diff = len(Y_hat_df.index.difference(S_df.index.unique()))
Y_hat_diff if S_diff > 0 or Y_hat_diff > 0:
raise Exception(f'Check `S_df`, `Y_hat_df` series difference, S\Y_hat={S_diff}, Y_hat\S={Y_hat_diff}')
if Y_df is not None:
# 检查Y_hat_df与Y_df序列之间的差异
= len(Y_df.index.difference(uids))
Y_diff = len(Y_hat_df.index.difference(Y_df.index.unique()))
Y_hat_diff if Y_diff > 0 or Y_hat_diff > 0:
raise Exception(f'Check `Y_hat_df`, `Y_df` series difference, Y_hat\Y={Y_hat_diff}, Y\Y_hat={Y_diff}')
# Same Y_hat_df/S_df/Y_df's unique_id order to prevent errors
= S_df.loc[uids]
S_df
return Y_hat_df, S_df, Y_df, model_names
def reconcile(self,
Y_hat_df: pd.DataFrame,
S: pd.DataFrame,str, np.ndarray],
tags: Dict[= None,
Y_df: Optional[pd.DataFrame] int]] = None,
level: Optional[List[str = 'normality',
intervals_method: int = -1,
num_samples: int = 0,
seed: bool = True,
sort_df: bool = False,
is_balanced:
):"""Hierarchical Reconciliation Method.
The `reconcile` method is analogous to SKLearn `fit_predict` method, it
applies different reconciliation techniques instantiated in the `reconcilers` list.
Most reconciliation methods can be described by the following convenient
linear algebra notation:
$$\\tilde{\mathbf{y}}_{[a,b],\\tau} = \mathbf{S}_{[a,b][b]} \mathbf{P}_{[b][a,b]} \hat{\mathbf{y}}_{[a,b],\\tau}$$
where $a, b$ represent the aggregate and bottom levels, $\mathbf{S}_{[a,b][b]}$ contains
the hierarchical aggregation constraints, and $\mathbf{P}_{[b][a,b]}$ varies across
reconciliation methods. The reconciled predictions are $\\tilde{\mathbf{y}}_{[a,b],\\tau}$, and the
base predictions $\hat{\mathbf{y}}_{[a,b],\\tau}$.
**Parameters:**<br>
`Y_hat_df`: pd.DataFrame, base forecasts with columns `ds` and models to reconcile indexed by `unique_id`.<br>
`Y_df`: pd.DataFrame, training set of base time series with columns `['ds', 'y']` indexed by `unique_id`.<br>
If a class of `self.reconciles` receives `y_hat_insample`, `Y_df` must include them as columns.<br>
`S`: pd.DataFrame with summing matrix of size `(base, bottom)`, see [aggregate method](https://nixtla.github.io/hierarchicalforecast/utils.html#aggregate).<br>
`tags`: Each key is a level and its value contains tags associated to that level.<br>
`level`: positive float list [0,100), confidence levels for prediction intervals.<br>
`intervals_method`: str, method used to calculate prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
`num_samples`: int=-1, if positive return that many probabilistic coherent samples.
`seed`: int=0, random seed for numpy generator's replicability.<br>
`sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>
`is_balanced`: bool=False, wether `Y_df` is balanced, set it to True to speed things up if `Y_df` is balanced.<br>
**Returns:**<br>
`Y_tilde_df`: pd.DataFrame, with reconciled predictions.
"""
# 检查输入的有效性并排序数据框
self.model_names = \
Y_hat_df, S_df, Y_df, self._prepare_fit(Y_hat_df=Y_hat_df,
=S,
S_df=Y_df,
Y_df=tags,
tags=level,
level=intervals_method,
intervals_method=sort_df)
sort_df
# 初始化协调器参数
= dict(
reconciler_args =S_df.index.get_indexer(S.columns),
idx_bottom={key: S_df.index.get_indexer(val) for key, val in tags.items()}
tags
)
= any([method.is_sparse_method for method in self.reconcilers])
any_sparse if any_sparse:
try:
= sparse.csr_matrix(S_df.sparse.to_coo())
S_for_sparse except AttributeError:
'Using dense S matrix for sparse reconciliation method.')
warnings.warn(= S_df.values.astype(np.float32)
S_for_sparse
if Y_df is not None:
if is_balanced:
= Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)
y_insample else:
= Y_df.pivot(columns='ds', values='y').loc[S_df.index].values.astype(np.float32)
y_insample 'y_insample'] = y_insample
reconciler_args[
= Y_hat_df.copy()
Y_tilde_df= time.time()
start self.execution_times = {}
self.level_names = {}
self.sample_names = {}
for reconciler, name_copy in zip(self.reconcilers, self.orig_reconcilers):
= _build_fn_name(name_copy)
reconcile_fn_name
if reconciler.is_sparse_method:
"S"] = S_for_sparse
reconciler_args[else:
"S"] = S_df.values.astype(np.float32)
reconciler_args[
= 'y_hat_insample' in signature(reconciler.fit_predict).parameters
has_fitted = 'level' in signature(reconciler.fit_predict).parameters
has_level
for model_name in self.model_names:
= f'{model_name}/{reconcile_fn_name}'
recmodel_name = Y_hat_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
y_hat 'y_hat'] = y_hat
reconciler_args[
if (self.insample and has_fitted) or intervals_method in ['bootstrap', 'permbu']:
if is_balanced:
= Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
y_hat_insample else:
= Y_df.pivot(columns='ds', values=model_name).loc[S_df.index].values.astype(np.float32)
y_hat_insample 'y_hat_insample'] = y_hat_insample
reconciler_args[
if has_level and (level is not None):
if intervals_method in ['normality', 'permbu']:
= _reverse_engineer_sigmah(Y_hat_df=Y_hat_df,
sigmah =y_hat, model_name=model_name)
y_hat'sigmah'] = sigmah
reconciler_args[
'intervals_method'] = intervals_method
reconciler_args['num_samples'] = 200 # 待办事项:解决重复的num_samples问题
reconciler_args['seed'] = seed
reconciler_args[
# 均值与概率校准
= [key for key in signature(reconciler.fit_predict).parameters if key in reconciler_args.keys()]
kwargs_ls = {key: reconciler_args[key] for key in kwargs_ls}
kwargs
if (level is not None) and (num_samples > 0):
# Store reconciler's memory to generate samples
= reconciler.fit(**kwargs)
reconciler = reconciler.predict(S=reconciler_args['S'],
fcsts_model =reconciler_args['y_hat'], level=level)
y_hatelse:
# Memory efficient reconciler's fit_predict
= reconciler(**kwargs, level=level)
fcsts_model
# 解析最终输出
= fcsts_model['mean'].flatten()
Y_tilde_df[recmodel_name] if intervals_method in ['bootstrap', 'normality', 'permbu'] and level is not None:
level.sort()= [f'{recmodel_name}-lo-{lv}' for lv in reversed(level)]
lo_names = [f'{recmodel_name}-hi-{lv}' for lv in level]
hi_names self.level_names[recmodel_name] = lo_names + hi_names
= np.reshape(fcsts_model['quantiles'], (len(Y_tilde_df),-1))
sorted_quantiles = pd.DataFrame(sorted_quantiles, index=Y_tilde_df.index,
intervals_df =self.level_names[recmodel_name])
columns= pd.concat([Y_tilde_df, intervals_df], axis=1)
Y_tilde_df
if num_samples > 0:
= reconciler.sample(num_samples=num_samples)
samples self.sample_names[recmodel_name] = [f'{recmodel_name}-sample-{i}' for i in range(num_samples)]
= np.reshape(samples, (len(Y_tilde_df),-1))
samples = pd.DataFrame(samples, index=Y_tilde_df.index,
samples_df =self.sample_names[recmodel_name])
columns= pd.concat([Y_tilde_df, samples_df], axis=1)
Y_tilde_df
del sorted_quantiles
del intervals_df
if self.insample and has_fitted:
del y_hat_insample
gc.collect()
= time.time()
end self.execution_times[f'{model_name}/{reconcile_fn_name}'] = (end - start)
return Y_tilde_df
def bootstrap_reconcile(self,
Y_hat_df: pd.DataFrame,
S_df: pd.DataFrame,str, np.ndarray],
tags: Dict[= None,
Y_df: Optional[pd.DataFrame] int]] = None,
level: Optional[List[str = 'normality',
intervals_method: int = -1,
num_samples: int = 1,
num_seeds: bool = True):
sort_df: """Bootstraped Hierarchical Reconciliation Method.
Applies N times, based on different random seeds, the `reconcile` method
for the different reconciliation techniques instantiated in the `reconcilers` list.
**Parameters:**<br>
`Y_hat_df`: pd.DataFrame, base forecasts with columns `ds` and models to reconcile indexed by `unique_id`.<br>
`Y_df`: pd.DataFrame, training set of base time series with columns `['ds', 'y']` indexed by `unique_id`.<br>
If a class of `self.reconciles` receives `y_hat_insample`, `Y_df` must include them as columns.<br>
`S`: pd.DataFrame with summing matrix of size `(base, bottom)`, see [aggregate method](https://nixtla.github.io/hierarchicalforecast/utils.html#aggregate).<br>
`tags`: Each key is a level and its value contains tags associated to that level.<br>
`level`: positive float list [0,100), confidence levels for prediction intervals.<br>
`intervals_method`: str, method used to calculate prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
`num_samples`: int=-1, if positive return that many probabilistic coherent samples.
`num_seeds`: int=1, random seed for numpy generator's replicability.<br>
`sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>
**Returns:**<br>
`Y_bootstrap_df`: pd.DataFrame, with bootstraped reconciled predictions.
"""
# 检查输入的有效性并排序数据框
self.model_names = \
Y_hat_df, S_df, Y_df, self._prepare_fit(Y_hat_df=Y_hat_df,
=S_df,
S_df=Y_df,
Y_df=tags,
tags=intervals_method,
intervals_method=sort_df)
sort_df
# Bootstrap 调和了预测结果
= []
Y_tilde_list for seed in range(num_seeds):
= self.reconcile(Y_hat_df=Y_hat_df,
Y_tilde_df =S_df,
S=tags,
tags=Y_df,
Y_df=level,
level=intervals_method,
intervals_method=num_samples,
num_samples=seed,
seed=False)
sort_df'seed'] = seed
Y_tilde_df[# 待办事项:修复损坏的recmodel_names
if seed==0:
= Y_tilde_df.columns
first_columns = first_columns
Y_tilde_df.columns
Y_tilde_list.append(Y_tilde_df)
= pd.concat(Y_tilde_list, axis=0)
Y_bootstrap_df del Y_tilde_list
gc.collect()
return Y_bootstrap_df
show_doc(HierarchicalReconciliation, ='init', title_level=3) name
show_doc(HierarchicalReconciliation)
='reconcile', title_level=3) show_doc(HierarchicalReconciliation.reconcile, name
='bootstrap_reconcile', title_level=3) show_doc(HierarchicalReconciliation.bootstrap_reconcile, name
from hierarchicalforecast.methods import (
BottomUp, TopDown, MiddleOut, MinTrace, ERM,
)from hierarchicalforecast.utils import aggregate
= pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df 0, 'Country', 'Australia')
df.insert(
# 非严格等级结构
= [
hierS_grouped_df 'Country'],
['Country', 'State'],
['Country', 'Purpose'],
['Country', 'State', 'Region'],
['Country', 'State', 'Purpose'],
['Country', 'State', 'Region', 'Purpose']
[
]# 严格的等级结构
= [
hiers_strictly 'Country'],
['Country', 'State'],
['Country', 'State', 'Region'],
[
]
# 获取数据框
= aggregate(df, hierS_grouped_df)
hier_grouped_df, S_grouped_df, tags_grouped = aggregate(df, hiers_strictly)
hier_strict_df, S_strict, tags_strict
# 检查分类输入是否产生相同输出
= df.copy()
df2 for col in ['Country', 'State', 'Purpose', 'Region']:
= df2[col].astype('category')
df2[col]
for spec in [hierS_grouped_df, hiers_strictly]:
= aggregate(df, spec)
Y_orig, S_orig, tags_orig = aggregate(df2, spec)
Y_cat, S_cat, tags_cat
pd.testing.assert_frame_equal(Y_cat, Y_orig)
pd.testing.assert_frame_equal(S_cat, S_orig)assert all(np.array_equal(tags_orig[k], tags_cat[k]) for k in tags_orig.keys())
'y_model'] = hier_grouped_df['y']
hier_grouped_df[# 我们应该能够使用这些方法恢复y。
= hier_grouped_df.groupby('unique_id').tail(12)
hier_grouped_hat_df = hier_grouped_hat_df['ds'].unique()
ds_h = hier_grouped_df.query('~(ds in @ds_h)')
hier_grouped_df #向`y_model`添加噪声以避免完全拟合的值
'y_model'] += np.random.uniform(-1, 1, len(hier_grouped_df))
hier_grouped_df[
#层次化协调
= HierarchicalReconciliation(reconcilers=[
hrec #这些方法应能重建原始的y
BottomUp(),='ols'),
MinTrace(method='wls_struct'),
MinTrace(method='wls_var'),
MinTrace(method='mint_shrink'),
MinTrace(method='ols', nonnegative=True),
MinTrace(method='wls_struct', nonnegative=True),
MinTrace(method='wls_var', nonnegative=True),
MinTrace(method='mint_shrink', nonnegative=True),
MinTrace(method# ERM正在恢复,但需要更大的每股收益
#ERM(method='reg_bu', lambda_reg=None),
])= hrec.reconcile(Y_hat_df=hier_grouped_hat_df, Y_df=hier_grouped_df,
reconciled =S_grouped_df, tags=tags_grouped)
Sfor model in reconciled.drop(columns=['ds', 'y']).columns:
if 'ERM' in model:
= 3
eps elif 'nonnegative' in model:
= 1e-1
eps else:
= 1e-1
eps 'y'], reconciled[model], eps=eps) test_close(reconciled[
# 测试不正确的Y_hat_df数据类型
= hier_grouped_hat_df.copy()
hier_grouped_hat_df_nan 'Australia', 'y_model'] = float('nan')
hier_grouped_hat_df_nan.loc[
test_fail(
hrec.reconcile,='null values',
contains=(hier_grouped_hat_df_nan, S_grouped_df, tags_grouped, hier_grouped_df),
args
)
= hier_grouped_hat_df.copy()
hier_grouped_hat_df_none 'Australia', 'y_model'] = None
hier_grouped_hat_df_none.loc[
test_fail(
hrec.reconcile,='null values',
contains=(hier_grouped_hat_df_none, S_grouped_df, tags_grouped, hier_grouped_df),
args
)
= hier_grouped_hat_df.copy()
hier_grouped_hat_df_str 'y_model'] = hier_grouped_hat_df_str['y_model'].astype(str)
hier_grouped_hat_df_str[
test_fail(
hrec.reconcile,='numeric types',
contains=(hier_grouped_hat_df_str, S_grouped_df, tags_grouped, hier_grouped_df),
args )
# 测试预期错误
# 不同的系列 S 和 Y_hat_df
test_fail(
hrec.reconcile,='series difference',
contains=(hier_grouped_hat_df.drop('Australia'), S_grouped_df, tags_grouped, hier_grouped_df),
args
)
test_fail(
hrec.reconcile,='series difference',
contains=(hier_grouped_hat_df, S_grouped_df.drop('Australia'), tags_grouped, hier_grouped_df),
args
)
test_fail(
hrec.reconcile,='series difference',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df.drop('Australia')),
args
)
::: {#cell-22 .cell 0=‘隐’ 1=‘藏’}
# 测试预期错误
# 不同的列 Y_df 和 Y_hat_df
= HierarchicalReconciliation(
hrec =[ERM(method='reg_bu', lambda_reg=100)])
reconcilers
test_fail(
hrec.reconcile,='Please include ',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped,
args80], 'permbu'), # permbu 需要 y_hat_insample
hier_grouped_df, [ )
:::
# 测试无样本的调和方法
= HierarchicalReconciliation(reconcilers=[
hrec #这些方法应能重构原始的y
BottomUp(),='ols'),
MinTrace(method='wls_struct'),
MinTrace(method='ols', nonnegative=True),
MinTrace(method='wls_struct', nonnegative=True),
MinTrace(method
])= hrec.reconcile(Y_hat_df=hier_grouped_hat_df,
reconciled =S_grouped_df, tags=tags_grouped)
Sfor model in reconciled.drop(columns=['ds', 'y']).columns:
if 'ERM' in model:
= 3
eps elif 'nonnegative' in model:
= 1e-1
eps else:
= 1e-1
eps 'y'], reconciled[model], eps=eps) test_close(reconciled[
# 自上而下应打破
# 在非严格层级结构中
= HierarchicalReconciliation([TopDown(method='average_proportions')])
hrec
test_fail(
hrec.reconcile,='requires strictly hierarchical structures',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df,)
args )
# 方法应与
# 严格等级结构
'y_model'] = hier_strict_df['y']
hier_strict_df[# 我们应该能够使用这些方法恢复y。
= hier_strict_df.groupby('unique_id').tail(12)
hier_strict_df_h = hier_strict_df_h['ds'].unique()
ds_h = hier_strict_df.query('~(ds in @ds_h)')
hier_strict_df #向`y_model`添加噪声以避免完全拟合的值
'y_model'] += np.random.uniform(-1, 1, len(hier_strict_df))
hier_strict_df[
= 'Country/State'
middle_out_level # 分层协调
= HierarchicalReconciliation(reconcilers=[
hrec #这些方法应能重建原始的y
BottomUp(),='ols'),
MinTrace(method='wls_struct'),
MinTrace(method='wls_var'),
MinTrace(method='mint_shrink'),
MinTrace(method='ols', nonnegative=True),
MinTrace(method='wls_struct', nonnegative=True),
MinTrace(method='wls_var', nonnegative=True),
MinTrace(method='mint_shrink', nonnegative=True),
MinTrace(method# 自上而下无法恢复原始的y
# 但它应该恢复总水平
='forecast_proportions'),
TopDown(method='average_proportions'),
TopDown(method='proportion_averages'),
TopDown(method# 中间向外的方法无法恢复原始数据
# 但它应该恢复总水平
=middle_out_level, top_down_method='forecast_proportions'),
MiddleOut(middle_level=middle_out_level, top_down_method='average_proportions'),
MiddleOut(middle_level=middle_out_level, top_down_method='proportion_averages'),
MiddleOut(middle_level# ERM正在恢复,但需要更大的每股收益
#ERM(method='reg_bu', lambda_reg=None),
])= hrec.reconcile(
reconciled =hier_strict_df_h,
Y_hat_df=hier_strict_df,
Y_df=S_strict,
S=tags_strict
tags
)for model in reconciled.drop(columns=['ds', 'y']).columns:
if 'ERM' in model:
= 3
eps elif 'nonnegative' in model:
= 1e-1
eps else:
= 1e-1
eps if 'TopDown' in model:
if 'forecast_proportions' in model:
'y'], reconciled[model], eps)
test_close(reconciled[else:
# 自上而下无法恢复原始的y
test_fail(
test_close,=(reconciled['y'], reconciled[model], eps),
args
)# 但它应该恢复总水平
= tags_strict['Country']
total_tag 'y'].loc[total_tag],
test_close(reconciled[1e-2)
reconciled[model].loc[total_tag], elif 'MiddleOut' in model:
if 'forecast_proportions' in model:
'y'], reconciled[model], eps)
test_close(reconciled[else:
# 自上而下无法恢复原始的y
test_fail(
test_close,=(reconciled['y'], reconciled[model], eps),
args
)# 但它应该恢复总水平
= tags_strict[middle_out_level]
total_tag 'y'].loc[total_tag],
test_close(reconciled[1e-2)
reconciled[model].loc[total_tag], else:
'y'], reconciled[model], eps) test_close(reconciled[
# 测试是否平衡行为
= hrec.reconcile(
reconciled_balanced =hier_strict_df_h,
Y_hat_df=hier_strict_df,
Y_df=S_strict,
S=tags_strict,
tags=True,
is_balanced
)='ds').values, reconciled_balanced.drop(columns='ds').values, eps=1e-10) test_close(reconciled.drop(columns
from statsforecast import StatsForecast
from statsforecast.utils import generate_series
from statsforecast.models import RandomWalkWithDrift
# 测试不平衡数据集
= 24
max_tenure = pd.date_range(start='2019-01-31', freq='M', periods=max_tenure)
dates = [24, 23, 22, 21]
cohort_tenure
= []
ts_list
# 为每个队列创建时间序列
for i in range(len(cohort_tenure)):
ts_list.append(=1, freq='M', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \
generate_series(n_series=i) \
.assign(ult=dates[-cohort_tenure[i]:]) \
.assign(ds=['unique_id'])
.drop(columns
)= pd.concat(ts_list, ignore_index=True)
df
# 创建类别
'ult'] < 2, 'pen'] = 'a'
df.loc[df['ult'] >= 2, 'pen'] = 'b'
df.loc[df[# 请注意,唯一标识符需要使用字符串。
'ult'] = df['ult'].astype(str)
df[
= [
hier_levels 'pen'],
['pen', 'ult'],
[
]= aggregate(df=df, spec=hier_levels)
hier_df, S_df, tags
= hier_df.query("ds <= @pd.to_datetime('2019-12-31')")
train_df = hier_df.query("ds > @pd.to_datetime('2019-12-31')")
test_df
= StatsForecast(
fcst =[
models
RandomWalkWithDrift(),
],='M',
freq=1,
n_jobs
)
= HierarchicalReconciliation(
hrec =[
reconcilers
BottomUp(),='mint_shrink'),
MinTrace(method
]
)
= fcst.forecast(df=train_df, h=12, fitted=True)
fcst_df = fcst.forecast_fitted_values()
fitted_df
= hrec.reconcile(
fcst_df =fcst_df,
Y_hat_df=fitted_df,
Y_df=S_df,
S=tags,
tags )
# MinTrace 应中断
# 在模型极度过拟合的情况下,y_model 等于 y。
= hier_grouped_df.copy()
zero_df 'y'] = 0
zero_df['y_model'] = 0
zero_df[= HierarchicalReconciliation([MinTrace(method='mint_shrink')])
hrec
test_fail(
hrec.reconcile,='Insample residuals',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, zero_df)
args )
#不使用残差的测试方法
#即使他们的签名中包含
#那个论点
= HierarchicalReconciliation([MinTrace(method='ols')])
hrec = hrec.reconcile(
reconciled =hier_grouped_hat_df,
Y_hat_df=hier_grouped_df.drop(columns=['y_model']),
Y_df=S_grouped_df,
S=tags_grouped
tags
)for model in reconciled.drop(columns=['ds', 'y']).columns:
'y'], reconciled[model], eps=1e-1) test_close(reconciled[
'Country/State']] reconciled.loc[tags_grouped[
# 具有Bootstrap预测区间的测试方法
= HierarchicalReconciliation([BottomUp()])
hrec = hrec.reconcile(Y_hat_df=hier_grouped_hat_df,
reconciled =hier_grouped_df, S=S_grouped_df, tags=tags_grouped,
Y_df=[80, 90],
level='bootstrap')
intervals_method= reconciled.loc[tags_grouped['Country/State/Region/Purpose']].groupby('ds').sum().reset_index()
total
pd.testing.assert_frame_equal('ds', 'y_model/BottomUp']],
total[['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
reconciled.loc[
)assert 'y_model/BottomUp-lo-80' in reconciled.columns
# 具有正态预测区间的测试方法
'y_model-lo-80'] = hier_grouped_hat_df['y_model'] - 1.96
hier_grouped_hat_df['y_model-hi-80'] = hier_grouped_hat_df['y_model'] + 1.96
hier_grouped_hat_df[= HierarchicalReconciliation([BottomUp()])
hrec = hrec.reconcile(Y_hat_df=hier_grouped_hat_df,
reconciled =hier_grouped_df, S=S_grouped_df, tags=tags_grouped,
Y_df=[80, 90],
level='normality')
intervals_method= reconciled.loc[tags_grouped['Country/State/Region/Purpose']].groupby('ds').sum().reset_index()
total
pd.testing.assert_frame_equal('ds', 'y_model/BottomUp']],
total[['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
reconciled.loc[
)assert 'y_model/BottomUp-lo-80' in reconciled.columns
# 具有PERMBU预测区间的测试方法
# 测试预期错误与分组结构
# (非严格层级结构)
'y_model-lo-80'] = hier_grouped_hat_df['y_model'] - 1.96
hier_grouped_hat_df['y_model-hi-80'] = hier_grouped_hat_df['y_model'] + 1.96
hier_grouped_hat_df[= HierarchicalReconciliation([BottomUp()])
hrec
test_fail(
hrec.reconcile,='requires strictly hierarchical structures',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [80, 90], 'permbu',)
args
)
# 测试 PERMBU
'y_model-lo-80'] = hier_strict_df_h['y_model'] - 1.96
hier_strict_df_h['y_model-hi-80'] = hier_strict_df_h['y_model'] + 1.96
hier_strict_df_h[= HierarchicalReconciliation([BottomUp()])
hrec = hrec.reconcile(Y_hat_df=hier_strict_df_h,
reconciled =hier_strict_df, S=S_strict,
Y_df=tags_grouped,
tags=[80, 90],
level='permbu')
intervals_method= reconciled.loc[tags_grouped['Country/State/Region']].groupby('ds').sum().reset_index()
total
pd.testing.assert_frame_equal('ds', 'y_model/BottomUp']],
total[['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
reconciled.loc[
)assert 'y_model/BottomUp-lo-80' in reconciled.columns
# 带有Bootstrap预测区间的测试方法
= HierarchicalReconciliation([BottomUp()])
hrec = hrec.bootstrap_reconcile(Y_hat_df=hier_grouped_hat_df,
bootstrap_df =hier_grouped_df, S_df=S_grouped_df, tags=tags_grouped,
Y_df=[80, 90],
level='bootstrap',
intervals_method=2)
num_seedsassert 'y_model/BottomUp-lo-80' in bootstrap_df.columns
assert 'seed' in bootstrap_df.columns
assert len(bootstrap_df.seed.unique())==2
bootstrap_df
# PERMBU和正态概率方法的测试水平保护
= HierarchicalReconciliation([BottomUp()])
hrec
test_fail(
hrec.reconcile,='Level outside domain',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [-1, 80, 90], 'permbu',)
args
)
test_fail(
hrec.reconcile,='Level outside domain',
contains=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [80, 90, 101], 'normality',)
args )
示例
import numpy as np
import pandas as pd
from statsforecast.core import StatsForecast
from statsforecast.models import ETS, Naive
from hierarchicalforecast.utils import aggregate
from hierarchicalforecast.core import HierarchicalReconciliation
from hierarchicalforecast.methods import BottomUp, MinTrace
# 负载旅游小数据集
= pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df 0, 'Country', 'Australia')
df.insert(
# 根据地理层级和目的创建层级系列
# 并将季度 ds 字符串转换为 pd.datetime 格式
= [['Country'],
hierarchy_levels 'Country', 'State'],
['Country', 'Purpose'],
['Country', 'State', 'Region'],
['Country', 'State', 'Purpose'],
['Country', 'State', 'Region', 'Purpose']]
[
= aggregate(df=df, spec=hierarchy_levels)
Y_df, S_df, tags = Y_df['ds'].str.replace(r'(\d+) (Q\d)', r'\1-\2', regex=True)
qs 'ds'] = pd.PeriodIndex(qs, freq='Q').to_timestamp()
Y_df[= Y_df.reset_index()
Y_df
# 划分训练集/测试集
= Y_df.groupby('unique_id').tail(4)
Y_test_df = Y_df.drop(Y_test_df.index)
Y_train_df
# 计算基础自动ETS预测
# 仔细识别正确的数据频率,此数据为季度数据。 'Q'
= StatsForecast(models=[Naive()], freq='Q', n_jobs=-1)
fcst = fcst.forecast(df=Y_train_df, h=4, fitted=True)
Y_hat_df = fcst.forecast_fitted_values()
Y_fitted_df
# 调和基础预测
= Y_train_df.reset_index().set_index('unique_id')
Y_train_df = Y_hat_df.reset_index().set_index('unique_id')
Y_hat_df = [BottomUp(),
reconcilers ='mint_shrink')]
MinTrace(method= HierarchicalReconciliation(reconcilers=reconcilers)
hrec = hrec.reconcile(Y_hat_df=Y_hat_df,
Y_rec_df =Y_fitted_df,
Y_df=S_df, tags=tags)
S'unique_id', observed=True).head(2) Y_rec_df.groupby(
If you find the code useful, please ⭐ us on Github