调解方法

大规模时间序列集合按照不同聚合层次组织,往往需要其预测遵循聚合约束,这就提出了创建能够进行一致性预测的新算法的挑战。

HierarchicalForecast 包提供了最全面的 Python 实现的层次预测算法集合,遵循经典的层次协调。所有方法都有一个 reconcile 函数,能够使用 numpy 数组协调基本预测。

大多数协调方法可以用以下方便的线性代数符号描述:

\[\tilde{\mathbf{y}}_{[a,b],\tau} = \mathbf{S}_{[a,b][b]} \mathbf{P}_{[b][a,b]} \hat{\mathbf{y}}_{[a,b],\tau}\]

其中 \(a, b\) 代表聚合层次和底层次,\(\mathbf{S}_{[a,b][b]}\) 包含层次聚合约束,而 \(\mathbf{P}_{[b][a,b]}\) 在不同的协调方法中有所不同。协调后的预测是 \(\tilde{\mathbf{y}}_{[a,b],\tau}\),基本预测为 \(\hat{\mathbf{y}}_{[a,b],\tau}\)


import warnings
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Dict, List, Optional, Union

import numpy as np
from numba import njit, prange
from quadprog import solve_qp
from scipy import sparse

from hierarchicalforecast.utils import is_strictly_hierarchical
from hierarchicalforecast.probabilistic_methods import Normality, Bootstrap, PERMBU

from fastcore.test import ExceptionExpected, test_close, test_eq, test_fail
from nbdev.showdoc import add_docs, show_doc

class HReconciler:
    fitted = False
    is_sparse_method = False
    insample = False
    P = None
    sampler = None

    def _get_sampler(self,
                     intervals_method,
                     S, P, y_hat,
                     y_insample, y_hat_insample,
                     W, sigmah, num_samples, seed, tags):
        if intervals_method == 'normality':
            sampler = Normality(
                        S=S, P=P,
                        y_hat=y_hat,
                        W=W, sigmah=sigmah,
                        seed=seed)
        elif intervals_method == 'permbu':
            sampler = PERMBU(
                        S=S, P=P,
                        y_hat = (S @ (P @ y_hat)),
                        tags=tags,
                        y_insample=y_insample, 
                        y_hat_insample=y_hat_insample,
                        sigmah=sigmah,
                        num_samples=num_samples,
                        seed=seed)
        elif intervals_method == 'bootstrap':
            sampler = Bootstrap(
                        S=S, P=P, 
                        y_hat=y_hat,
                        y_insample=y_insample,
                        y_hat_insample=y_hat_insample,
                        num_samples=num_samples,
                        seed=seed)
        else:
            sampler = None
        return sampler    

    def _reconcile(self,
                   S: np.ndarray,
                   P: np.ndarray,
                   y_hat: np.ndarray,
                   SP: np.ndarray = None,
                   level: Optional[List[int]] = None,
                   sampler: Optional[Union[Normality, PERMBU, Bootstrap]] = None):

        # 和解的痛苦
        res = {'mean': (S @ (P @ y_hat))}

        # 概率性调和
        if (level is not None) and (sampler is not None):
            # 更新结果字典
            # 向量化分位数
            quantiles = np.concatenate(
                [[(100 - lv) / 200, ((100 - lv) / 200) + lv / 100] for lv in level])
            quantiles = np.sort(quantiles)
            res = sampler.get_prediction_quantiles(res, quantiles)

        return res

    def predict(self,
                S: np.ndarray,
                y_hat: np.ndarray,
                level: Optional[List[int]] = None):
        """Predict using reconciler.

        Predict using fitted mean and probabilistic reconcilers.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated predictions.
        """
        if not self.fitted:
            raise Exception("This model instance is not fitted yet, Call fit method.")
   
        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               sampler=self.sampler, level=level)

    def sample(self,
               num_samples: int):
        """Sample probabilistic coherent distribution.

        Generates n samples from a probabilistic coherent distribution.
        The method uses fitted mean and probabilistic reconcilers, defined by
        the `intervals_method` selected during the reconciler's
        instantiation. Currently available: `normality`, `bootstrap`, `permbu`.

        **Parameters:**<br>
        `num_samples`: int, number of samples generated from coherent distribution.<br>

        **Returns:**<br>
        `samples`: Coherent samples of size (`num_series`, `horizon`, `num_samples`).
        """
        if not self.fitted:
            raise Exception("This model instance is not fitted yet, Call fit method.")
        if self.sampler is None:
            raise Exception("This model instance does not have sampler. Call fit with `intervals_method`.")

        samples = self.sampler.get_samples(num_samples=num_samples)
        return samples
    
    def fit(self,
        *args,
        **kwargs):

        raise NotImplementedError("This method is not implemented yet.")
    
    def fit_predict(self,
        *args,
        **kwargs):

        raise NotImplementedError("This method is not implemented yet.")  

    __call__ = fit_predict      

1. 自下而上


