核心


%load_ext autoreload
%autoreload 2

HierarchicalForecast包含纯Python实现的层次化调和方法,以及一个core.HierarchicalReconciliation包装类,能够通过包含层次化时间序列和基础预测的pandas DataFrame轻松与这些方法进行交互。

core.HierarchicalReconciliation调和类使用层次化时间序列pd.DataFrame Y_df,基础预测pd.DataFrame Y_hat_df,以及聚合约束矩阵S。有关创建聚合约束矩阵的更多信息,请参见utils 聚合方法


import re
import gc
import time
import copy
from hierarchicalforecast.methods import HReconciler
from inspect import signature
from scipy.stats import norm
from scipy import sparse
from typing import Dict, List, Optional
import warnings

import numpy as np
import pandas as pd

from fastcore.test import test_close, test_eq, test_fail
from nbdev.showdoc import add_docs, show_doc

def _build_fn_name(fn) -> str:
    fn_name = type(fn).__name__
    func_params = fn.__dict__

    # 将默认参数从名称中移除
    args_to_remove = ['insample', 'num_threads']
    if not func_params.get('nonnegative', False):
        args_to_remove.append('nonnegative')

    if fn_name == 'MinTrace' and \
        func_params['method']=='mint_shrink':
        if func_params['mint_shr_ridge'] == 2e-8:
            args_to_remove += ['mint_shr_ridge']

    func_params = [f'{name}-{value}' for name, value in func_params.items() if name not in args_to_remove]
    if func_params:
        fn_name += '_' + '_'.join(func_params)
    return fn_name

# 测试函数名称
from hierarchicalforecast.methods import BottomUp, MinTrace

test_eq(_build_fn_name(BottomUp()), 'BottomUp')
test_eq(
    _build_fn_name(MinTrace(method='ols')), 
    'MinTrace_method-ols'
)
test_eq(
    _build_fn_name(MinTrace(method='ols', nonnegative=True)), 
    'MinTrace_method-ols_nonnegative-True'
)
test_eq(
    _build_fn_name(MinTrace(method='mint_shr')), 
    'MinTrace_method-mint_shr'
)

核心.层次重建


def _reverse_engineer_sigmah(Y_hat_df, y_hat, model_name):
    """
    This function assumes that the model creates prediction intervals
    under a normality with the following the Equation:
    $\hat{y}_{t+h} + c \hat{sigma}_{h}$

    In the future, we might deprecate this function in favor of a 
    direct usage of an estimated $\hat{sigma}_{h}$
    """

    drop_cols = ['ds']
    if 'y' in Y_hat_df.columns:
        drop_cols.append('y')
    if model_name+'-median' in Y_hat_df.columns:
        drop_cols.append(model_name+'-median')
    model_names = Y_hat_df.drop(columns=drop_cols, axis=1).columns.to_list()
    pi_model_names = [name for name in model_names if ('-lo' in name or '-hi' in name)]
    pi_model_name = [pi_name for pi_name in pi_model_names if model_name in pi_name]
    pi = len(pi_model_name) > 0

    n_series = len(Y_hat_df.index.unique())

    if not pi:
        raise Exception(f'Please include `{model_name}` prediction intervals in `Y_hat_df`')

    pi_col = pi_model_name[0]
    sign = -1 if 'lo' in pi_col else 1
    level_col = re.findall('[\d]+[.,\d]+|[\d]*[.][\d]+|[\d]+', pi_col)
    level_col = float(level_col[-1])
    z = norm.ppf(0.5 + level_col / 200)
    sigmah = Y_hat_df[pi_col].values.reshape(n_series,-1)
    sigmah = sign * (sigmah - y_hat) / z

    return sigmah

