Aggregation/Visualization Utils

HierarchicalForecast contains utility functions for wrangling and visualizing hierarchical series datasets. The module's `aggregate` function lets you create a hierarchy from categorical variables that represent the levels of the structure, and it returns the aggregation constraints matrix \(\mathbf{S}\).
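
For example, for a hierarchy in which a single total aggregates two bottom series \(a\) and \(b\), \(\mathbf{S}\) stacks an aggregation row on top of an identity, so the full vector of series is recovered from the bottom level alone:

$$
\begin{bmatrix} y_{\mathrm{Total},t} \\ y_{a,t} \\ y_{b,t} \end{bmatrix}
=
\underbrace{\begin{bmatrix} 1 & 1 \\ 1 & 0 \\ 0 & 1 \end{bmatrix}}_{\mathbf{S}}
\begin{bmatrix} y_{a,t} \\ y_{b,t} \end{bmatrix}
$$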

In addition, HierarchicalForecast ensures the compatibility of its reconciliation methods with other popular machine-learning libraries through its external forecast adapters, which transform base forecast outputs from external libraries into a compatible DataFrame format.


import sys
import timeit
import warnings
from itertools import chain
from typing import Callable, Dict, List, Optional, Iterable
from collections.abc import Sequence

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

plt.rcParams['font.family'] = 'serif'

from nbdev.showdoc import add_docs, show_doc
from fastcore.test import test_eq, test_close, test_fail

from statsforecast.utils import generate_series

class CodeTimer:
    def __init__(self, name=None, verbose=True):
        self.name = " '"  + name + "'" if name else ''
        self.verbose = verbose

    def __enter__(self):
        self.start = timeit.default_timer()

    def __exit__(self, exc_type, exc_value, traceback):
        self.took = (timeit.default_timer() - self.start)
        if self.verbose:
            print('Code block' + self.name + \
                  ' took:\t{0:.5f}'.format(self.took) + ' seconds')

def is_strictly_hierarchical(S: np.ndarray, 
                             tags: Dict[str, np.ndarray]):
    # Main idea:
    # if S represents a strictly hierarchical structure,
    # the number of unique paths before the bottom level
    # should equal the number of nodes
    # of the level above it.
    levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
    # remove bottom level
    levels_.popitem()
    # make S categorical
    hiers = [np.argmax(S[idx], axis=0) + 1 for _, idx in levels_.items()]
    hiers = np.vstack(hiers)
    paths = np.unique(hiers, axis=1).shape[1] 
    nodes = levels_.popitem()[1].size
    return paths == nodes
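
A toy sanity check (illustrative, not part of the library's tests; here the `tags` values are row indices into `S`): a strict hierarchy with a single total passes, while a grouped structure that crosses two groupings of the same bottom level does not.

S_strict = np.array([
    [1., 1.],  # total
    [1., 0.],  # bottom a
    [0., 1.],  # bottom b
])
tags_strict = {'total': np.array([0]), 'total/bottom': np.array([1, 2])}
assert is_strictly_hierarchical(S_strict, tags_strict)

# grouped: bottom series (a,1), (a,2), (b,1), (b,2) are aggregated both
# by letter and by number, so the paths to the bottom are not unique
S_grouped = np.array([
    [1., 1., 0., 0.],  # letter a
    [0., 0., 1., 1.],  # letter b
    [1., 0., 1., 0.],  # number 1
    [0., 1., 0., 1.],  # number 2
    [1., 0., 0., 0.],
    [0., 1., 0., 0.],
    [0., 0., 1., 0.],
    [0., 0., 0., 1.],
])
tags_grouped = {
    'letter': np.array([0, 1]),
    'number': np.array([2, 3]),
    'letter/number': np.array([4, 5, 6, 7]),
}
assert not is_strictly_hierarchical(S_grouped, tags_grouped)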

def cov2corr(cov, return_std=False):
    """ convert covariance matrix to correlation matrix

    **Parameters:**<br>
    `cov`: array_like, 2d covariance matrix.<br>
    `return_std`: bool=False, if True also return the standard deviations.<br>

    **Returns:**<br>
    `corr`: ndarray (subclass) correlation matrix
    """
    cov = np.asanyarray(cov)
    std_ = np.sqrt(np.diag(cov))
    corr = cov / np.outer(std_, std_)
    if return_std:
        return corr, std_
    else:
        return corr
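
A small worked example: with variances 4 and 9 and covariance 3, the off-diagonal correlation is \(3 / (2 \cdot 3) = 0.5\).

cov = np.array([[4., 3.],
                [3., 9.]])
corr, std = cov2corr(cov, return_std=True)
test_close(corr, np.array([[1., 0.5], [0.5, 1.]]))
test_eq(std, np.array([2., 3.]))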

Aggregate Function


def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):
    """将层次结构数据框 `df` 转换为求和矩阵 `S`。"""
    categories = [S_df[col].unique() for col in S_df.columns]
    cat_sizes = [len(cats) for cats in categories]
    idx_bottom = np.argmax(cat_sizes)
    cats_bottom = categories[idx_bottom]

    try:
        encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
    except TypeError:  # sklearn < 1.2
        encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

    S = encoder.fit_transform(S_df).T

    if sparse_s:
        df_constructor = pd.DataFrame.sparse.from_spmatrix
    else:
        df_constructor = pd.DataFrame
    S = df_constructor(S, index=chain(*categories), columns=cats_bottom)

    tags = dict(zip(S_df.columns, categories))
    return S, tags

def aggregate_before(df: pd.DataFrame,
              spec: List[List[str]],
              agg_fn: Callable = np.sum,
              sparse_s: bool = False):
    """Utils Aggregation Function.

    Aggregates bottom level series contained in the pd.DataFrame `df` according
    to levels defined in the `spec` list applying the `agg_fn` (sum, mean).<br>

    **Parameters:**<br>
    `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
    `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
    `agg_fn`: Function used to aggregate `'y'`.<br>
    `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>


    **Returns:**<br>
    `Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
    summing matrix `S`, and hierarchical aggregation indexes `tags`.
    """
    max_len_idx = np.argmax([len(hier) for hier in spec])
    bottom_comb = spec[max_len_idx]
    hiers = []
    for hier in spec:
        df_hier = df.groupby(hier + ['ds'])['y'].apply(agg_fn).reset_index()
        df_hier['unique_id'] = df_hier[hier].agg('/'.join, axis=1)
        if hier == bottom_comb:
            bottom_hier = df_hier['unique_id'].unique()
        hiers.append(df_hier)
    df_hiers = pd.concat(hiers)
    S_df = df_hiers[['unique_id'] + bottom_comb].drop_duplicates().reset_index(drop=True)
    S_df = S_df.set_index('unique_id')
    S_df = S_df.fillna('agg')
    hiers_cols = []
    for hier in spec:
        hier_col = '/'.join(hier)
        S_df[hier_col] = S_df[hier].agg('/'.join, axis=1)
        hiers_cols.append(hier_col)
    Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')
    
    # aggregation constraints S definition
    S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)
    return Y_df, S, tags

def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):
    upper_split = upper_key.split('/')
    upper_idxs = [bottom_split.index(i) for i in upper_split]

    def join_upper(bottom_value):
        bottom_parts = bottom_value.split('/')
        return '/'.join(bottom_parts[i] for i in upper_idxs)

    return [join_upper(val) for val in bottom_values]
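
For instance (an illustrative check, not one of the library's tests), projecting bottom identifiers built from ['cat1', 'cat2'] onto the upper key 'cat1' keeps only the first part of each identifier:

test_eq(
    _to_upper_hierarchy(['cat1', 'cat2'], ['a/1', 'a/2', 'b/2'], 'cat1'),
    ['a', 'a', 'b'],
)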

def aggregate(
    df: pd.DataFrame,
    spec: List[List[str]],
    is_balanced: bool = False,
    sparse_s: bool = False,
):
    """Utils Aggregation Function.
    Aggregates bottom level series contained in the pandas DataFrame `df` according
    to levels defined in the `spec` list.

    Parameters
    ----------
    df : pandas DataFrame
        Dataframe with columns `['ds', 'y']` and columns to aggregate.
    spec : list of list of str
        List of levels. Each element of the list should contain a list of columns of `df` to aggregate.
    is_balanced : bool (default=False)
        Deprecated.
    sparse_s : bool (default=False)
        Return `S_df` as a sparse dataframe.

    Returns
    -------
    Y_df : pandas DataFrame
        Hierarchically structured series.
    S_df : pandas DataFrame
        Summing dataframe.
    tags : dict
        Aggregation indices.
    """
    # checks
    if df.isnull().values.any():
        raise ValueError('`df` contains null values')
    if is_balanced:
        warnings.warn(
            "`is_balanced` is deprecated and will be removed in a future version. "
            "Don't set this argument to suppress this warning.",
            category=DeprecationWarning,
        )
            
    # compute aggregations and tags
    spec = sorted(spec, key=len)
    bottom = spec[-1]
    aggs = []
    tags = {}
    for levels in spec:
        agg = df.groupby(levels + ['ds'], observed=True)['y'].sum()
        if not agg.index.is_monotonic_increasing:
            agg = agg.sort_index()
        agg = agg.reset_index('ds')
        group = agg.index.get_level_values(0)
        if not pd.api.types.is_string_dtype(group.dtype):
            group = group.astype(str)
        for level in levels[1:]:
            group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_')
        agg.index = group
        agg.index.name = 'unique_id'
        tags['/'.join(levels)] = group.unique().values
        aggs.append(agg)
    Y_df = pd.concat(aggs)

    # construct S
    bottom_key = '/'.join(bottom)
    bottom_levels = tags[bottom_key]
    S = np.empty((len(bottom_levels), len(spec)), dtype=object)
    for j, levels in enumerate(spec[:-1]):
        S[:, j] = _to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels))
    S[:, -1] = tags[bottom_key]
    categories = list(tags.values())
    try:
        encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
    except TypeError:  # sklearn < 1.2
        encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)    
    S = encoder.fit_transform(S).T
    if sparse_s:
        df_constructor = pd.DataFrame.sparse.from_spmatrix
    else:
        df_constructor = pd.DataFrame
    S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels)
    return Y_df, S_df, tags
show_doc(aggregate, title_level=3)

# simple case
df = pd.DataFrame(
    {
        'cat1': ['a', 'a', 'a', 'b'],
        'cat2': ['1', '2', '3', '2'],
        'y': [10, 20, 30, 40],
        'ds': ['2020-01-01', '2020-02-01', '2020-03-01', '2020-02-01']
    }
)
df['country'] = 'COUNTRY'
spec = [['country'], ['country', 'cat1'], ['country','cat1', 'cat2']]
Y_df, S_df, tags = aggregate(df, spec)
test_eq(
    Y_df.index.tolist(), 
    3 * ['COUNTRY'] +
    3 * ['COUNTRY/a'] +
    ['COUNTRY/b'] +
    ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3'] +
    ['COUNTRY/b/2']
)
test_eq(Y_df.loc['COUNTRY', 'y'].values, [10, 60, 30])
test_eq(
    S_df.index,
    ['COUNTRY', 'COUNTRY/a', 'COUNTRY/b', 'COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3', 'COUNTRY/b/2'],
)
test_eq(
    S_df.columns,
    ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3', 'COUNTRY/b/2'],
)
expected_tags = {
    'country': ['COUNTRY'],
    'country/cat1': ['COUNTRY/a', 'COUNTRY/b'],
    'country/cat1/cat2': ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3','COUNTRY/b/2'],
}
for k, actual in tags.items():
    test_eq(actual, expected_tags[k])
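
An illustrative property check (not part of the library's tests): every aggregated series in `Y_df` equals \(\mathbf{S}\) times the bottom-level series, treating missing bottom observations as zero. The `_pivot` helper below is defined only for this check.

def _pivot(ids):
    # pivot the long Y_df into a (series x timestamp) matrix, zero-filling gaps
    wide = Y_df.loc[ids].reset_index().pivot(index='unique_id', columns='ds', values='y')
    return wide.reindex(ids).fillna(0.0).values

np.testing.assert_allclose(S_df.values @ _pivot(S_df.columns), _pivot(S_df.index))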

# test categoricals don't produce all combinations
df2 = df.copy()
for col in ('country', 'cat1', 'cat2'):
    df2[col] = df2[col].astype('category')

Y_df2, *_ = aggregate(df2, spec)
assert Y_df.shape[0] == Y_df2.shape[0]

# test unbalanced dataset
max_tenure = 24
dates = pd.date_range(start='2019-01-31', freq='M', periods=max_tenure)
cohort_tenure = [24, 23, 22, 21]

ts_list = []

# create one time series for each cohort
for i in range(len(cohort_tenure)):
    ts_list.append(
        generate_series(n_series=1, freq='M', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \
            .assign(ult=i) \
            .assign(ds=dates[-cohort_tenure[i]:]) \
            .drop(columns=['unique_id'])
    )
df = pd.concat(ts_list, ignore_index=True)

# create categories
df['pen'] = np.where(df['ult'] < 2, 'a', 'b')
# note that the unique identifiers need to be strings
df['ult'] = df['ult'].astype(str)

hier_levels = [
    ['pen'],
    ['pen', 'ult'],
]

hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)
hier_df_before, S_df_before, _ = aggregate_before(df=df, spec=hier_levels)
test_eq(S_df, S_df_before)
test_eq(hier_df, hier_df_before)

df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df.insert(0, 'Country', 'Australia')

# grouped structure
hiers_grouped = [['Country'],
                 ['Country', 'State'], 
                 ['Country', 'Purpose'], 
                 ['Country', 'State', 'Region'], 
                 ['Country', 'State', 'Purpose'], 
                 ['Country', 'State', 'Region', 'Purpose']]

# strictly hierarchical structure
hiers_strictly = [['Country'],
                  ['Country', 'State'], 
                  ['Country', 'State', 'Region']]

# test strictly hierarchical
hier_df, S_df, tags = aggregate(df=df, spec=hiers_strictly)
test_eq(len(hier_df), 6800)
test_eq(hier_df.index.nunique(), 85)
test_eq(S_df.shape, (85, 76))
test_eq(hier_df.index.unique(), S_df.index)
test_eq(len(tags), len(hiers_strictly))                  

# test grouped
hier_df, S_df, tags = aggregate(df=df, spec=hiers_grouped)
test_eq(len(hier_df), 34_000)
test_eq(hier_df.index.nunique(), 425)
test_eq(S_df.shape, (425, 304))
test_eq(hier_df.index.unique(), S_df.index)
test_eq(len(tags), len(hiers_grouped))

df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
df.insert(0, 'Country', 'Australia')

# unit test NaN values
df_nan = df.copy()
df_nan.loc[0, 'Region'] = float('nan')
test_fail(
    aggregate,
    contains='null values',
    args=(df_nan, hiers_strictly),
)

# unit test None values
df_none = df.copy()
df_none.loc[0, 'Region'] = None
test_fail(
    aggregate,
    contains='null values',
    args=(df_none, hiers_strictly),
)

# test equality of aggregate and aggregate_before
for name, spec in zip(['strict', 'grouped'], [hiers_strictly, hiers_grouped]):
    with CodeTimer(f'{name} aggregation before'):
        Y_df_before, S_df_before, tags_before = aggregate_before(df=df, spec=spec)
    
    with CodeTimer(f'{name} aggregation now'):
        Y_df, S_df, tags = aggregate(df=df, spec=spec)
    
    np.testing.assert_allclose(
        Y_df['y'].values,
        Y_df_before['y'].values,
    )
    np.testing.assert_equal(S_df.values, S_df_before.values)
    
    test_eq(S_df.columns, S_df_before.columns)
    test_eq(S_df.index, S_df_before.index)
    
    test_eq(Y_df.columns, Y_df_before.columns)
    test_eq(Y_df.index, Y_df_before.index)

# test equality of sparse and non-sparse aggregation
with CodeTimer('strict non-sparse aggregate'):
    Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_strictly)

with CodeTimer('strict sparse aggregate'):
    Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_strictly)

test_close(Y_df.y.values, Y_df_sparse.y.values)
test_eq(S_df.values, S_df_sparse.values)

test_eq(S_df.columns, S_df_sparse.columns)
test_eq(S_df.index, S_df_sparse.index)

test_eq(Y_df.columns, Y_df_sparse.columns)
test_eq(Y_df.index, Y_df_sparse.index)

with CodeTimer('grouped non-sparse aggregate'):
    Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_grouped)

with CodeTimer('grouped sparse aggregate'):
    Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_grouped)

test_close(Y_df.y.values, Y_df_sparse.y.values)
test_eq(S_df.values, S_df_sparse.values)

test_eq(S_df.columns, S_df_sparse.columns)
test_eq(S_df.index, S_df_sparse.index)

test_eq(Y_df.columns, Y_df_sparse.columns)
test_eq(Y_df.index, Y_df_sparse.index)

Hierarchical Visualization


class HierarchicalPlot:
    """ Hierarchical Plot

    This class contains a collection of matplotlib visualization methods, suited for small
    to medium sized hierarchical series.

    **Parameters:**<br>
    `S`: pd.DataFrame with summing matrix of size `(base, bottom)`, see [aggregate function](https://nixtla.github.io/hierarchicalforecast/utils.html#aggregate).<br>
    `tags`: Dict[str, np.ndarray], with hierarchical aggregation indexes, where 
        each key is a level and its value contains tags associated to that level.<br><br>
    """
    def __init__(self,
                 S: pd.DataFrame,
                 tags: Dict[str, np.ndarray]):
        self.S = S
        self.tags = tags

    def plot_summing_matrix(self):
        """ 求和约束图

该方法仅绘制层次聚合约束矩阵 $\mathbf{S}$。
        """
        plt.figure(num=1, figsize=(4, 6), dpi=80, facecolor='w')
        plt.spy(self.S)
        plt.show()
        plt.close()

    def plot_series(self,
                    series: str,
                    Y_df: pd.DataFrame,
                    models: Optional[List[str]] = None,
                    level: Optional[List[int]] = None):
        """ Single Series plot

        **Parameters:**<br>
        `series`: str, string identifying the `'unique_id'` any-level series to plot.<br>
        `Y_df`: pd.DataFrame, hierarchically structured series ($\mathbf{y}_{[a,b]}$). 
                It contains columns `['unique_id', 'ds', 'y']`, it may have `'models'`.<br>
        `models`: List[str], string identifying filtering model columns.<br>
        `level`: float list 0-100, confidence levels for prediction intervals available in `Y_df`.<br>

        **Returns:**<br>
        Single series plot with filtered models and prediction interval level.<br><br>
        """
        if series not in self.S.index:
            raise Exception(f'time series {series} not found')
        fig, ax = plt.subplots(1, 1, figsize = (20, 7))
        df_plot = Y_df.loc[series].set_index('ds')
        cols = models if models is not None else df_plot.columns
        cols_wo_levels = [col for col in cols if ('-lo-' not in col and '-hi-' not in col)]
        try:
            cmap = plt.get_cmap("tab10", 10)
        except AttributeError:
            cmap = plt.cm.get_cmap("tab10", 10)
        cmap = [cmap(i) for i in range(10)][:len(cols_wo_levels)]
        cmap_dict = dict(zip(cols_wo_levels, cmap))
        for col in cols_wo_levels:
            ax.plot(df_plot[col], linewidth=2, label=col, color=cmap_dict[col])
            if level is not None and col != 'y':
                for lv in level:
                    if f'{col}-lo-{lv}' not in df_plot.columns:
                        # skip if the model
                        # does not have this level
                        continue
                    ax.fill_between(
                        df_plot.dropna().index, 
                        df_plot[f'{col}-lo-{lv}'].dropna().values, 
                        df_plot[f'{col}-hi-{lv}'].dropna().values,
                        alpha=-lv/100 + 1,
                        color=cmap_dict[col],
                        label=f'{col}_level_{lv}'
                    )
        ax.set_title(f'{series} Forecast', fontsize=22)
        ax.set_xlabel('Timestamp [t]', fontsize=20)
        ax.legend(prop={'size': 15})
        ax.grid()
        ax.xaxis.set_major_locator(
            plt.MaxNLocator(min(max(len(df_plot) // 10, 1), 10))
        )
        for label in (ax.get_xticklabels() + ax.get_yticklabels()):
            label.set_fontsize(20)
                    
    def plot_hierarchically_linked_series(self,
                                          bottom_series: str,
                                          Y_df: pd.DataFrame,
                                          models: Optional[List[str]] = None,
                                          level: Optional[List[int]] = None):
        """ Hierarchically Linked Series plot

        **Parameters:**<br>
        `bottom_series`: str, string identifying the `'unique_id'` bottom-level series to plot.<br>
        `Y_df`: pd.DataFrame, hierarchically structured series ($\mathbf{y}_{[a,b]}$). 
                It contains columns ['unique_id', 'ds', 'y'] and models. <br>
        `models`: List[str], string identifying filtering model columns.
        `level`: float list 0-100, confidence levels for prediction intervals available in `Y_df`.<br>

        **Returns:**<br>
        Collection of hierarchically linked series plots associated with the `bottom_series`
        and filtered models and prediction interval level.<br><br>
        """
        if bottom_series not in self.S.columns:
            raise Exception(f'bottom time series {bottom_series} not found')
        linked_series = self.S[bottom_series].loc[lambda x: x == 1.].index
        fig, axs = plt.subplots(len(linked_series), 1, figsize=(20, 2 * len(linked_series)))
        cols = models if models is not None else Y_df.drop(['ds'], axis=1)
        cols_wo_levels = [col for col in cols if ('-lo-' not in col and '-hi-' not in col)]
        cmap = plt.cm.get_cmap("tab10", 10)
        cmap = [cmap(i) for i in range(10)][:len(cols_wo_levels)]
        cmap_dict = dict(zip(cols_wo_levels, cmap))
        for idx, series in enumerate(linked_series):
            df_plot = Y_df.loc[[series]].set_index('ds')
            for col in cols_wo_levels:
                axs[idx].plot(df_plot[col], linewidth=2, label=col, color=cmap_dict[col])
                if level is not None and col != 'y':
                    for lv in level:
                        if f'{col}-lo-{lv}' not in df_plot.columns:
                            # skip if the model
                            # does not have this level
                            continue
                        axs[idx].fill_between(
                            df_plot.dropna().index, 
                            df_plot[f'{col}-lo-{lv}'].dropna().values, 
                            df_plot[f'{col}-hi-{lv}'].dropna().values,
                            alpha=-lv/100 + 1,
                            color=cmap_dict[col],
                            label=f'{col}_level_{lv}'
                        )
            axs[idx].set_title(f'{series}', fontsize=10)
            axs[idx].grid()
            axs[idx].get_xaxis().label.set_visible(False)
            axs[idx].legend().set_visible(False)
            axs[idx].xaxis.set_major_locator(
                plt.MaxNLocator(min(max(len(df_plot) // 10, 1), 10))
            )
            for label in (axs[idx].get_xticklabels() + axs[idx].get_yticklabels()):
                label.set_fontsize(10)
        plt.subplots_adjust(hspace=0.4)
        handles, labels = axs[0].get_legend_handles_labels()
        kwargs = dict(loc='lower center', 
                      prop={'size': 10}, 
                      bbox_to_anchor=(0, 0.05, 1, 1))
        if sys.version_info.minor > 7:
            kwargs['ncols'] = np.max([2, np.ceil(len(labels) / 2)])
        fig.legend(handles, labels, **kwargs)

    def plot_hierarchical_predictions_gap(self,
                                          Y_df: pd.DataFrame,
                                          models: Optional[List[str]] = None,
                                          xlabel: Optional[str] = None,
                                          ylabel: Optional[str] = None,
                                          ):
        """ Hierarchically Predictions Gap plot

        **Parameters:**<br>
        `Y_df`: pd.DataFrame, hierarchically structured series ($\mathbf{y}_{[a,b]}$). 
                It contains columns ['unique_id', 'ds', 'y'] and models. <br>
        `models`: List[str], string identifying filtering model columns.<br>
        `xlabel`: str, string for the plot's x axis label.<br>
        `ylabel`: str, string for the plot's y axis label.<br>

        **Returns:**<br>
        Plots of aggregated predictions at different levels of the hierarchical structure.
        The aggregation is performed according to the tag levels, see the
        [aggregate function](https://nixtla.github.io/hierarchicalforecast/utils.html).<br><br>
        """
        # parse the predictions dataframe
        horizon_dates = Y_df['ds'].unique()
        cols = models if models is not None else Y_df.drop(['ds', 'y'], axis=1).columns
        
        # plot predictions at the tag levels
        fig, ax = plt.subplots(figsize=(8, 5))
        
        if 'y' in Y_df.columns:
            idx_top = self.S.sum(axis=1).idxmax()
            y_plot = Y_df.loc[idx_top].y.values
            plt.plot(horizon_dates, y_plot, label='True')

        ys = []
        for tag in self.tags:
            y_plot = sum([Y_df[cols].loc[Y_df.index == idx].values \
                          for idx in self.tags[tag]])
            plt.plot(horizon_dates, y_plot, label=f'Level: {tag}')
            
            ys.append(y_plot[:,None])

        plt.title('Predictions Accumulated Difference')
        if ylabel is not None:
            plt.ylabel(ylabel)
        if xlabel is not None:
            plt.xlabel(xlabel)

        plt.legend()
        plt.grid()
        plt.show()
show_doc(HierarchicalPlot, title_level=3)
show_doc(HierarchicalPlot.plot_summing_matrix, 
         name='plot_summing_matrix', title_level=3)
show_doc(HierarchicalPlot.plot_series, 
         name='plot_series', title_level=3)
show_doc(HierarchicalPlot.plot_hierarchically_linked_series, 
         name='plot_hierarchically_linked_series', title_level=3)
show_doc(HierarchicalPlot.plot_hierarchical_predictions_gap,
         name='plot_hierarchical_predictions_gap', title_level=3)

hplots = HierarchicalPlot(S=S_df, tags=tags)
hplots.plot_summing_matrix()

hier_df['Model'] = hier_df['y'] * 1.1
hier_df['Model-lo-80'] = hier_df['Model'] - 0.1 * hier_df['Model']
hier_df['Model-hi-80'] = hier_df['Model'] + 0.1 * hier_df['Model']
hier_df['Model-lo-90'] = hier_df['Model'] - 0.2 * hier_df['Model']
hier_df['Model-hi-90'] = hier_df['Model'] + 0.2 * hier_df['Model']
hplots.plot_series(
    series='Australia', 
    Y_df=hier_df,
    level=[80, 90]
)

hplots.plot_series(series='Australia', 
                   Y_df=hier_df)

hplots.plot_hierarchically_linked_series(
    bottom_series='Australia/Western Australia/Experience Perth/Visiting', 
    Y_df=hier_df,
    level=[80, 90]
)

hplots.plot_hierarchically_linked_series(
    bottom_series='Australia/Western Australia/Experience Perth/Visiting', 
    Y_df=hier_df,
)

# test series with only one value
hplots.plot_hierarchically_linked_series(
    bottom_series='Australia/Western Australia/Experience Perth/Visiting', 
    Y_df=hier_df.groupby('unique_id').tail(1),
)

hplots.plot_hierarchical_predictions_gap(Y_df=hier_df.drop(columns='y'), models=['Model'])
from statsforecast.core import StatsForecast
from statsforecast.models import AutoARIMA, ETS, Naive
from datasetsforecast.hierarchical import HierarchicalData

Y_df, S, tags = HierarchicalData.load('./data', 'Labour')
Y_df['ds'] = pd.to_datetime(Y_df['ds'])

Y_test_df  = Y_df.groupby('unique_id').tail(24)
Y_train_df = Y_df.drop(Y_test_df.index)
Y_test_df  = Y_test_df.set_index('unique_id')
Y_train_df = Y_train_df.set_index('unique_id')

fcst = StatsForecast(
    df=Y_train_df, 
    # models=[AutoARIMA(season_length=12), Naive()],
    models=[ETS(season_length=12, model='AAZ')],
    freq='MS', 
    n_jobs=-1
)
Y_hat_df = fcst.forecast(h=24)

# plot differences of the predictions at different aggregations:
# Country level, Country/Region, Country/Gender/Region, ...
hplots = HierarchicalPlot(S=S, tags=tags)

hplots.plot_hierarchical_predictions_gap(
    Y_df=Y_hat_df, models='ETS',
    xlabel='Month', ylabel='Predictions',
)

External Forecast Adapters



# convert levels to output quantile names
def level_to_outputs(level:Iterable[int]):
    """ Converts list of levels into output names matching StatsForecast and NeuralForecast methods.

    **Parameters:**<br>
    `level`: int list [0,100]. Probability levels for prediction intervals.<br>

    **Returns:**<br>
    `output_names`: str list. String list with output column names.
    """
    qs = sum([[50-l/2, 50+l/2] for l in level], [])
    output_names = sum([[f'-lo-{l}', f'-hi-{l}'] for l in level], [])

    sort_idx = np.argsort(qs)
    quantiles = np.array(qs)[sort_idx]

    # add default median
    quantiles = np.concatenate([np.array([50]), quantiles]) / 100
    output_names = list(np.array(output_names)[sort_idx])
    output_names.insert(0, '-median')
    
    return quantiles, output_names

# convert quantiles to output quantile names
def quantiles_to_outputs(quantiles:Iterable[float]):
    """Converts list of quantiles into output names matching StatsForecast and NeuralForecast methods.

    **Parameters:**<br>
    `quantiles`: float list [0., 1.]. Alternative to level, quantiles to estimate from y distribution.<br>

    **Returns:**<br>
    `output_names`: str list. String list with output column names.
    """
    output_names = []
    for q in quantiles:
        if q<.50:
            output_names.append(f'-lo-{np.round(100-200*q,2)}')
        elif q>.50:
            output_names.append(f'-hi-{np.round(100-200*(1-q),2)}')
        else:
            output_names.append('-median')
    return quantiles, output_names


# given an input array of sample forecasts and input quantiles/levels,
# output a Pandas DataFrame with columns of quantile predictions
def samples_to_quantiles_df(samples: np.ndarray, 
                            unique_ids: Sequence[str], 
                            dates: List[str], 
                            quantiles: Optional[List[float]] = None,
                            level: Optional[List[int]] = None, 
                            model_name: Optional[str] = "model"):
    """ Transform Random Samples into HierarchicalForecast input.
    Auxiliary function to create compatible HierarchicalForecast input `Y_hat_df` dataframe.

    **Parameters:**<br>
    `samples`: numpy array. Samples from forecast distribution of shape [n_series, n_samples, horizon].<br>
    `unique_ids`: string list. Unique identifiers for each time series.<br>
    `dates`: datetime list. List of forecast dates.<br>
    `quantiles`: float list in [0., 1.]. Alternative to level, quantiles to estimate from y distribution.<br>
    `level`: int list in [0,100]. Probability levels for prediction intervals.<br>
    `model_name`: string. Name of forecasting model.<br>

    **Returns:**<br>
    `quantiles`: float list in [0., 1.]. Quantiles to estimate from the y distribution.<br>
    `Y_hat_df`: pd.DataFrame. Base quantile forecasts with `ds` and model columns to reconcile, indexed by `unique_id`.
    """
    
    # get the shape of the array
    n_series, n_samples, horizon = samples.shape

    assert n_series == len(unique_ids)
    assert horizon == len(dates)
    assert (quantiles is not None) ^ (level is not None)  # check that exactly one of quantiles or level is provided

    # create the initial dataframe from the mean forecasts
    forecasts_mean = np.mean(samples, axis=1).flatten()
    unique_ids = np.repeat(unique_ids, horizon)
    ds = np.tile(dates, n_series)
    data = pd.DataFrame({"unique_id":unique_ids, "ds":ds, model_name:forecasts_mean})

    # create quantiles and quantile names
    if level is not None:
        _quantiles, quantile_names = level_to_outputs(level)
    elif quantiles is not None:
        _quantiles, quantile_names = quantiles_to_outputs(quantiles)

    percentiles = [quantile * 100 for quantile in _quantiles]
    col_names = np.array([model_name + quantile_name for quantile_name in quantile_names])
    
    # add quantiles to the dataframe
    forecasts_quantiles = np.percentile(samples, percentiles, axis=1)

    forecasts_quantiles = np.transpose(forecasts_quantiles, (1,2,0)) # [Q,H,N] -> [N,H,Q]
    forecasts_quantiles = forecasts_quantiles.reshape(-1,len(_quantiles))

    df = pd.DataFrame(data=forecasts_quantiles, 
                      columns=col_names)
    
    return _quantiles, pd.concat([data,df], axis=1).set_index('unique_id')
show_doc(samples_to_quantiles_df, title_level=3)


# level_to_outputs unit tests
test_eq(
    level_to_outputs([80, 90]),
    ([0.5 , 0.05, 0.1 , 0.9 , 0.95], ['-median', '-lo-90', '-lo-80', '-hi-80', '-hi-90'])
)
test_eq(
    level_to_outputs([30]),
    ([0.5 , 0.35, 0.65], ['-median', '-lo-30', '-hi-30'])
)
# quantiles_to_outputs unit tests
test_eq(
    quantiles_to_outputs([0.2, 0.4, 0.6, 0.8]),
    ([0.2,0.4,0.6, 0.8], ['-lo-60.0', '-lo-20.0', '-hi-20.0', '-hi-60.0'])
)
test_eq(
    quantiles_to_outputs([0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]),
    ([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], 
     ['-lo-80.0', '-lo-60.0', '-lo-40.0', '-lo-20.0', '-median', '-hi-20.0', '-hi-40.0', '-hi-60.0', '-hi-80.0'])
)


# samples_to_quantiles_df unit tests
start_date = pd.Timestamp("2023-06-01")
end_date = pd.Timestamp("2023-06-10")
frequency = "D"  # 每日频率
dates = pd.date_range(start=start_date, end=end_date, freq=frequency).tolist()
samples = np.random.rand(3, 200, 10)
unique_ids = ['id1', 'id2', 'id3']
level = np.array([10, 50, 90])
quantiles=np.array([0.5, 0.05, 0.25, 0.45, 0.55, 0.75, 0.95])

ret_quantiles_1, ret_df_1 = samples_to_quantiles_df(samples, unique_ids, dates, level=level)
ret_quantiles_2, ret_df_2 = samples_to_quantiles_df(samples, unique_ids, dates, quantiles=quantiles)

test_eq(
    ret_quantiles_1,
    quantiles
)
test_eq(
    ret_df_1.columns,
    ['ds', 'model', 'model-median', 'model-lo-90', 'model-lo-50', 'model-lo-10', 'model-hi-10', 'model-hi-50', 'model-hi-90']
)
test_eq(
    ret_df_1.index,
    ['id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1',
       'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2',
       'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3']
)
test_eq(
    ret_quantiles_1, ret_quantiles_2
)
test_eq(
    ret_df_1.index, ret_df_2.index
)
samples_to_quantiles_df(samples, unique_ids, dates, level=level)
samples_to_quantiles_df(samples, unique_ids, dates, quantiles=quantiles)

If you find the code useful, please ⭐ us on Github