class BottomUp(HReconciler):
    """Bottom Up Reconciliation Class.
    The most basic hierarchical reconciliation is performed using an Bottom-Up strategy. It was proposed for 
    the first time by Orcutt in 1968.
    The corresponding hierarchical \"projection\" matrix is defined as:
    $$\mathbf{P}_{\\text{BU}} = [\mathbf{0}_{\mathrm{[b],[a]}}\;|\;\mathbf{I}_{\mathrm{[b][b]}}]$$

    **Parameters:**<br>
    None

    **References:**<br>
    - [Orcutt, G.H., Watts, H.W., & Edwards, J.B.(1968). \"Data aggregation and information loss\". The American 
    Economic Review, 58 , 773(787)](http://www.jstor.org/stable/1815532).
    """
    insample = False

    def _get_PW_matrices(self, S, idx_bottom):
        n_hiers, n_bottom = S.shape
        P = np.eye(n_bottom, n_hiers, n_hiers - n_bottom, np.float32)
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = np.eye(n_hiers, dtype=np.float32)
        return P, W

    def fit(self,
            S: np.ndarray,
            y_hat: np.ndarray,
            idx_bottom: np.ndarray,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,            
            tags: Optional[Dict[str, np.ndarray]] = None):
        """Bottom Up Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.intervals_method = intervals_method
        self.P, self.W = self._get_PW_matrices(S=S, idx_bottom=idx_bottom)
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah, 
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """BottomUp Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated y_hat using the Bottom Up approach.
        """
        # Fit 创建了 P、W 和采样器属性
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method, 
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               sampler=self.sampler, level=level)

    __call__ = fit_predict

::: {#cell-10 .cell 0=‘出’ 1=‘口’}

class BottomUpSparse(BottomUp):
    """BottomUpSparse Reconciliation Class.

    This is the implementation of a Bottom Up reconciliation using the sparse
    matrix approach. It works much more efficient on datasets with many time series.
    [makoren: At least I hope so, I only checked up until ~20k time series, and
    there's no real improvement, it would be great to check for smth like 1M time
    series, where the dense S matrix really stops fitting in memory]

    See the parent class for more details.
    """
    is_sparse_method = True

    def _get_PW_matrices(self, S, idx_bottom):
        n_hiers, n_bottom = S.shape
        P = sparse.eye(n_bottom, n_hiers, n_hiers - n_bottom, np.float32, "csr")
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = sparse.eye(n_hiers, dtype=np.float32, format="csr")
        return P, W

:::

show_doc(BottomUp, title_level=3)
show_doc(BottomUp.fit, name='BottomUp.fit', title_level=3)
show_doc(BottomUp.predict, name='BottomUp.predict', title_level=3)
show_doc(BottomUp.fit_predict, name='BottomUp.fit_predict', title_level=3)
show_doc(BottomUp.sample, name='BottomUp.sample', title_level=3)

S = np.array([[1., 1., 1., 1.],
              [1., 1., 0., 0.],
              [0., 0., 1., 1.],
              [1., 0., 0., 0.],
              [0., 1., 0., 0.],
              [0., 0., 1., 0.],
              [0., 0., 0., 1.]])
h = 2
_y = np.array([10., 5., 4., 2., 1.])
y_bottom = np.vstack([i * _y for i in range(1, 5)])
y_hat_bottom_insample = np.roll(y_bottom, 1)
y_hat_bottom_insample[:, 0] = np.nan
y_hat_bottom = np.vstack([i * np.ones(h) for i in range(1, 5)])
idx_bottom = [3, 4, 5, 6]
tags = {'level1': np.array([0]),
        'level2': np.array([1, 2]),
        'level3': idx_bottom}

# 层次结构中所有级别的sigmah
# sigmah 用于朴素方法
# 如本文所计算:
#https://otexts.com/fpp3/预测区间.html
y_base = S @ y_bottom
y_hat_base = S @ y_hat_bottom
y_hat_base_insample = S @ y_hat_bottom_insample
sigma = np.nansum((y_base - y_hat_base_insample) ** 2, axis=1) / (y_base.shape[1] - 1)
sigma = np.sqrt(sigma)
sigmah = sigma[:, None] * np.sqrt(np.vstack([np.arange(1, h + 1) for _ in range(y_base.shape[0])]))

cls_bottom_up = BottomUp()
test_eq(
    cls_bottom_up(S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom)['mean'],
    S @ y_hat_bottom
)

# 测试自下而上的预测恢复
cls_bottom_up = BottomUp()
bu_bootstrap_intervals =  cls_bottom_up(
    S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom,
)
test_eq(
    bu_bootstrap_intervals['mean'],
    S @ y_hat_bottom
)
# 隐藏
# 测试预测恢复与拟合 -> 预测
cls_bottom_up = BottomUp()
cls_bottom_up.fit(S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom)
y_tilde = cls_bottom_up.predict(S=S, y_hat=S @ y_hat_bottom)['mean']
test_eq(
    y_tilde,
    S @ y_hat_bottom
)

# 测试未拟合消息,预测未拟合
cls_bottom_up = BottomUp()
test_fail(
    cls_bottom_up.predict,
    contains='not fitted yet',
    args=(S, S @ y_hat_bottom),
)

2. 自上而下


assert is_strictly_hierarchical(S, tags)
S_non_hier = np.array([
    [1., 1., 1., 1.], # 总计
    [1., 1., 0., 0.], # 城市1
    [0., 0., 1., 1.], # 城市2
    [1., 0., 1., 0.], # 跨性别者
    [0., 1., 0., 1.], # 无跨性别
    [1., 0., 0., 0.], # 城市1 - transgender
    [0., 1., 0., 0.], # 城市1 - no transgender
    [0., 0., 1., 0.], # 城市2 - transgender
    [0., 0., 0., 1.], # 城市2 - 无跨性别者
])
tags_non_hier = {
    'Country': np.array([0]),
    'Country/City': np.array([2, 1]),
    'Country/Transgender': np.array([3, 4]),
    'Country-City-Transgender': np.array([5, 6, 7, 8]),
}
assert not is_strictly_hierarchical(S_non_hier, tags_non_hier)

def _get_child_nodes(
    S: Union[np.ndarray, sparse.csr_matrix], tags: Dict[str, np.ndarray]
):
    if isinstance(S, sparse.spmatrix):
        S = S.toarray()
    level_names = list(tags.keys())
    nodes = OrderedDict()
    for i_level, level in enumerate(level_names[:-1]):
        parent = tags[level]
        child = np.zeros_like(S)
        idx_child = tags[level_names[i_level + 1]]
        child[idx_child] = S[idx_child]
        nodes_level = {}
        for idx_parent_node in parent:
            parent_node = S[idx_parent_node]
            idx_node = child * parent_node.astype(bool)
            (idx_node,) = np.where(idx_node.sum(axis=1) > 0)
            nodes_level[idx_parent_node] = [idx for idx in idx_child if idx in idx_node]
        nodes[level] = nodes_level
    return nodes

def _reconcile_fcst_proportions(S: np.ndarray, y_hat: np.ndarray,
                                tags: Dict[str, np.ndarray],
                                nodes: Dict[str, Dict[int, np.ndarray]],
                                idx_top: int):
    reconciled = np.zeros_like(y_hat)
    reconciled[idx_top] = y_hat[idx_top]
    level_names = list(tags.keys())
    for i_level, level in enumerate(level_names[:-1]):
        nodes_level = nodes[level]
        for idx_parent, idx_childs in nodes_level.items():
            fcst_parent = reconciled[idx_parent]
            childs_sum = y_hat[idx_childs].sum()
            for idx_child in idx_childs:
                reconciled[idx_child] = y_hat[idx_child] * fcst_parent / childs_sum
    return reconciled

class TopDown(HReconciler):
    """Top Down Reconciliation Class.

    The Top Down hierarchical reconciliation method, distributes the total aggregate predictions and decomposes 
    it down the hierarchy using proportions $\mathbf{p}_{\mathrm{[b]}}$ that can be actual historical values 
    or estimated.

    $$\mathbf{P}=[\mathbf{p}_{\mathrm{[b]}}\;|\;\mathbf{0}_{\mathrm{[b][a,b\;-1]}}]$$
    **Parameters:**<br>
    `method`: One of `forecast_proportions`, `average_proportions` and `proportion_averages`.<br>

    **References:**<br>
    - [CW. Gross (1990). \"Disaggregation methods to expedite product line forecasting\". Journal of Forecasting, 9 , 233–254. 
    doi:10.1002/for.3980090304](https://onlinelibrary.wiley.com/doi/abs/10.1002/for.3980090304).<br>
    - [G. Fliedner (1999). \"An investigation of aggregate variable time series forecast strategies with specific subaggregate 
    time series statistical correlation\". Computers and Operations Research, 26 , 1133–1149. 
    doi:10.1016/S0305-0548(99)00017-9](https://doi.org/10.1016/S0305-0548(99)00017-9).
    """
    def __init__(self, 
                 method: str):
        self.method = method
        self.insample = method in ['average_proportions', 'proportion_averages']

    def _get_PW_matrices(self,
                         S: np.ndarray,
                         y_hat: np.ndarray,
                         y_insample: np.ndarray,
                         tags: Optional[Dict[str, np.ndarray]] = None,
                         ):

        n_hiers, n_bottom = S.shape

        # 检查数据结构是否严格分层。
        if tags is not None:
            if not is_strictly_hierarchical(S, tags):
                raise ValueError(
                    "Top-down reconciliation requires strictly hierarchical structures."
                )
            idx_top = int(S.sum(axis=1).argmax())
            levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
            idx_bottom = levels_[list(levels_)[-1]]
            y_btm = y_insample[idx_bottom]
        else:
            idx_top = 0
            y_btm = y_insample[(n_hiers - n_bottom):]

        y_top = y_insample[idx_top]

        if self.method == 'average_proportions':
            prop = np.mean(y_btm / y_top, axis=1)
        elif self.method == 'proportion_averages':
            prop = np.mean(y_btm, axis=1) / np.mean(y_top)
        elif self.method == 'forecast_proportions':
            raise Exception(f'Fit method not implemented for {self.method} yet')
        else:
            raise Exception(f'Unknown method {self.method}')

        P = np.zeros_like(S, np.float64).T #浮点数64 如果属性值过小,这种情况在处理wiki2数据时会发生
        P[:, idx_top] = prop
        W = np.eye(n_hiers, dtype=np.float32)
        return P, W

    def fit(self, 
            S,
            y_hat,
            y_insample: np.ndarray,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,            
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """TopDown Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.intervals_method = intervals_method
        self.P, self.W = self._get_PW_matrices(S=S, y_hat=y_hat, 
                                               tags=tags, y_insample=y_insample)
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah, 
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    tags: Dict[str, np.ndarray],
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None):
        """Top Down Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated y_hat using the Top Down approach.
        """
        if self.method == 'forecast_proportions':
            idx_top = int(S.sum(axis=1).argmax())
            levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
            if level is not None:
                warnings.warn('Prediction intervals not implement for `forecast_proportions`')
            nodes = _get_child_nodes(S=S, tags=levels_)
            reconciled = [_reconcile_fcst_proportions(S=S, y_hat=y_hat_[:, None], 
                                                      tags=levels_, 
                                                      nodes=nodes,
                                                      idx_top=idx_top) \
                          for y_hat_ in y_hat.T]
            reconciled = np.hstack(reconciled)
            return {'mean': reconciled}
        else:
            # Fit 创建了 P、W 和采样器属性
            self.fit(S=S,
                     y_hat=y_hat,
                     y_insample=y_insample,
                     y_hat_insample=y_hat_insample,
                     sigmah=sigmah,
                     intervals_method=intervals_method,
                     num_samples=num_samples,
                     seed=seed,
                     tags=tags, idx_bottom=idx_bottom)
            return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                                   level=level, sampler=self.sampler)

    __call__ = fit_predict