class HierarchicalReconciliation:
    """Hierarchical Reconciliation Class.

    The `core.HierarchicalReconciliation` class allows you to efficiently fit multiple 
    HierarchicaForecast methods for a collection of time series and base predictions stored in 
    pandas DataFrames. The `Y_df` dataframe identifies series and datestamps with the unique_id and ds columns while the
    y column denotes the target time series variable. The `Y_h` dataframe stores the base predictions, 
    example ([AutoARIMA](https://nixtla.github.io/statsforecast/models.html#autoarima), [ETS](https://nixtla.github.io/statsforecast/models.html#autoets), etc.).

    **Parameters:**<br>
    `reconcilers`: A list of instantiated classes of the [reconciliation methods](https://nixtla.github.io/hierarchicalforecast/methods.html) module .<br>

    **References:**<br>
    [Rob J. Hyndman and George Athanasopoulos (2018). \"Forecasting principles and practice, Hierarchical and Grouped Series\".](https://otexts.com/fpp3/hierarchical.html)
    """
    def __init__(self,
                 reconcilers: List[HReconciler]):
        self.reconcilers = reconcilers
        self.orig_reconcilers = copy.deepcopy(reconcilers) # 待办事项:优雅的解决方案
        self.insample = any([method.insample for method in reconcilers])
    
    def _prepare_fit(self,
                     Y_hat_df: pd.DataFrame,
                     S_df: pd.DataFrame,
                     Y_df: Optional[pd.DataFrame],
                     tags: Dict[str, np.ndarray],
                     level: Optional[List[int]] = None,
                     intervals_method: str = 'normality',
                     sort_df: bool = True):
        """
        进行初步整理和保护
        """
        #-------------------------------- 匹配 Y_hat/Y/S 索引顺序 --------------------------------#
        if sort_df:
            Y_hat_df = Y_hat_df.reset_index()
            Y_hat_df.unique_id = Y_hat_df.unique_id.astype('category')
            Y_hat_df.unique_id = Y_hat_df.unique_id.cat.set_categories(S_df.index)
            Y_hat_df = Y_hat_df.sort_values(by=['unique_id', 'ds'])
            Y_hat_df = Y_hat_df.set_index('unique_id')

            if Y_df is not None:
                Y_df = Y_df.reset_index()
                Y_df.unique_id = Y_df.unique_id.astype('category')
                Y_df.unique_id = Y_df.unique_id.cat.set_categories(S_df.index)
                Y_df = Y_df.sort_values(by=['unique_id', 'ds'])
                Y_df = Y_df.set_index('unique_id')

            S_df.index = pd.CategoricalIndex(S_df.index, categories=S_df.index)

        #----------------------------------- Check Input's Validity ----------------------------------#
        # Check input's validity
        if intervals_method not in ['normality', 'bootstrap', 'permbu']:
            raise ValueError(f'Unkwon interval method: {intervals_method}')

        if self.insample or (intervals_method in ['bootstrap', 'permbu']):
            if Y_df is None:
                raise Exception('you need to pass `Y_df`')
        
        # 保护级别列表
        if (level is not None):
            level_outside_domain = np.any((np.array(level) < 0)|(np.array(level) >= 100 ))
            if level_outside_domain and (intervals_method in ['normality', 'permbu']):
                raise Exception('Level outside domain, send `level` list in [0,100)')

        # 声明输出名称
        drop_cols = ['ds', 'y'] if 'y' in Y_hat_df.columns else ['ds']
        model_names = Y_hat_df.drop(columns=drop_cols, axis=1).columns.to_list()

        # 确保数值列
        if not len(Y_hat_df[model_names].select_dtypes(include='number').columns) == len(Y_hat_df[model_names].columns):
            raise Exception('`Y_hat_df`s columns contain non numeric types')
            
        #确保无空值
        if Y_hat_df[model_names].isnull().values.any():
            raise Exception('`Y_hat_df` contains null values')
        
        pi_model_names = [name for name in model_names if ('-lo' in name or '-hi' in name or '-median' in name)]
        model_names = [name for name in model_names if name not in pi_model_names]
        
        # 待办事项:完成y_hat_insample保护
        if intervals_method in ['bootstrap', 'permbu'] and Y_df is not None:
            if not (set(model_names) <= set(Y_df.columns)):
                raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')

        uids = Y_hat_df.index.unique()

        # 检查Y_hat_df与S_df序列之间的差异
        S_diff  = len(S_df.index.difference(uids))
        Y_hat_diff = len(Y_hat_df.index.difference(S_df.index.unique()))
        if S_diff > 0 or Y_hat_diff > 0:
            raise Exception(f'Check `S_df`, `Y_hat_df` series difference, S\Y_hat={S_diff}, Y_hat\S={Y_hat_diff}')

        if Y_df is not None:
            # 检查Y_hat_df与Y_df序列之间的差异
            Y_diff  = len(Y_df.index.difference(uids))
            Y_hat_diff = len(Y_hat_df.index.difference(Y_df.index.unique()))
            if Y_diff > 0 or Y_hat_diff > 0:
                raise Exception(f'Check `Y_hat_df`, `Y_df` series difference, Y_hat\Y={Y_hat_diff}, Y\Y_hat={Y_diff}')

        # Same Y_hat_df/S_df/Y_df's unique_id order to prevent errors
        S_df = S_df.loc[uids]

        return Y_hat_df, S_df, Y_df, model_names

    def reconcile(self, 
                  Y_hat_df: pd.DataFrame,
                  S: pd.DataFrame,
                  tags: Dict[str, np.ndarray],
                  Y_df: Optional[pd.DataFrame] = None,
                  level: Optional[List[int]] = None,
                  intervals_method: str = 'normality',
                  num_samples: int = -1,
                  seed: int = 0,
                  sort_df: bool = True,
                  is_balanced: bool = False,
        ):
        """Hierarchical Reconciliation Method.

        The `reconcile` method is analogous to SKLearn `fit_predict` method, it 
        applies different reconciliation techniques instantiated in the `reconcilers` list.

        Most reconciliation methods can be described by the following convenient 
        linear algebra notation:

        $$\\tilde{\mathbf{y}}_{[a,b],\\tau} = \mathbf{S}_{[a,b][b]} \mathbf{P}_{[b][a,b]} \hat{\mathbf{y}}_{[a,b],\\tau}$$

        where $a, b$ represent the aggregate and bottom levels, $\mathbf{S}_{[a,b][b]}$ contains
        the hierarchical aggregation constraints, and $\mathbf{P}_{[b][a,b]}$ varies across 
        reconciliation methods. The reconciled predictions are $\\tilde{\mathbf{y}}_{[a,b],\\tau}$, and the 
        base predictions $\hat{\mathbf{y}}_{[a,b],\\tau}$.

        **Parameters:**<br>
        `Y_hat_df`: pd.DataFrame, base forecasts with columns `ds` and models to reconcile indexed by `unique_id`.<br>
        `Y_df`: pd.DataFrame, training set of base time series with columns `['ds', 'y']` indexed by `unique_id`.<br>
        If a class of `self.reconciles` receives `y_hat_insample`, `Y_df` must include them as columns.<br>
        `S`: pd.DataFrame with summing matrix of size `(base, bottom)`, see [aggregate method](https://nixtla.github.io/hierarchicalforecast/utils.html#aggregate).<br>
        `tags`: Each key is a level and its value contains tags associated to that level.<br>
        `level`: positive float list [0,100), confidence levels for prediction intervals.<br>
        `intervals_method`: str, method used to calculate prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `num_samples`: int=-1, if positive return that many probabilistic coherent samples.
        `seed`: int=0, random seed for numpy generator's replicability.<br>
        `sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>
        `is_balanced`: bool=False, wether `Y_df` is balanced, set it to True to speed things up if `Y_df` is balanced.<br>

        **Returns:**<br>
        `Y_tilde_df`: pd.DataFrame, with reconciled predictions.
        """
        # 检查输入的有效性并排序数据框
        Y_hat_df, S_df, Y_df, self.model_names = \
                    self._prepare_fit(Y_hat_df=Y_hat_df,
                                      S_df=S,
                                      Y_df=Y_df,
                                      tags=tags,
                                      level=level,
                                      intervals_method=intervals_method,
                                      sort_df=sort_df)

        # 初始化协调器参数
        reconciler_args = dict(
            idx_bottom=S_df.index.get_indexer(S.columns),
            tags={key: S_df.index.get_indexer(val) for key, val in tags.items()}
        )

        any_sparse = any([method.is_sparse_method for method in self.reconcilers])
        if any_sparse:
            try:
                S_for_sparse = sparse.csr_matrix(S_df.sparse.to_coo())
            except AttributeError:
                warnings.warn('Using dense S matrix for sparse reconciliation method.')
                S_for_sparse = S_df.values.astype(np.float32)

        if Y_df is not None:
            if is_balanced:
                y_insample = Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)
            else:
                y_insample = Y_df.pivot(columns='ds', values='y').loc[S_df.index].values.astype(np.float32)
            reconciler_args['y_insample'] = y_insample

        Y_tilde_df= Y_hat_df.copy()
        start = time.time()
        self.execution_times = {}
        self.level_names = {}
        self.sample_names = {}
        for reconciler, name_copy in zip(self.reconcilers, self.orig_reconcilers):
            reconcile_fn_name = _build_fn_name(name_copy)

            if reconciler.is_sparse_method:
                reconciler_args["S"] = S_for_sparse
            else:
                reconciler_args["S"] = S_df.values.astype(np.float32)

            has_fitted = 'y_hat_insample' in signature(reconciler.fit_predict).parameters
            has_level = 'level' in signature(reconciler.fit_predict).parameters

            for model_name in self.model_names:
                recmodel_name = f'{model_name}/{reconcile_fn_name}'
                y_hat = Y_hat_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
                reconciler_args['y_hat'] = y_hat

                if (self.insample and has_fitted) or intervals_method in ['bootstrap', 'permbu']:
                    if is_balanced:
                        y_hat_insample = Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
                    else:
                        y_hat_insample = Y_df.pivot(columns='ds', values=model_name).loc[S_df.index].values.astype(np.float32)
                    reconciler_args['y_hat_insample'] = y_hat_insample

                if has_level and (level is not None):
                    if intervals_method in ['normality', 'permbu']:
                        sigmah = _reverse_engineer_sigmah(Y_hat_df=Y_hat_df,
                                    y_hat=y_hat, model_name=model_name)
                        reconciler_args['sigmah'] = sigmah

                    reconciler_args['intervals_method'] = intervals_method
                    reconciler_args['num_samples'] = 200 # 待办事项:解决重复的num_samples问题
                    reconciler_args['seed'] = seed

                # 均值与概率校准
                kwargs_ls = [key for key in signature(reconciler.fit_predict).parameters if key in reconciler_args.keys()]
                kwargs = {key: reconciler_args[key] for key in kwargs_ls}
                
                if (level is not None) and (num_samples > 0):
                    # Store reconciler's memory to generate samples
                    reconciler = reconciler.fit(**kwargs)
                    fcsts_model = reconciler.predict(S=reconciler_args['S'], 
                                                     y_hat=reconciler_args['y_hat'], level=level)
                else:
                    # Memory efficient reconciler's fit_predict
                    fcsts_model = reconciler(**kwargs, level=level)

                # 解析最终输出
                Y_tilde_df[recmodel_name] = fcsts_model['mean'].flatten()
                if intervals_method in ['bootstrap', 'normality', 'permbu'] and level is not None:
                    level.sort()
                    lo_names = [f'{recmodel_name}-lo-{lv}' for lv in reversed(level)]
                    hi_names = [f'{recmodel_name}-hi-{lv}' for lv in level]
                    self.level_names[recmodel_name] = lo_names + hi_names
                    sorted_quantiles = np.reshape(fcsts_model['quantiles'], (len(Y_tilde_df),-1))
                    intervals_df = pd.DataFrame(sorted_quantiles, index=Y_tilde_df.index,
                                                columns=self.level_names[recmodel_name])
                    Y_tilde_df= pd.concat([Y_tilde_df, intervals_df], axis=1)

                    if num_samples > 0:
                        samples = reconciler.sample(num_samples=num_samples)
                        self.sample_names[recmodel_name] = [f'{recmodel_name}-sample-{i}' for i in range(num_samples)]
                        samples = np.reshape(samples, (len(Y_tilde_df),-1))
                        samples_df = pd.DataFrame(samples, index=Y_tilde_df.index,
                                                  columns=self.sample_names[recmodel_name])
                        Y_tilde_df= pd.concat([Y_tilde_df, samples_df], axis=1)

                    del sorted_quantiles
                    del intervals_df
                if self.insample and has_fitted:
                    del y_hat_insample
                gc.collect()

                end = time.time()
                self.execution_times[f'{model_name}/{reconcile_fn_name}'] = (end - start)

        return Y_tilde_df

    def bootstrap_reconcile(self,
                            Y_hat_df: pd.DataFrame,
                            S_df: pd.DataFrame,
                            tags: Dict[str, np.ndarray],
                            Y_df: Optional[pd.DataFrame] = None,
                            level: Optional[List[int]] = None,
                            intervals_method: str = 'normality',
                            num_samples: int = -1,
                            num_seeds: int = 1,
                            sort_df: bool = True):
        """Bootstraped Hierarchical Reconciliation Method.

        Applies N times, based on different random seeds, the `reconcile` method 
        for the different reconciliation techniques instantiated in the `reconcilers` list. 

        **Parameters:**<br>
        `Y_hat_df`: pd.DataFrame, base forecasts with columns `ds` and models to reconcile indexed by `unique_id`.<br>
        `Y_df`: pd.DataFrame, training set of base time series with columns `['ds', 'y']` indexed by `unique_id`.<br>
        If a class of `self.reconciles` receives `y_hat_insample`, `Y_df` must include them as columns.<br>
        `S`: pd.DataFrame with summing matrix of size `(base, bottom)`, see [aggregate method](https://nixtla.github.io/hierarchicalforecast/utils.html#aggregate).<br>
        `tags`: Each key is a level and its value contains tags associated to that level.<br>
        `level`: positive float list [0,100), confidence levels for prediction intervals.<br>
        `intervals_method`: str, method used to calculate prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `num_samples`: int=-1, if positive return that many probabilistic coherent samples.
        `num_seeds`: int=1, random seed for numpy generator's replicability.<br>
        `sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>

        **Returns:**<br>
        `Y_bootstrap_df`: pd.DataFrame, with bootstraped reconciled predictions.
        """

        # 检查输入的有效性并排序数据框
        Y_hat_df, S_df, Y_df, self.model_names = \
                    self._prepare_fit(Y_hat_df=Y_hat_df,
                                      S_df=S_df,
                                      Y_df=Y_df,
                                      tags=tags,
                                      intervals_method=intervals_method,
                                      sort_df=sort_df)

        # Bootstrap 调和了预测结果
        Y_tilde_list = []
        for seed in range(num_seeds):
            Y_tilde_df = self.reconcile(Y_hat_df=Y_hat_df,
                                        S=S_df,
                                        tags=tags,
                                        Y_df=Y_df,
                                        level=level,
                                        intervals_method=intervals_method,
                                        num_samples=num_samples,
                                        seed=seed,
                                        sort_df=False)
            Y_tilde_df['seed'] = seed
            # 待办事项:修复损坏的recmodel_names
            if seed==0:
                first_columns = Y_tilde_df.columns
            Y_tilde_df.columns = first_columns
            Y_tilde_list.append(Y_tilde_df)

        Y_bootstrap_df = pd.concat(Y_tilde_list, axis=0)
        del Y_tilde_list
        gc.collect()

        return Y_bootstrap_df
show_doc(HierarchicalReconciliation, 
         name='init', title_level=3)
show_doc(HierarchicalReconciliation)
show_doc(HierarchicalReconciliation.reconcile, name='reconcile', title_level=3)
show_doc(HierarchicalReconciliation.bootstrap_reconcile, name='bootstrap_reconcile', title_level=3)

from hierarchicalforecast.methods import (
    BottomUp, TopDown, MiddleOut, MinTrace, ERM,
)
from hierarchicalforecast.utils import aggregate

df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df.insert(0, 'Country', 'Australia')

# 非严格等级结构
hierS_grouped_df = [
    ['Country'],
    ['Country', 'State'], 
    ['Country', 'Purpose'], 
    ['Country', 'State', 'Region'], 
    ['Country', 'State', 'Purpose'], 
    ['Country', 'State', 'Region', 'Purpose']
]
# 严格的等级结构
hiers_strictly = [
    ['Country'],
    ['Country', 'State'], 
    ['Country', 'State', 'Region'], 
]

# 获取数据框
hier_grouped_df, S_grouped_df, tags_grouped = aggregate(df, hierS_grouped_df)
hier_strict_df, S_strict, tags_strict = aggregate(df, hiers_strictly)

# 检查分类输入是否产生相同输出
df2 = df.copy()
for col in ['Country', 'State', 'Purpose', 'Region']:
    df2[col] = df2[col].astype('category')