class TopDownSparse(TopDown):
    """TopDownSparse Reconciliation Class.

    This is an implementation of top-down reconciliation using the sparse matrix
    approach. It works much more efficiently on data sets with many time series.

    See the parent class for more details.
    """

    is_sparse_method = True

    def _get_PW_matrices(
        self,
        S: sparse.csr_matrix,
        y_hat: np.ndarray,
        y_insample: np.ndarray,
        tags: Optional[Dict[str, np.ndarray]] = None,
    ):
        # 检查数据结构是否为严格层次结构。
        if tags is not None and not is_strictly_hierarchical(S, tags):
            raise ValueError(
                "Top-down reconciliation requires strictly hierarchical structures."
            )

        # Get the dimensions of the "summing" matrix.
        n_hiers, n_bottom = S.shape

        # 获取顶部节点和底部节点的样本内值。
        y_top = y_insample[0]
        y_btm = y_insample[(n_hiers - n_bottom) :]

        # 计算分解比例。
        if self.method == "average_proportions":
            prop = np.mean(y_btm / y_top, 1)
        elif self.method == "proportion_averages":
            prop = np.mean(y_btm, 1) / np.mean(y_top)
        elif self.method == "forecast_proportions":
            raise Exception(f"Fit method not yet implemented for {self.method}.")
        else:
            raise Exception(f"{self.method} is an unknown disaggregation method.")

        # Instantiate and allocate the "projection" matrix to distribute the
        # 将顶层节点的分解基础预测分配到底层节点。
        P = sparse.csr_matrix(
            (
                prop,
                np.zeros_like(prop, np.uint8),
                np.arange(len(prop) + 1, dtype=np.min_scalar_type(n_bottom)),
            ),
            shape=(n_bottom, n_hiers),
            dtype=np.float64,
        )

        # Instantiate and allocate the "weight" matrix.
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = sparse.eye(n_hiers, dtype=np.float32, format="csr")

        return P, W
show_doc(TopDown, title_level=3)
show_doc(TopDown.fit, name='TopDown.fit', title_level=3)
show_doc(TopDown.predict, name='TopDown.predict', title_level=3)
show_doc(TopDown.fit_predict, name='TopDown.fit_predict', title_level=3)
show_doc(TopDown.sample, name='TopDown.sample', title_level=3)