for spec in [hierS_grouped_df, hiers_strictly]:
    Y_orig, S_orig, tags_orig = aggregate(df, spec)
    Y_cat, S_cat, tags_cat = aggregate(df2, spec)
    pd.testing.assert_frame_equal(Y_cat, Y_orig)
    pd.testing.assert_frame_equal(S_cat, S_orig)
    assert all(np.array_equal(tags_orig[k], tags_cat[k]) for k in tags_orig.keys())

hier_grouped_df['y_model'] = hier_grouped_df['y']
# 我们应该能够使用这些方法恢复y。
hier_grouped_hat_df = hier_grouped_df.groupby('unique_id').tail(12)
ds_h = hier_grouped_hat_df['ds'].unique()
hier_grouped_df = hier_grouped_df.query('~(ds in @ds_h)')
#向`y_model`添加噪声以避免完全拟合的值
hier_grouped_df['y_model'] += np.random.uniform(-1, 1, len(hier_grouped_df))

#层次化协调
hrec = HierarchicalReconciliation(reconcilers=[
    #这些方法应能重建原始的y
    BottomUp(),
    MinTrace(method='ols'),
    MinTrace(method='wls_struct'),
    MinTrace(method='wls_var'),
    MinTrace(method='mint_shrink'),
    MinTrace(method='ols', nonnegative=True),
    MinTrace(method='wls_struct', nonnegative=True),
    MinTrace(method='wls_var', nonnegative=True),
    MinTrace(method='mint_shrink', nonnegative=True),
    # ERM正在恢复,但需要更大的每股收益
    #ERM(method='reg_bu', lambda_reg=None),
])
reconciled = hrec.reconcile(Y_hat_df=hier_grouped_hat_df, Y_df=hier_grouped_df, 
                            S=S_grouped_df, tags=tags_grouped)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    if 'ERM' in model:
        eps = 3
    elif 'nonnegative' in model:
        eps = 1e-1
    else:
        eps = 1e-1
    test_close(reconciled['y'], reconciled[model], eps=eps)

# 测试不正确的Y_hat_df数据类型
hier_grouped_hat_df_nan = hier_grouped_hat_df.copy()
hier_grouped_hat_df_nan.loc['Australia', 'y_model'] = float('nan')
test_fail(
    hrec.reconcile,
    contains='null values',
    args=(hier_grouped_hat_df_nan, S_grouped_df, tags_grouped, hier_grouped_df),
)

hier_grouped_hat_df_none = hier_grouped_hat_df.copy()
hier_grouped_hat_df_none.loc['Australia', 'y_model'] = None
test_fail(
    hrec.reconcile,
    contains='null values',
    args=(hier_grouped_hat_df_none, S_grouped_df, tags_grouped, hier_grouped_df),
)

hier_grouped_hat_df_str = hier_grouped_hat_df.copy()
hier_grouped_hat_df_str['y_model'] = hier_grouped_hat_df_str['y_model'].astype(str)
test_fail(
    hrec.reconcile,
    contains='numeric types',
    args=(hier_grouped_hat_df_str, S_grouped_df, tags_grouped, hier_grouped_df),
)

# 测试预期错误
# 不同的系列 S 和 Y_hat_df
test_fail(
    hrec.reconcile,
    contains='series difference',
    args=(hier_grouped_hat_df.drop('Australia'), S_grouped_df, tags_grouped, hier_grouped_df),
    
)
test_fail(
    hrec.reconcile,
    contains='series difference',
    args=(hier_grouped_hat_df, S_grouped_df.drop('Australia'), tags_grouped, hier_grouped_df),
    
)
test_fail(
    hrec.reconcile,
    contains='series difference',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df.drop('Australia')),
    
)