# 我们能够恢复预测。 
# 从上到下,在这个例子中
# 由于时间序列
# 比例相同
# 穿越时空
# 但这并非普遍情况。
for method in ['forecast_proportions', 'average_proportions', 'proportion_averages']:
    cls_top_down = TopDown(method=method)
    if cls_top_down.insample:
        assert method in ['average_proportions', 'proportion_averages']
        test_close(
            cls_top_down(
                S=S, 
                y_hat=S @ y_hat_bottom, 
                y_insample=S @ y_bottom, 
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )
    else:
        test_close(
            cls_top_down(
                S=S, 
                y_hat=S @ y_hat_bottom, 
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_top_down = TopDownSparse(method=method)
    if cls_top_down.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_top_down(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            S @ y_hat_bottom,
        )
    else:
        test_close(
            cls_top_down(S=sparse.csr_matrix(S), y_hat=S @ y_hat_bottom, tags=tags)[
                "mean"
            ],
            S @ y_hat_bottom,
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_top_down = TopDown(method=method)
    cls_top_down_sparse = TopDownSparse(method=method)
    if cls_top_down.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_top_down(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            cls_top_down_sparse(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
    else:
        test_close(
            cls_top_down(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            cls_top_down_sparse(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )

3. 从中间向外推进


class MiddleOut(HReconciler):
    """Middle Out Reconciliation Class.

    This method is only available for **strictly hierarchical structures**. It anchors the base predictions 
    in a middle level. The levels above the base predictions use the Bottom-Up approach, while the levels 
    below use a Top-Down.

    **Parameters:**<br>
    `middle_level`: Middle level.<br>
    `top_down_method`: One of `forecast_proportions`, `average_proportions` and `proportion_averages`.<br>

    **References:**<br>
    - [Hyndman, R.J., & Athanasopoulos, G. (2021). \"Forecasting: principles and practice, 3rd edition:
    Chapter 11: Forecasting hierarchical and grouped series.\". OTexts: Melbourne, Australia. OTexts.com/fpp3 
    Accessed on July 2022.](https://otexts.com/fpp3/hierarchical.html)

    """
    def __init__(self, 
                 middle_level: str,
                 top_down_method: str):
        self.middle_level = middle_level
        self.top_down_method = top_down_method 
        self.insample = top_down_method in ['average_proportions', 'proportion_averages']

    def _get_PW_matrices(self, **kwargs):
        raise Exception('Not implemented')

    def fit(self, **kwargs):
        raise Exception('Not implemented')

    def predict(self, **kwargs):
        raise Exception('Not implemented')

    def fit_predict(self, 
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    tags: Dict[str, np.ndarray],
                    y_insample: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None):
        """Middle Out Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Only used for `forecast_proportions`<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated y_hat using the Middle Out approach.
        """
        if not is_strictly_hierarchical(S, tags):
            raise ValueError('Middle out reconciliation requires strictly hierarchical structures.')
        if self.middle_level not in tags.keys():
            raise ValueError('You have to provide a `middle_level` in `tags`.')

        levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
        reconciled = np.full_like(y_hat, fill_value=np.nan)
        cut_nodes = levels_[self.middle_level]
        # 自下而上的对账
        idxs_bu = []
        for node, idx_node in levels_.items():
            idxs_bu.append(idx_node)
            if node == self.middle_level:
                break
        idxs_bu = np.hstack(idxs_bu)
        #自下而上预测
        bu = BottomUp().fit_predict(
            S=np.fliplr(np.unique(S[idxs_bu], axis=1)), 
            y_hat=y_hat[idxs_bu], 
            idx_bottom=np.arange(len(idxs_bu))[-len(cut_nodes):]
        )
        reconciled[idxs_bu] = bu['mean']

        #自上而下
        child_nodes = _get_child_nodes(S, levels_)
        # 父节点包含中间层级中的每个节点
        # 作为键。每个节点的值是
        # 与该节点相连。
        parents = {node: {self.middle_level: np.array([node])} for node in cut_nodes}
        level_names = list(levels_.keys())
        for lv, lv_child in zip(level_names[:-1], level_names[1:]):
            # 如果lv不是从中间到下的一部分
            # 结构我们继续
            if lv not in list(parents.values())[0].keys():
                continue
            for idx_middle_out in parents.keys():
                idxs_parents = parents[idx_middle_out].values()
                complete_idxs_child = []
                for idx_parent, idxs_child in child_nodes[lv].items():
                    if any(idx_parent in val for val in idxs_parents):
                        complete_idxs_child.append(idxs_child)
                parents[idx_middle_out][lv_child] = np.hstack(complete_idxs_child)

        for node, levels_node in parents.items():
            idxs_node = np.hstack(list(levels_node.values()))
            S_node = S[idxs_node]
            S_node = S_node[:,~np.all(S_node == 0, axis=0)]
            counter = 0
            levels_node_ = deepcopy(levels_node)
            for lv_name, idxs_level in levels_node_.items():
                idxs_len = len(idxs_level)
                levels_node_[lv_name] = np.arange(counter, idxs_len + counter)
                counter += idxs_len
            td = TopDown(self.top_down_method).fit_predict(
                S=S_node, 
                y_hat=y_hat[idxs_node], 
                y_insample=y_insample[idxs_node] if y_insample is not None else None, 
                tags=levels_node_, 
            )
            reconciled[idxs_node] = td['mean']
        return {'mean': reconciled}

    __call__ = fit_predict

class MiddleOutSparse(MiddleOut):
    """MiddleOutSparse Reconciliation Class.

    This is an implementation of middle-out reconciliation using the sparse matrix
    approach. It works much more efficiently on data sets with many time series.

    See the parent class for more details.
    """

    # 尽管这是一种稀疏方法,但我们仍需要一个密集的表示形式。
    # "summing" matrix for the required transformations in the fit_predict method
    # 在进行自底向上和自顶向下的协调之前,我们可以避免冗余。
    # 转换。
    is_sparse_method = False

    def fit_predict(
        self,
        S: np.ndarray,
        y_hat: np.ndarray,
        tags: Dict[str, np.ndarray],
        y_insample: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        intervals_method: Optional[str] = None,
    ) -> Dict[str, np.ndarray]:
        # 检查数据结构是否为严格层次结构。
        if not is_strictly_hierarchical(S, tags):
            raise ValueError(
                "Middle-out reconciliation requires strictly hierarchical structures."
            )
        # 检查层级到节点映射中是否存在中间层级。
        if self.middle_level not in tags.keys():
            raise KeyError(f"{self.middle_level} is not a key in `tags`.")

        # 按节点数量对层级进行排序。
        levels = dict(sorted(tags.items(), key=lambda x: len(x[1])))
        # 分配一个数组来存储对账后的点预测结果。
        y_tilde = np.full_like(y_hat, np.nan)
        # 找出构成中间层的节点。
        cut_nodes = levels[self.middle_level]

        # 计算将中间层与下层分开的切割。
        cut_idx = max(cut_nodes) + 1

        # 从中间层开始,执行自底向上的稀疏协调。
        y_tilde[:cut_idx, :] = BottomUpSparse().fit_predict(
            S=sparse.csr_matrix(np.fliplr(np.unique(S[:cut_idx, :], axis=1))),
            y_hat=y_hat[:cut_idx, :],
            idx_bottom=None,
        )["mean"]

        # 从中间层开始执行稀疏自顶向下的协调。
        for cut_node in cut_nodes:
            # 找出割点子图的叶节点。
            leaf_idx = np.flatnonzero(S[cut_node, :])
            # 找到子图中所有与割点相连的节点。
            sub_idx = np.hstack(
                (cut_node, cut_idx + np.flatnonzero((np.any(S[cut_idx:, leaf_idx], 1))))
            )

            # Construct the "tags" argument for the cut node.
            if self.insample:
                # 内样本分解方法并不要求这一点。
                sub_tags = None
            else:
                # Disaggregating using forecast proportions requires the "tags" for
                # 子图。
                sub_tags = {}
                acc = 0
                for level_, nodes in levels.items():
                    # 查找该层子图中的所有节点。
                    nodes = np.intersect1d(nodes, sub_idx, True)
                    # 获取该层级的节点数量。
                    n = len(nodes)
                    # 排除切割节点以上的层级或其下的空层级。
                    if len(nodes) > 0:
                        sub_tags[level_] = np.arange(acc, n + acc)
                        acc += n

            # 从切割节点开始执行稀疏自顶向下的协调。
            y_tilde[sub_idx, :] = TopDownSparse(self.top_down_method).fit_predict(
                S=sparse.csr_matrix(S[sub_idx[:, None], leaf_idx]),
                y_hat=y_hat[sub_idx, :],
                y_insample=y_insample[sub_idx, :] if y_insample is not None else None,
                tags=sub_tags,
            )["mean"]

        return {"mean": y_tilde}

    __call__ = fit_predict
show_doc(MiddleOut, title_level=3)
show_doc(MiddleOut.fit_predict, title_level=3)

# 我们能够恢复预测。 
# 从中间向外扩展,在这个例子中
# 因为时间序列
# 比例相同
# 穿越时空
# 但这并非普遍情况。
for method in ['forecast_proportions', 'average_proportions', 'proportion_averages']:
    cls_middle_out = MiddleOut(middle_level='level2', top_down_method=method)
    if cls_middle_out.insample:
        assert method in ['average_proportions', 'proportion_averages']
        test_close(
            cls_middle_out(
                S=S, 
                y_hat=S @ y_hat_bottom, 
                y_insample=S @ y_bottom, 
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )
    else:
        test_close(
            cls_middle_out(
                S=S, 
                y_hat=S @ y_hat_bottom, 
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_middle_out = MiddleOutSparse(middle_level="level2", top_down_method=method)
    if cls_middle_out.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            S @ y_hat_bottom,
        )
    else:
        test_close(
            cls_middle_out(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            S @ y_hat_bottom,
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_middle_out = MiddleOut(middle_level="level2", top_down_method=method)
    cls_middle_out_sparse = MiddleOutSparse(
        middle_level="level2", top_down_method=method
    )
    if cls_middle_out.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            cls_middle_out_sparse(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
    else:
        test_close(
            cls_middle_out(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            cls_middle_out_sparse(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )

4. 最小迹


def crossprod(x):
    return x.T @ x

class MinTrace(HReconciler):
    """MinTrace Reconciliation Class.

    This reconciliation algorithm proposed by Wickramasuriya et al. depends on a generalized least squares estimator 
    and an estimator of the covariance matrix of the coherency errors $\mathbf{W}_{h}$. The Min Trace algorithm 
    minimizes the squared errors for the coherent forecasts under an unbiasedness assumption; the solution has a 
    closed form.<br>

    $$
    \mathbf{P}_{\\text{MinT}}=\\left(\mathbf{S}^{\intercal}\mathbf{W}_{h}\mathbf{S}\\right)^{-1}
    \mathbf{S}^{\intercal}\mathbf{W}^{-1}_{h}
    $$

    **Parameters:**<br>
    `method`: str, one of `ols`, `wls_struct`, `wls_var`, `mint_shrink`, `mint_cov`.<br>
    `nonnegative`: bool, reconciled forecasts should be nonnegative?<br>
    `mint_shr_ridge`: float=2e-8, ridge numeric protection to MinTrace-shr covariance estimator.<br>
    `num_threads`: int=1, number of threads to use for solving the optimization problems (when nonnegative=True).

    **References:**<br>
    - [Wickramasuriya, S. L., Athanasopoulos, G., & Hyndman, R. J. (2019). \"Optimal forecast reconciliation for
    hierarchical and grouped time series through trace minimization\". Journal of the American Statistical Association, 
    114 , 804–819. doi:10.1080/01621459.2018.1448825.](https://robjhyndman.com/publications/mint/).
    - [Wickramasuriya, S.L., Turlach, B.A. & Hyndman, R.J. (2020). \"Optimal non-negative
    forecast reconciliation". Stat Comput 30, 1167–1182,
    https://doi.org/10.1007/s11222-020-09930-0](https://robjhyndman.com/publications/nnmint/).
    """
    def __init__(self, 
                 method: str,
                 nonnegative: bool = False,
                 mint_shr_ridge: Optional[float] = 2e-8,
                 num_threads: int = 1):
        self.method = method
        self.nonnegative = nonnegative
        self.insample = method in ['wls_var', 'mint_cov', 'mint_shrink']
        if method == 'mint_shrink':
            self.mint_shr_ridge = mint_shr_ridge
        self.num_threads = num_threads
        if not self.nonnegative and self.num_threads > 1:
            warnings.warn('`num_threads` is only used when `nonnegative=True`')

    def _get_PW_matrices(self, 
                  S: np.ndarray,
                  y_hat: np.ndarray,
                  y_insample: Optional[np.ndarray] = None,
                  y_hat_insample: Optional[np.ndarray] = None,
                  idx_bottom: Optional[List[int]] = None,):
        # shape residuals_insample (n_hiers, obs)
        res_methods = ['wls_var', 'mint_cov', 'mint_shrink']
        if self.method in res_methods and y_insample is None and y_hat_insample is None:
            raise ValueError(f"For methods {', '.join(res_methods)} you need to pass residuals")
        n_hiers, n_bottom = S.shape
        n_aggs = n_hiers - n_bottom
        # Construct J and U.T
        J = np.concatenate((np.zeros((n_bottom, n_aggs), dtype=np.float64), S[n_aggs:]), axis=1)
        Ut = np.concatenate((np.eye(n_aggs, dtype=np.float64), -S[:n_aggs]), axis=1)
        if self.method == 'ols':
            W = np.eye(n_hiers)
            UtW = Ut
        elif self.method == 'wls_struct':
            Wdiag = np.sum(S, axis=1, dtype=np.float64)
            UtW = Ut * Wdiag
            W = np.diag(Wdiag)
        elif self.method in res_methods and y_insample is not None and y_hat_insample is not None:         
            # Residuals with shape (obs, n_hiers)
            residuals = (y_insample - y_hat_insample).T
            n, _ = residuals.shape

            # Protection: against overfitted model
            residuals_sum = np.sum(residuals, axis=0)
            zero_residual_prc = np.abs(residuals_sum) < 1e-4
            zero_residual_prc = np.mean(zero_residual_prc)
            if zero_residual_prc > .98:
                raise Exception(f'Insample residuals close to 0, zero_residual_prc={zero_residual_prc}. Check `Y_df`')

            if self.method == 'wls_var':
                Wdiag = np.nansum(residuals**2, axis=0, dtype=np.float64) / residuals.shape[0]
                Wdiag += np.full(n_hiers, 2e-8, dtype=np.float64)
                W = np.diag(Wdiag)
                UtW = Ut * Wdiag
            elif self.method == 'mint_cov':
                # Compute nans
                nan_mask = np.isnan(residuals.T)
                if np.any(nan_mask):
                    W = _ma_cov(residuals.T, ~nan_mask)
                else:
                    W = np.cov(residuals.T)

                UtW = Ut @ W
            elif self.method == 'mint_shrink':
                # Compute nans
                nan_mask = np.isnan(residuals.T)
                # Compute shrunk empirical covariance
                if np.any(nan_mask):
                    W = _shrunk_covariance_schaferstrimmer_with_nans(residuals.T, ~nan_mask, self.mint_shr_ridge)
                else:
                    W = _shrunk_covariance_schaferstrimmer_no_nans(residuals.T, self.mint_shr_ridge)
                
                UtW = Ut @ W
        else:
            raise ValueError(f'Unknown reconciliation method {self.method}')

        try:
            P = (J - np.linalg.solve(UtW[:, n_aggs:] @ Ut.T[n_aggs:] + UtW[:, :n_aggs], UtW[:, n_aggs:] @ J.T[n_aggs:]).T @ Ut)
        except np.linalg.LinAlgError:
            if self.method == "mint_shrink":
                raise Exception(f"min_trace ({self.method}) is ill-conditioned. Increase the value of parameter 'mint_shr_ridge' or use another reconciliation method.")            
            else:
                raise Exception(f'min_trace ({self.method}) is ill-conditioned. Please use another reconciliation method.')

        return P, W

    def fit(self,
            S,
            y_hat,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,            
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """MinTrace Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.y_hat = y_hat
        self.P, self.W = self._get_PW_matrices(S=S, y_hat=y_hat, 
                                               y_insample=y_insample, y_hat_insample=y_hat_insample,
                                               idx_bottom=idx_bottom)

        if self.nonnegative:
            _, n_bottom = S.shape
            W_inv = np.linalg.pinv(self.W)
            negatives = y_hat < 0
            if negatives.any():
                warnings.warn('Replacing negative forecasts with zero.')
                y_hat = np.copy(y_hat)
                y_hat[negatives] = 0.
            # Quadratic progamming formulation
            # here we are solving the quadratic programming problem
            # formulated in the origial paper
            # https://robjhyndman.com/publications/nnmint/
            # The library quadprog was chosen
            # based on these benchmarks:
            # https://scaron.info/blog/quadratic-programming-in-python.html
            a = S.T @ W_inv
            G = a @ S
            try:
                _ = np.linalg.cholesky(G)
            except np.linalg.LinAlgError:
                raise Exception(f"min_trace ({self.method}) is ill-conditioned. Try setting nonnegative=False or use another reconciliation method.")            
            C = np.eye(n_bottom)
            b = np.zeros(n_bottom)
            # the quadratic programming problem
            # returns the forecasts of the bottom series
            if self.num_threads == 1:
                bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0], 
                                                  axis=0, arr=y_hat)
            else:
                futures = []
                with ThreadPoolExecutor(self.num_threads) as executor:
                    for j in range(y_hat.shape[1]):
                        future = executor.submit(solve_qp, G=G, a=a @ y_hat[:, j], C=C, b=b)
                        futures.append(future)
                    bottom_fcts = np.hstack([f.result()[0][:, None] for f in futures])
            if not np.all(bottom_fcts > -1e-8):
                raise Exception('nonnegative optimization failed')
            # remove negative values close to zero
            bottom_fcts = np.clip(np.float32(bottom_fcts), a_min=0, a_max=None)
            self.y_hat = S @ bottom_fcts # Hack

            # Overwrite P, W and sampler attributes with BottomUp's
            self.P, self.W = BottomUp()._get_PW_matrices(S=S, idx_bottom=idx_bottom)            

        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah, 
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,                    
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """MinTrace Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Only used by `wls_var`, `mint_cov`, `mint_shrink`<br>
        `y_hat_insample`: Insample fitted values of size (`base`, `insample_size`). Only used by `wls_var`, `mint_cov`, `mint_shrink`<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `sampler`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated y_hat using the MinTrace approach.
        """
        if self.nonnegative:
            if (level is not None) and intervals_method in ['bootstrap', 'permbu']:
                raise Exception('nonnegative reconciliation is not compatible with bootstrap forecasts')
            if idx_bottom is None:
                raise Exception('idx_bottom needed for nonnegative reconciliation')

        # Fit creates P, W and sampler attributes
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method,
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=self.y_hat,
                               level=level, sampler=self.sampler)

    __call__ = fit_predict

@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _ma_cov(residuals: np.ndarray, not_nan_mask: np.ndarray):
    """掩码经验协方差矩阵。

    :meta private:
    """
    n_timeseries = residuals.shape[0]
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    for i in prange(n_timeseries):
        not_nan_mask_i = not_nan_mask[i]
        for j in range(i + 1):
            not_nan_mask_j = not_nan_mask[j]
            not_nan_mask_ij = not_nan_mask_i & not_nan_mask_j   
            n_samples = np.sum(not_nan_mask_ij)
            # 仅在时间序列对中有足够数量的非NaN样本时才进行计算。
            if n_samples > 1:
                # 掩码残差
                residuals_i = residuals[i][not_nan_mask_ij]
                residuals_j = residuals[j][not_nan_mask_ij]
                residuals_i_mean = np.mean(residuals_i)
                residuals_j_mean = np.mean(residuals_j)
                X_i = (residuals_i - residuals_i_mean)
                X_j = (residuals_j - residuals_j_mean)
                # 经验协方差
                factor_emp_cov = np.float64(1 / (n_samples - 1))
                W[i, j] = W[j, i] = factor_emp_cov * np.sum(X_i * X_j)

    return W    

@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _shrunk_covariance_schaferstrimmer_no_nans(residuals: np.ndarray, mint_shr_ridge: float):
    """Shrink empirical covariance according to the following method:
        Schäfer, Juliane, and Korbinian Strimmer. 
        ‘A Shrinkage Approach to Large-Scale Covariance Matrix Estimation and 
        Implications for Functional Genomics’. Statistical Applications in 
        Genetics and Molecular Biology 4, no. 1 (14 January 2005). 
        https://doi.org/10.2202/1544-6115.1175.

    :meta private:
    """
    n_timeseries = residuals.shape[0]
    n_samples = residuals.shape[1]
    
    # 我们需要经验协方差,即方差的非对角线元素之和。 
    # 经验相关矩阵与非对角线平方和 
    # 经验相关矩阵。
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    sum_var_emp_corr = np.float64(0.0)
    sum_sq_emp_corr = np.float64(0.0)
    factor_emp_cov = np.float64(1 / (n_samples - 1))
    factor_shrinkage = np.float64(1 / (n_samples * (n_samples - 1)))
    epsilon = np.float64(2e-8)
    for i in prange(n_timeseries):
        # Mean of the standardized residuals
        X_i = residuals[i] - np.mean(residuals[i])
        Xs_i = X_i / (np.std(residuals[i]) + epsilon)
        Xs_i_mean = np.mean(Xs_i)
        for j in range(i + 1):
            # 经验协方差
            X_j = residuals[j] - np.mean(residuals[j])
            W[i, j] = factor_emp_cov * np.sum(X_i * X_j)
            # 非对角元素之和
            if i != j:
                Xs_j = X_j / (np.std(residuals[j]) + epsilon)
                Xs_j_mean = np.mean(Xs_j)
                # 经验相关性的非对角线方差之和
                w = (Xs_i - Xs_i_mean) * (Xs_j - Xs_j_mean)
                w_mean = np.mean(w)
                sum_var_emp_corr += np.sum(np.square(w - w_mean))
                # 平方和经验相关性
                sum_sq_emp_corr += w_mean**2

    # 计算收缩强度 
    shrinkage = 1.0 - max(min((factor_shrinkage * sum_var_emp_corr) / (sum_sq_emp_corr + epsilon), 1.0), 0.0)
    # 缩小经验协方差
    for i in prange(n_timeseries):
        for j in range(i + 1):
            if i != j:    
                W[i, j] = W[j, i] = shrinkage * W[i, j]
            else:
                W[i, j] = W[j, i] = max(W[i, j], mint_shr_ridge)
    return W

@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _shrunk_covariance_schaferstrimmer_with_nans(residuals: np.ndarray, not_nan_mask: np.ndarray, mint_shr_ridge: float):
    """Shrink empirical covariance according to the following method:
        Schäfer, Juliane, and Korbinian Strimmer. 
        ‘A Shrinkage Approach to Large-Scale Covariance Matrix Estimation and 
        Implications for Functional Genomics’. Statistical Applications in 
        Genetics and Molecular Biology 4, no. 1 (14 January 2005). 
        https://doi.org/10.2202/1544-6115.1175.

    :meta private:
    """
    n_timeseries = residuals.shape[0]
    
    # 我们需要经验协方差,即方差的非对角线元素之和。 
    # 经验相关矩阵与非对角线平方和 
    # 经验相关矩阵。
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    sum_var_emp_corr = np.float64(0.0)
    sum_sq_emp_corr = np.float64(0.0)
    epsilon = np.float64(2e-8)
    for i in prange(n_timeseries):
        not_nan_mask_i = not_nan_mask[i]
        for j in range(i + 1):
            not_nan_mask_j = not_nan_mask[j]
            not_nan_mask_ij = not_nan_mask_i & not_nan_mask_j   
            n_samples = np.sum(not_nan_mask_ij)
            # 仅在时间序列对中有足够数量的非NaN样本时才进行计算。
            if n_samples > 1:
                # 掩码残差
                residuals_i = residuals[i][not_nan_mask_ij]
                residuals_j = residuals[j][not_nan_mask_ij]
                residuals_i_mean = np.mean(residuals_i)
                residuals_j_mean = np.mean(residuals_j)
                X_i = (residuals_i - residuals_i_mean)
                X_j = (residuals_j - residuals_j_mean)
                # 经验协方差
                factor_emp_cov = np.float64(1 / (n_samples - 1))
                W[i, j] = factor_emp_cov * np.sum(X_i * X_j)
                # 非对角元素之和
                if i != j:
                    factor_var_emp_cor = np.float64(n_samples / (n_samples - 1)**3)
                    residuals_i_std = np.std(residuals_i) + epsilon
                    residuals_j_std = np.std(residuals_j) + epsilon
                    Xs_i = X_i / (residuals_i_std + epsilon)
                    Xs_j = X_j / (residuals_j_std + epsilon)
                    Xs_im_mean = np.mean(Xs_i)
                    Xs_jm_mean = np.mean(Xs_j)
                    # 经验相关性的非对角线方差之和
                    w = (Xs_i - Xs_im_mean) * (Xs_j - Xs_jm_mean)
                    w_mean = np.mean(w)
                    sum_var_emp_corr += factor_var_emp_cor * np.sum(np.square(w - w_mean))
                    # 平方和经验相关性
                    sum_sq_emp_corr += np.square(factor_emp_cov * n_samples * w_mean)

    # 计算收缩强度 
    shrinkage = 1.0 - max(min((sum_var_emp_corr) / (sum_sq_emp_corr + epsilon), 1.0), 0.0)

    # 缩小经验协方差
    for i in prange(n_timeseries):
        for j in range(i + 1):
            if i != j:    
                W[i, j] = W[j, i] = shrinkage * W[i, j]
            else:
                W[i, j] = W[j, i] = max(W[i, j], mint_shr_ridge)

    return W

class MinTraceSparse(MinTrace):
    """MinTraceSparse Reconciliation Class.

    This is the implementation of a subset of MinTrace features using the sparse
    matrix approach. It works much more efficient on datasets with many time series.

    See the parent class for more details.

    Currently supported:
    * Methods using diagonal W matrix, i.e. "ols", "wls_struct", "wls_var",
    * The standard MinT version (non-negative is not supported).

    Note: due to the numerical instability of the matrix inversion when creating the
    P matrix, the method is NOT guaranteed to give identical results to the non-sparse
    version.
    """
    is_sparse_method = True

    def _get_PW_matrices(
        self,
        S: Union[np.ndarray, sparse.spmatrix],
        y_hat: np.ndarray,
        y_insample: Optional[np.ndarray] = None,
        y_hat_insample: Optional[np.ndarray] = None,
        idx_bottom: Optional[List[int]] = None,
    ):
        # 形状残差_样本内(n_层级,观测值)
        res_methods = ["wls_var", "mint_cov", "mint_shrink"]
        diag_only_methods = ["ols", "wls_struct", "wls_var"]

        if self.method not in diag_only_methods:
            raise NotImplementedError(
                "Only the methods with diagonal W are supported as sparse operations"
            )

        S = sparse.csr_matrix(S)

        if self.method in res_methods and y_insample is None and y_hat_insample is None:
            raise ValueError(
                f"For methods {', '.join(res_methods)} you need to pass residuals"
            )
        n_hiers, n_bottom = S.shape

        if self.method == "ols":
            W_diag = np.ones(n_hiers)
        elif self.method == "wls_struct":
            W_diag = S @ np.ones((n_bottom,))
        elif self.method == "wls_var" and y_insample is not None and y_hat_insample is not None:
            # 形状为 (观测数, 层级数) 的残差
            residuals = (y_insample - y_hat_insample).T
            n, _ = residuals.shape

            # 保护:防止过拟合模型
            residuals_sum = np.sum(residuals, axis=0)
            zero_residual_prc = np.abs(residuals_sum) < 1e-4
            zero_residual_prc = np.mean(zero_residual_prc)
            if zero_residual_prc > 0.98:
                raise Exception(
                    f"Insample residuals close to 0, zero_residual_prc={zero_residual_prc}. Check `Y_df`"
                )

            # 保护:数据不可用/缺失的情况
            # makoren:我发现这种掩盖行为弊大于利,结果显而易见。
            # of nan-s can often be rubbish, I'd argue it's better to fail than give rubbish results, here
            # 代码在遇到方差向量中的NaN时会直接失败。
            # masked_res = np.ma.array(residuals, mask=np.isnan(residuals))
            # covm = np.ma.cov(masked_res, rowvar=False, allow_masked=True).data

            W_diag = np.var(residuals, axis=0, ddof=1)
        else:
            raise ValueError(f"Unknown reconciliation method {self.method}")

        if any(W_diag < 1e-8):
            raise Exception(
                f"min_trace ({self.method}) needs covariance matrix to be positive definite."
            )

        if any(np.isnan(W_diag)):
            raise Exception(
                f"min_trace ({self.method}) needs covariance matrix to be positive definite (not nan)."
            )

        M = sparse.spdiags(np.reciprocal(W_diag), 0, W_diag.size, W_diag.size)
        R = sparse.csr_matrix(S.T @ M)

        # 向量上P作用的实现:
        def get_P_action(y):
            b = R @ y

            A = sparse.linalg.LinearOperator(
                (b.size, b.size), matvec=lambda v: R @ (S @ v)
            )

            x_tilde, exit_code = sparse.linalg.bicgstab(A, b, atol=1e-5)

            return x_tilde

        P = sparse.linalg.LinearOperator(
            (S.shape[1], y_hat.shape[0]), matvec=get_P_action
        )
        W = sparse.spdiags(W_diag, 0, W_diag.size, W_diag.size)

        return P, W

    def fit(self,
            S: sparse.csr_matrix,
            y_hat: np.ndarray,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,            
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        # 根据实际应用需要,对基础预测进行必要的剪裁,以使其与之相符。
        if self.nonnegative:
            self.y_hat = np.clip(y_hat, 0, None)
        else:
            self.y_hat = y_hat
        # 获取对账矩阵。
        self.P, self.W = self._get_PW_matrices(
            S=S, 
            y_hat=self.y_hat, 
            y_insample=y_insample, 
            y_hat_insample=y_hat_insample, 
            idx_bottom=idx_bottom,
        )

        if self.nonnegative:
            # 获取叶节点的数量。
            _, n_bottom = S.shape
            # 尽管现在已能确保P中的所有条目都 
            # 正向的,因为它作为线性算子在迭代中实现 
            # 解决稀疏线性系统的方法,我们需要调和以找到 
            # 如果任何一致的底层点预测值为负。
            y_tilde = self._reconcile(
                S=S, P=self.P, y_hat=self.y_hat, level=None, sampler=None
            )["mean"][-n_bottom:]
            # 查找是否有任何预测为负值。
            if np.any(y_tilde < 0):
                # 剪掉那些负面的预测。
                y_tilde = np.clip(y_tilde, 0, None)
                # 通过覆盖基准预测,强制非负一致性。 
                # 聚合的、裁剪后的底层预测。
                self.y_hat = S @ y_tilde
                # 用指定的属性覆盖P和W矩阵的属性。 
                # 自底向上协调以强制投影到非负数 
                # 相干子空间。
                self.P, self.W = BottomUpSparse()._get_PW_matrices(S=S, idx_bottom=None)  

        # 获取概率性协调的采样器。
        self.sampler = self._get_sampler(
            S=S,
            P=self.P,
            W=self.W,
            y_hat=self.y_hat,
            y_insample=y_insample,
            y_hat_insample=y_hat_insample,
            sigmah=sigmah,
            intervals_method=intervals_method,
            num_samples=num_samples,
            seed=seed,
            tags=tags,
        )
        # 将实例设置为已拟合。
        self.fitted = True
        return self
show_doc(MinTrace, title_level=3)
show_doc(MinTrace.fit, name='MinTrace.fit', title_level=3)
show_doc(MinTrace.predict, name='MinTrace.predict', title_level=3)
show_doc(MinTrace.fit_predict, name='MinTrace.fit_predict', title_level=3)
show_doc(MinTrace.sample, name='MinTrace.sample', title_level=3)

for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for nonnegative in [False, True]:
        #测试非负行为
        # 我们应该能够恢复相同的预测结果。
        # 在这个例子中
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        assert cls_min_trace.nonnegative is nonnegative
        if cls_min_trace.insample:
            assert method in ['wls_var', 'mint_cov', 'mint_shrink']
            test_close(
                cls_min_trace(
                    S=S, 
                    y_hat=S @ y_hat_bottom, 
                    y_insample=S @ y_bottom,
                    y_hat_insample=S @ y_hat_bottom_insample,
                    idx_bottom=idx_bottom if nonnegative else None
                )['mean'],
                S @ y_hat_bottom
            )
        else:
            test_close(
                cls_min_trace(
                    S=S, 
                    y_hat=S @ y_hat_bottom,
                    idx_bottom=idx_bottom if nonnegative else None
                )['mean'],
                S @ y_hat_bottom
            )
mintrace_1thr = MinTrace(method='ols', nonnegative=False, num_threads=1).fit(S=S, y_hat=S @ y_hat_bottom)
mintrace_2thr = MinTrace(method='ols', nonnegative=False, num_threads=2).fit(S=S, y_hat=S @ y_hat_bottom)
np.testing.assert_allclose(mintrace_1thr.y_hat, mintrace_2thr.y_hat)
with ExceptionExpected(regex='min_trace (mint_cov)*'):
    for nonnegative in [False, True]:
        cls_min_trace = MinTrace(method='mint_cov', nonnegative=nonnegative)
        cls_min_trace(
            S=S, 
            y_hat=S @ y_hat_bottom, 
            y_insample=S @ y_bottom,
            y_hat_insample=S @ y_hat_bottom_insample,
            idx_bottom=idx_bottom if nonnegative else None
        )

# MinTrace-shr covariance's stress test
diff_len_y_insample = S @ y_bottom
diff_len_y_hat_insample = S @ y_hat_bottom_insample
diff_len_y_insample[-1, :-1] = np.nan
diff_len_y_hat_insample[-1, :-1] = np.nan
cls_min_trace = MinTrace(method='mint_shrink')
result_min_trace = cls_min_trace(
    S=S,
    y_hat=S @ y_hat_bottom,
    y_insample=diff_len_y_insample,
    y_hat_insample=diff_len_y_hat_insample,
    idx_bottom=idx_bottom
)

#测试级别
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for nonnegative in [False, True]:
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        test_close(
            cls_min_trace(
                S=S, 
                y_hat=S @ y_hat_bottom, 
                y_insample=S @ y_bottom,
                y_hat_insample=S @ y_hat_bottom_insample,
                idx_bottom=idx_bottom if nonnegative else None,
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["ols", "wls_struct"]:
    for nonnegative in [False, True]:
        cls_min_trace = MinTraceSparse(method=method, nonnegative=nonnegative)
        test_close(
            cls_min_trace(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                y_hat_insample=S @ y_hat_bottom_insample,
                idx_bottom=idx_bottom if nonnegative else None,
            )["mean"],
            S @ y_hat_bottom,
        )

5. 最优组合


class OptimalCombination(MinTrace):
    """Optimal Combination Reconciliation Class.

    This reconciliation algorithm was proposed by Hyndman et al. 2011, the method uses generalized least squares 
    estimator using the coherency errors covariance matrix. Consider the covariance of the base forecast 
    $\\textrm{Var}(\epsilon_{h}) = \Sigma_{h}$, the $\mathbf{P}$ matrix of this method is defined by:
    $$ \mathbf{P} = \\left(\mathbf{S}^{\intercal}\Sigma_{h}^{\dagger}\mathbf{S}\\right)^{-1}\mathbf{S}^{\intercal}\Sigma^{\dagger}_{h}$$
    where $\Sigma_{h}^{\dagger}$ denotes the variance pseudo-inverse. The method was later proven equivalent to 
    `MinTrace` variants.

    **Parameters:**<br>
    `method`: str, allowed optimal combination methods: 'ols', 'wls_struct'.<br>
    `nonnegative`: bool, reconciled forecasts should be nonnegative?<br>

    **References:**<br>
    - [Rob J. Hyndman, Roman A. Ahmed, George Athanasopoulos, Han Lin Shang (2010). \"Optimal Combination Forecasts for 
    Hierarchical Time Series\".](https://robjhyndman.com/papers/Hierarchical6.pdf).<br>
    - [Shanika L. Wickramasuriya, George Athanasopoulos and Rob J. Hyndman (2010). \"Optimal Combination Forecasts for 
    Hierarchical Time Series\".](https://robjhyndman.com/papers/MinT.pdf).
    - [Wickramasuriya, S.L., Turlach, B.A. & Hyndman, R.J. (2020). \"Optimal non-negative
    forecast reconciliation". Stat Comput 30, 1167–1182, 
    https://doi.org/10.1007/s11222-020-09930-0](https://robjhyndman.com/publications/nnmint/).
    """
    def __init__(self,
                 method: str,
                 nonnegative: bool = False,
                 num_threads: int = 1):
        comb_methods = ['ols', 'wls_struct']
        if method not in comb_methods:
            raise ValueError(f"Optimal Combination class does not support method: \"{method}\"")
        super().__init__(method=method, nonnegative=nonnegative, num_threads=num_threads)
        self.insample = False
show_doc(OptimalCombination, title_level=3)
show_doc(OptimalCombination.fit, name='OptimalCombination.fit', title_level=3)
show_doc(OptimalCombination.predict, name='OptimalCombination.predict', title_level=3)
show_doc(OptimalCombination.fit_predict, name='OptimalCombination.fit_predict', title_level=3)
show_doc(OptimalCombination.sample, name='OptimalCombination.sample', title_level=3)

for method in ['ols', 'wls_struct']:
    for nonnegative in [False, True]:
        #测试非负行为
        # 我们应该能够恢复相同的预测结果。
        # 在这个例子中
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        test_close(
            cls_optimal_combination(
                S=S, 
                y_hat=S @ y_hat_bottom,
                idx_bottom=idx_bottom if nonnegative else None
            )['mean'],
            S @ y_hat_bottom
        )

#测试间隔
for method in ['ols', 'wls_struct']:
    for nonnegative in [False, True]:
        # 测试非负行为
        # 我们应该能够恢复相同的预测结果。
        # 在这个例子中
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        test_close(
            cls_optimal_combination(
                S=S, 
                y_hat=S @ y_hat_bottom,
                idx_bottom=idx_bottom if nonnegative else None,
            )['mean'],
            S @ y_hat_bottom
        )

6. 经验风险最小化


@njit
def lasso(X: np.ndarray, y: np.ndarray, 
          lambda_reg: float, max_iters: int = 1_000,
          tol: float = 1e-4):
    # lasso循环坐标下降法
    n, feats = X.shape
    norms = (X ** 2).sum(axis=0)
    beta = np.zeros(feats, dtype=np.float32)
    beta_changes = np.zeros(feats, dtype=np.float32)
    residuals = y.copy()
    
    for it in range(max_iters):
        for i, betai in enumerate(beta):
            # 该特征接近于零,我们 
            # 继续下一个。
            # 在这种情况下,最优的βi= 0
            if abs(norms[i]) < 1e-8:
                continue
            xi = X[:, i]
            #我们计算归一化导数
            rho = betai + xi.flatten().dot(residuals) / norms[i] #(规范[i] + 1e-3)
            #软阈值
            beta[i] = np.sign(rho) * max(np.abs(rho) - lambda_reg * n / norms[i], 0.)#(规范[i] + 1e-3), 0.)
            beta_changes[i] = np.abs(betai - beta[i])
            if beta[i] != betai:
                residuals += (betai - beta[i]) * xi
        if max(beta_changes) < tol:
            break
    #打印(它)
    return beta

class ERM(HReconciler):
    """Optimal Combination Reconciliation Class.

    The Empirical Risk Minimization reconciliation strategy relaxes the unbiasedness assumptions from
    previous reconciliation methods like MinT and optimizes square errors between the reconciled predictions
    and the validation data to obtain an optimal reconciliation matrix P.
    
    The exact solution for $\mathbf{P}$ (`method='closed'`) follows the expression:
    $$\mathbf{P}^{*} = \\left(\mathbf{S}^{\intercal}\mathbf{S}\\right)^{-1}\mathbf{Y}^{\intercal}\hat{\mathbf{Y}}\\left(\hat{\mathbf{Y}}\hat{\mathbf{Y}}\\right)^{-1}$$

    The alternative Lasso regularized $\mathbf{P}$ solution (`method='reg_bu'`) is useful when the observations 
    of validation data is limited or the exact solution has low numerical stability.
    $$\mathbf{P}^{*} = \\text{argmin}_{\mathbf{P}} ||\mathbf{Y}-\mathbf{S} \mathbf{P} \hat{Y} ||^{2}_{2} + \lambda ||\mathbf{P}-\mathbf{P}_{\\text{BU}}||_{1}$$

    **Parameters:**<br>
    `method`: str, one of `closed`, `reg` and `reg_bu`.<br>
    `lambda_reg`: float, l1 regularizer for `reg` and `reg_bu`.<br>

    **References:**<br>
    - [Ben Taieb, S., & Koo, B. (2019). Regularized regression for hierarchical forecasting without 
    unbiasedness conditions. In Proceedings of the 25th ACM SIGKDD International Conference on Knowledge 
    Discovery & Data Mining KDD '19 (p. 1337-1347). New York, NY, USA: Association for Computing Machinery.](https://doi.org/10.1145/3292500.3330976).<br>
    """
    def __init__(self,
                 method: str,
                 lambda_reg: float = 1e-2):
        self.method = method
        self.lambda_reg = lambda_reg
        self.insample = True

    def _get_PW_matrices(self, 
                  S: np.ndarray,
                  y_hat: np.ndarray,
                  y_insample: np.ndarray,
                  y_hat_insample: np.ndarray,
                  idx_bottom: np.ndarray):
        n_hiers, n_bottom = S.shape
        # y_hat_insample shape (n_hiers, obs)
        # remove obs with nan values
        nan_idx = np.isnan(y_hat_insample).any(axis=0)
        y_insample = y_insample[:, ~nan_idx]
        y_hat_insample = y_hat_insample[:, ~nan_idx]
        #only using h validation steps to avoid 
        #computational burden
        #print(y_hat.shape)
        h = min(y_hat.shape[1], y_hat_insample.shape[1])
        y_hat_insample = y_hat_insample[:, -h:] # shape (h, n_hiers)
        y_insample = y_insample[:, -h:]
        if self.method == 'closed':
            B = np.linalg.inv(S.T @ S) @ S.T @ y_insample
            B = B.T
            P = np.linalg.pinv(y_hat_insample.T) @ B
            P = P.T
        elif self.method in ['reg', 'reg_bu']:
            X = np.kron(np.array(S, order='F'), np.array(y_hat_insample.T, order='F'))
            Pbu = np.zeros_like(S)
            if self.method == 'reg_bu':
                Pbu[idx_bottom] = S[idx_bottom]
            Pbu = Pbu.T
            Y = y_insample.T.flatten(order='F') - X @ Pbu.T.flatten(order='F')
            if self.lambda_reg is None:
                lambda_reg = np.max(np.abs(X.T.dot(Y)))
            else:
                lambda_reg = self.lambda_reg
            P = lasso(X, Y, lambda_reg)
            P = P + Pbu.T.flatten(order='F')
            P = P.reshape(-1, n_bottom, order='F').T
        else:
            raise ValueError(f'Unknown reconciliation method {self.method}')

        W = np.eye(n_hiers, dtype=np.float32)

        return P, W

    def fit(self,
            S,
            y_hat,
            y_insample,
            y_hat_insample,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """ERM Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Train values of size (`base`, `insample_size`).<br>
        `y_hat_insample`: Insample train predictions of size (`base`, `insample_size`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.P, self.W = self._get_PW_matrices(S=S,
                                               y_hat=y_hat,
                                               y_insample=y_insample,
                                               y_hat_insample=y_hat_insample,
                                               idx_bottom=idx_bottom)                                               
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah, 
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """ERM Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Train values of size (`base`, `insample_size`).<br>
        `y_hat_insample`: Insample train predictions of size (`base`, `insample_size`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intevals, one of `normality`, `bootstrap`, `permbu`.<br>

        **Returns:**<br>
        `y_tilde`: Reconciliated y_hat using the ERM approach.
        """
        # Fit 创建了 P、W 和采样器属性
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method,
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               level=level, sampler=self.sampler)

    __call__ = fit_predict
show_doc(ERM, title_level=3)
show_doc(ERM.fit, name='ERM.fit', title_level=3)
show_doc(ERM.predict, name='ERM.predict', title_level=3)
show_doc(ERM.fit_predict, name='ERM.fit_predict', title_level=3)
show_doc(ERM.sample, name='ERM.sample', title_level=3)

for method in ['reg_bu']:
    cls_erm = ERM(method=method, lambda_reg=None)
    test_close(
        cls_erm(
            S=S, 
            y_hat=S @ y_hat_bottom,
            y_insample=S @ y_bottom,
            y_hat_insample=S @ y_hat_bottom_insample,
            idx_bottom=idx_bottom
        )['mean'],
        S @ y_hat_bottom
    )

# 测试间隔
for method in ['reg_bu']:
    cls_erm = ERM(method=method, lambda_reg=None)
    test_close(
        cls_erm(
            S=S, 
            y_hat=S @ y_hat_bottom,
            y_insample=S @ y_bottom,
            y_hat_insample=S @ y_hat_bottom_insample,
            idx_bottom=idx_bottom,
        )['mean'],
        S @ y_hat_bottom
    )
reconciler_args = dict(S=S, 
                       y_hat=y_hat_base,
                       y_insample=y_base,
                       y_hat_insample=y_hat_base_insample,
                       sigmah=sigmah,
                       level=[80, 90],
                       intervals_method='normality',
                       num_samples=200,
                       seed=0,
                       tags=tags,
                       idx_bottom=idx_bottom
                       )

# 测试正态预测区间
# 我们应该恢复原始的sigmah。
# 对于底部时间序列
cls_bottom_up = BottomUp()
test_eq(
    cls_bottom_up(**reconciler_args)['sigmah'][idx_bottom],
    sigmah[idx_bottom]
)

# test normality interval's names
cls_bottom_up = BottomUp()
bu_bootstrap_intervals =  cls_bottom_up(**reconciler_args)
test_eq(
    ['mean', 'sigmah', 'quantiles'],
    list(bu_bootstrap_intervals.keys())
)

# test PERMBU interval's names
reconciler_args['intervals_method'] = 'permbu'
bu_permbu_intervals = cls_bottom_up(**reconciler_args)
test_eq(
    ['mean', 'quantiles'],
    list(bu_permbu_intervals.keys())
)

# 测试自顶向下 + 区间
for method in ['average_proportions', 'proportion_averages', 'forecast_proportions']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_top_down = TopDown(method=method)
        reconciler_args['intervals_method'] = 'permbu'
        cls_top_down(**reconciler_args)

# 测试MinTrace + 区间
nonnegative = False
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        reconciler_args['intervals_method'] = intervals_method
        cls_min_trace(**reconciler_args)

# 测试最佳组合 + 区间
nonnegative = False
for method in ['ols', 'wls_struct']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        reconciler_args['intervals_method'] = intervals_method
        if not nonnegative:
            cls_optimal_combination(**reconciler_args)

# 测试ERM + 区间
for method in ['reg_bu']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:    
        cls_erm = ERM(method=method, lambda_reg=None)
        reconciler_args['intervals_method'] = intervals_method
        cls_erm(**reconciler_args)

# test coherent sample's shape
reconciler_args = dict(S=S, 
                       y_hat=y_hat_base,
                       y_insample=y_base,
                       y_hat_insample=y_hat_base_insample,
                       sigmah=sigmah,
                       intervals_method='bootstrap',
                       tags=tags,
                       idx_bottom=idx_bottom)

cls_bottom_up = BottomUp()
shapes = []
for intervals_method in ['normality', 'bootstrap', 'permbu']:
    cls_bottom_up.fit(**reconciler_args)
    coherent_samples = cls_bottom_up.sample(num_samples=100)
    shapes.append(coherent_samples.shape)
test_eq(shapes[0], shapes[1])
test_eq(shapes[0], shapes[2])

# 测试协方差等价性
n_samples = 100
n_hiers = 10
y_insample = np.random.rand(n_samples, n_hiers)
y_hat_insample = np.random.rand(n_samples, n_hiers)
residuals = (y_insample - y_hat_insample).astype(np.float32)
nan_mask = np.isnan(residuals)

# 检查在无缺失值情况下协方差函数的等价性
W_nb = _ma_cov(residuals, ~nan_mask)
W_np = np.cov(residuals)
np.testing.assert_allclose(W_nb, W_np, atol=1e-6)

# 检查在无缺失值情况下收缩协方差函数的等价性
W_ss_nonan = _shrunk_covariance_schaferstrimmer_no_nans(residuals, 2e-8)
W_ss_nan = _shrunk_covariance_schaferstrimmer_with_nans(residuals, ~nan_mask, 2e-8)
np.testing.assert_allclose(W_ss_nan, W_ss_nonan, atol=1e-6)

# 检查在无缺失值情况下,收缩W的对角元素与非收缩W的对角元素是否等价
np.testing.assert_allclose(np.diag(W_np), np.diag(W_ss_nan), atol=1e-6)

参考文献

一般和谐

最优和谐

分层概率一致预测

If you find the code useful, please ⭐ us on Github