::: {#cell-22 .cell 0=‘隐’ 1=‘藏’}

# 测试预期错误
# 不同的列 Y_df 和 Y_hat_df
hrec = HierarchicalReconciliation(
            reconcilers=[ERM(method='reg_bu', lambda_reg=100)])
test_fail(
    hrec.reconcile,
    contains='Please include ',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped, 
          hier_grouped_df, [80], 'permbu'), # permbu 需要 y_hat_insample
)

:::


# 测试无样本的调和方法
hrec = HierarchicalReconciliation(reconcilers=[
    #这些方法应能重构原始的y
    BottomUp(),
    MinTrace(method='ols'),
    MinTrace(method='wls_struct'),
    MinTrace(method='ols', nonnegative=True),
    MinTrace(method='wls_struct', nonnegative=True),
])
reconciled = hrec.reconcile(Y_hat_df=hier_grouped_hat_df,
                            S=S_grouped_df, tags=tags_grouped)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    if 'ERM' in model:
        eps = 3
    elif 'nonnegative' in model:
        eps = 1e-1
    else:
        eps = 1e-1
    test_close(reconciled['y'], reconciled[model], eps=eps)

# 自上而下应打破
# 在非严格层级结构中
hrec = HierarchicalReconciliation([TopDown(method='average_proportions')])
test_fail(
    hrec.reconcile,
    contains='requires strictly hierarchical structures',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped,  hier_grouped_df,)
)

# 方法应与
# 严格等级结构

hier_strict_df['y_model'] = hier_strict_df['y']
# 我们应该能够使用这些方法恢复y。
hier_strict_df_h = hier_strict_df.groupby('unique_id').tail(12)
ds_h = hier_strict_df_h['ds'].unique()
hier_strict_df = hier_strict_df.query('~(ds in @ds_h)')
#向`y_model`添加噪声以避免完全拟合的值
hier_strict_df['y_model'] += np.random.uniform(-1, 1, len(hier_strict_df))

middle_out_level = 'Country/State'
# 分层协调
hrec = HierarchicalReconciliation(reconcilers=[
    #这些方法应能重建原始的y
    BottomUp(),
    MinTrace(method='ols'),
    MinTrace(method='wls_struct'),
    MinTrace(method='wls_var'),
    MinTrace(method='mint_shrink'),
    MinTrace(method='ols', nonnegative=True),
    MinTrace(method='wls_struct', nonnegative=True),
    MinTrace(method='wls_var', nonnegative=True),
    MinTrace(method='mint_shrink', nonnegative=True),
    # 自上而下无法恢复原始的y
    # 但它应该恢复总水平
    TopDown(method='forecast_proportions'),
    TopDown(method='average_proportions'),
    TopDown(method='proportion_averages'),
    # 中间向外的方法无法恢复原始数据
    # 但它应该恢复总水平
    MiddleOut(middle_level=middle_out_level, top_down_method='forecast_proportions'),
    MiddleOut(middle_level=middle_out_level, top_down_method='average_proportions'),
    MiddleOut(middle_level=middle_out_level, top_down_method='proportion_averages'),
    # ERM正在恢复,但需要更大的每股收益
    #ERM(method='reg_bu', lambda_reg=None),
])
reconciled = hrec.reconcile(
    Y_hat_df=hier_strict_df_h, 
    Y_df=hier_strict_df, 
    S=S_strict, 
    tags=tags_strict
)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    if 'ERM' in model:
        eps = 3
    elif 'nonnegative' in model:
        eps = 1e-1
    else:
        eps = 1e-1
    if 'TopDown' in model:
        if 'forecast_proportions' in model:
            test_close(reconciled['y'], reconciled[model], eps)
        else:
            # 自上而下无法恢复原始的y
            test_fail(
                test_close,
                args=(reconciled['y'], reconciled[model], eps),
            )
        # 但它应该恢复总水平
        total_tag = tags_strict['Country']
        test_close(reconciled['y'].loc[total_tag], 
                   reconciled[model].loc[total_tag], 1e-2)
    elif 'MiddleOut' in model:
        if 'forecast_proportions' in model:
            test_close(reconciled['y'], reconciled[model], eps)
        else:
            # 自上而下无法恢复原始的y
            test_fail(
                test_close,
                args=(reconciled['y'], reconciled[model], eps),
            )
        # 但它应该恢复总水平
        total_tag = tags_strict[middle_out_level]
        test_close(reconciled['y'].loc[total_tag], 
                   reconciled[model].loc[total_tag], 1e-2)
    else:
        test_close(reconciled['y'], reconciled[model], eps)

# 测试是否平衡行为
reconciled_balanced = hrec.reconcile(
    Y_hat_df=hier_strict_df_h, 
    Y_df=hier_strict_df, 
    S=S_strict, 
    tags=tags_strict,
    is_balanced=True,
)
test_close(reconciled.drop(columns='ds').values, reconciled_balanced.drop(columns='ds').values, eps=1e-10)

from statsforecast import StatsForecast
from statsforecast.utils import generate_series
from statsforecast.models import RandomWalkWithDrift

# 测试不平衡数据集
max_tenure = 24
dates = pd.date_range(start='2019-01-31', freq='M', periods=max_tenure)
cohort_tenure = [24, 23, 22, 21]

ts_list = []

# 为每个队列创建时间序列
for i in range(len(cohort_tenure)):
    ts_list.append(
        generate_series(n_series=1, freq='M', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \
            .assign(ult=i) \
            .assign(ds=dates[-cohort_tenure[i]:]) \
            .drop(columns=['unique_id'])
    )
df = pd.concat(ts_list, ignore_index=True)

# 创建类别
df.loc[df['ult'] < 2, 'pen'] = 'a'
df.loc[df['ult'] >= 2, 'pen'] = 'b'
# 请注意,唯一标识符需要使用字符串。
df['ult'] = df['ult'].astype(str)

hier_levels = [
    ['pen'],
    ['pen', 'ult'],
]
hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)

train_df = hier_df.query("ds <= @pd.to_datetime('2019-12-31')")
test_df = hier_df.query("ds > @pd.to_datetime('2019-12-31')")

fcst = StatsForecast(
    models=[
        RandomWalkWithDrift(),
    ],
    freq='M',
    n_jobs=1,
)

hrec = HierarchicalReconciliation(
    reconcilers=[
        BottomUp(),
        MinTrace(method='mint_shrink'),
    ]
)

fcst_df = fcst.forecast(df=train_df, h=12, fitted=True)
fitted_df = fcst.forecast_fitted_values()

fcst_df = hrec.reconcile(
    Y_hat_df=fcst_df,
    Y_df=fitted_df,
    S=S_df,
    tags=tags,
)

# MinTrace 应中断
# 在模型极度过拟合的情况下,y_model 等于 y。

zero_df = hier_grouped_df.copy()
zero_df['y'] = 0
zero_df['y_model'] = 0
hrec = HierarchicalReconciliation([MinTrace(method='mint_shrink')])
test_fail(
    hrec.reconcile,
    contains='Insample residuals',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped,  zero_df)
)

#不使用残差的测试方法
#即使他们的签名中包含
#那个论点
hrec = HierarchicalReconciliation([MinTrace(method='ols')])
reconciled = hrec.reconcile(
    Y_hat_df=hier_grouped_hat_df, 
    Y_df=hier_grouped_df.drop(columns=['y_model']), 
    S=S_grouped_df, 
    tags=tags_grouped
)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    test_close(reconciled['y'], reconciled[model], eps=1e-1)

reconciled.loc[tags_grouped['Country/State']]

# 具有Bootstrap预测区间的测试方法
hrec = HierarchicalReconciliation([BottomUp()])
reconciled = hrec.reconcile(Y_hat_df=hier_grouped_hat_df, 
                            Y_df=hier_grouped_df, S=S_grouped_df, tags=tags_grouped,
                            level=[80, 90], 
                            intervals_method='bootstrap')
total = reconciled.loc[tags_grouped['Country/State/Region/Purpose']].groupby('ds').sum().reset_index()
pd.testing.assert_frame_equal(
    total[['ds', 'y_model/BottomUp']],
    reconciled.loc['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
)
assert 'y_model/BottomUp-lo-80' in reconciled.columns

# 具有正态预测区间的测试方法
hier_grouped_hat_df['y_model-lo-80'] = hier_grouped_hat_df['y_model'] - 1.96
hier_grouped_hat_df['y_model-hi-80'] = hier_grouped_hat_df['y_model'] + 1.96
hrec = HierarchicalReconciliation([BottomUp()])
reconciled = hrec.reconcile(Y_hat_df=hier_grouped_hat_df,
                            Y_df=hier_grouped_df, S=S_grouped_df, tags=tags_grouped,
                            level=[80, 90], 
                            intervals_method='normality')
total = reconciled.loc[tags_grouped['Country/State/Region/Purpose']].groupby('ds').sum().reset_index()
pd.testing.assert_frame_equal(
    total[['ds', 'y_model/BottomUp']],
    reconciled.loc['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
)
assert 'y_model/BottomUp-lo-80' in reconciled.columns

# 具有PERMBU预测区间的测试方法

# 测试预期错误与分组结构
# (非严格层级结构)
hier_grouped_hat_df['y_model-lo-80'] = hier_grouped_hat_df['y_model'] - 1.96
hier_grouped_hat_df['y_model-hi-80'] = hier_grouped_hat_df['y_model'] + 1.96
hrec = HierarchicalReconciliation([BottomUp()])
test_fail(
    hrec.reconcile,
    contains='requires strictly hierarchical structures',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [80, 90], 'permbu',)
)

# 测试 PERMBU
hier_strict_df_h['y_model-lo-80'] = hier_strict_df_h['y_model'] - 1.96
hier_strict_df_h['y_model-hi-80'] = hier_strict_df_h['y_model'] + 1.96
hrec = HierarchicalReconciliation([BottomUp()])
reconciled = hrec.reconcile(Y_hat_df=hier_strict_df_h,
                            Y_df=hier_strict_df, S=S_strict, 
                            tags=tags_grouped,
                            level=[80, 90], 
                            intervals_method='permbu')
total = reconciled.loc[tags_grouped['Country/State/Region']].groupby('ds').sum().reset_index()
pd.testing.assert_frame_equal(
    total[['ds', 'y_model/BottomUp']],
    reconciled.loc['Australia'][['ds', 'y_model/BottomUp']].reset_index(drop=True)
)
assert 'y_model/BottomUp-lo-80' in reconciled.columns

# 带有Bootstrap预测区间的测试方法
hrec = HierarchicalReconciliation([BottomUp()])
bootstrap_df = hrec.bootstrap_reconcile(Y_hat_df=hier_grouped_hat_df,
                                        Y_df=hier_grouped_df, S_df=S_grouped_df, tags=tags_grouped,
                                        level=[80, 90],
                                        intervals_method='bootstrap',
                                        num_seeds=2)
assert 'y_model/BottomUp-lo-80' in bootstrap_df.columns
assert 'seed' in bootstrap_df.columns
assert len(bootstrap_df.seed.unique())==2
bootstrap_df

# PERMBU和正态概率方法的测试水平保护
hrec = HierarchicalReconciliation([BottomUp()])
test_fail(
    hrec.reconcile,
    contains='Level outside domain',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [-1, 80, 90], 'permbu',)
)
test_fail(
    hrec.reconcile,
    contains='Level outside domain',
    args=(hier_grouped_hat_df, S_grouped_df, tags_grouped, hier_grouped_df, [80, 90, 101], 'normality',)
)

示例

import numpy as np
import pandas as pd

from statsforecast.core import StatsForecast
from statsforecast.models import ETS, Naive

from hierarchicalforecast.utils import aggregate
from hierarchicalforecast.core import HierarchicalReconciliation
from hierarchicalforecast.methods import BottomUp, MinTrace

# 负载旅游小数据集
df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df.insert(0, 'Country', 'Australia')

# 根据地理层级和目的创建层级系列
# 并将季度 ds 字符串转换为 pd.datetime 格式
hierarchy_levels = [['Country'],
                    ['Country', 'State'], 
                    ['Country', 'Purpose'], 
                    ['Country', 'State', 'Region'], 
                    ['Country', 'State', 'Purpose'], 
                    ['Country', 'State', 'Region', 'Purpose']]

Y_df, S_df, tags = aggregate(df=df, spec=hierarchy_levels)
qs = Y_df['ds'].str.replace(r'(\d+) (Q\d)', r'\1-\2', regex=True)
Y_df['ds'] = pd.PeriodIndex(qs, freq='Q').to_timestamp()
Y_df = Y_df.reset_index()

# 划分训练集/测试集
Y_test_df  = Y_df.groupby('unique_id').tail(4)
Y_train_df = Y_df.drop(Y_test_df.index)

# 计算基础自动ETS预测
# 仔细识别正确的数据频率,此数据为季度数据。 'Q'
fcst = StatsForecast(models=[Naive()], freq='Q', n_jobs=-1)
Y_hat_df = fcst.forecast(df=Y_train_df, h=4, fitted=True)
Y_fitted_df = fcst.forecast_fitted_values()

# 调和基础预测
Y_train_df = Y_train_df.reset_index().set_index('unique_id')
Y_hat_df = Y_hat_df.reset_index().set_index('unique_id')
reconcilers = [BottomUp(),
               MinTrace(method='mint_shrink')]
hrec = HierarchicalReconciliation(reconcilers=reconcilers)
Y_rec_df = hrec.reconcile(Y_hat_df=Y_hat_df, 
                          Y_df=Y_fitted_df,
                          S=S_df, tags=tags)
Y_rec_df.groupby('unique_id', observed=True).head(2)

If you find the code useful, please ⭐ us on Github