import warnings
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Dict, List, Optional, Union
import numpy as np
from numba import njit, prange
from quadprog import solve_qp
from scipy import sparse
Reconciliation Methods

Large collections of time series organized into different aggregation levels often require their forecasts to satisfy the aggregation constraints, which poses the challenge of creating new algorithms capable of coherent forecasts.

The HierarchicalForecast package provides the most comprehensive collection of Python implementations of hierarchical forecasting algorithms that follow classic hierarchical reconciliation. All the methods have a `reconcile` function capable of reconciling base forecasts with `numpy` arrays.

Most reconciliation methods can be described with the following convenient linear algebra notation:

\[\tilde{\mathbf{y}}_{[a,b],\tau} = \mathbf{S}_{[a,b][b]} \mathbf{P}_{[b][a,b]} \hat{\mathbf{y}}_{[a,b],\tau}\]

where \(a, b\) represent the aggregate and bottom levels, \(\mathbf{S}_{[a,b][b]}\) contains the hierarchical aggregation constraints, and \(\mathbf{P}_{[b][a,b]}\) varies across reconciliation methods. The reconciled predictions are \(\tilde{\mathbf{y}}_{[a,b],\tau}\), and the base predictions \(\hat{\mathbf{y}}_{[a,b],\tau}\).
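As a quick illustration of this notation, the following minimal sketch (toy numbers and hypothetical variable names, not part of the package API) reconciles an incoherent base forecast with a bottom-up choice of \(\mathbf{P}\):

import numpy as np

# Toy hierarchy: one total series aggregating two bottom series.
S_toy = np.array([[1., 1.],
                  [1., 0.],
                  [0., 1.]])               # aggregation constraints, size (base=3, bottom=2)
y_hat_toy = np.array([12., 5., 6.])        # incoherent base forecasts (5 + 6 != 12)

# Bottom-up projection: keep the bottom forecasts, discard the aggregate.
P_toy = np.array([[0., 1., 0.],
                  [0., 0., 1.]])           # size (bottom=2, base=3)

y_tilde_toy = S_toy @ (P_toy @ y_hat_toy)  # array([11., 5., 6.]) -- now coherent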
from hierarchicalforecast.utils import is_strictly_hierarchical
from hierarchicalforecast.probabilistic_methods import Normality, Bootstrap, PERMBU
from fastcore.test import ExceptionExpected, test_close, test_eq, test_fail
from nbdev.showdoc import add_docs, show_doc
class HReconciler:
    fitted = False
    is_sparse_method = False
    insample = False
    P = None
    sampler = None

    def _get_sampler(self,
                     intervals_method,
                     S, P, y_hat,
                     y_insample, y_hat_insample,
                     W, sigmah, num_samples, seed, tags):
        if intervals_method == 'normality':
            sampler = Normality(
                S=S, P=P,
                y_hat=y_hat,
                W=W, sigmah=sigmah,
                seed=seed)
        elif intervals_method == 'permbu':
            sampler = PERMBU(
                S=S, P=P,
                y_hat=(S @ (P @ y_hat)),
                tags=tags,
                y_insample=y_insample,
                y_hat_insample=y_hat_insample,
                sigmah=sigmah,
                num_samples=num_samples,
                seed=seed)
        elif intervals_method == 'bootstrap':
            sampler = Bootstrap(
                S=S, P=P,
                y_hat=y_hat,
                y_insample=y_insample,
                y_hat_insample=y_hat_insample,
                num_samples=num_samples,
                seed=seed)
        else:
            sampler = None
        return sampler

    def _reconcile(self,
                   S: np.ndarray,
                   P: np.ndarray,
                   y_hat: np.ndarray,
                   SP: np.ndarray = None,
                   level: Optional[List[int]] = None,
                   sampler: Optional[Union[Normality, PERMBU, Bootstrap]] = None):

        # Mean reconciliation
        res = {'mean': (S @ (P @ y_hat))}

        # Probabilistic reconciliation
        if (level is not None) and (sampler is not None):
            # Update the results dictionary
            # with the vectorized quantiles
            quantiles = np.concatenate(
                [[(100 - lv) / 200, ((100 - lv) / 200) + lv / 100] for lv in level])
            quantiles = np.sort(quantiles)
            res = sampler.get_prediction_quantiles(res, quantiles)

        return res

    def predict(self,
                S: np.ndarray,
                y_hat: np.ndarray,
                level: Optional[List[int]] = None):
        """Predict using reconciler.

        Predict using fitted mean and probabilistic reconcilers.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>

        **Returns:**<br>
        `y_tilde`: Reconciled predictions.
        """
        if not self.fitted:
            raise Exception("This model instance is not fitted yet. Call the fit method.")

        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               sampler=self.sampler, level=level)

    def sample(self,
               num_samples: int):
        """Sample probabilistic coherent distribution.

        Generates n samples from a probabilistic coherent distribution.
        The method uses fitted mean and probabilistic reconcilers, defined by
        the `intervals_method` selected during the reconciler's
        instantiation. Currently available: `normality`, `bootstrap`, `permbu`.

        **Parameters:**<br>
        `num_samples`: int, number of samples generated from coherent distribution.<br>

        **Returns:**<br>
        `samples`: Coherent samples of size (`num_series`, `horizon`, `num_samples`).
        """
        if not self.fitted:
            raise Exception("This model instance is not fitted yet. Call the fit method.")
        if self.sampler is None:
            raise Exception("This model instance does not have a sampler. Call fit with `intervals_method`.")

        samples = self.sampler.get_samples(num_samples=num_samples)
        return samples

    def fit(self,
            *args,
            **kwargs):
        raise NotImplementedError("This method is not implemented yet.")

    def fit_predict(self,
                    *args,
                    **kwargs):
        raise NotImplementedError("This method is not implemented yet.")

    __call__ = fit_predict
1. Bottom-Up
class BottomUp(HReconciler):
    """Bottom Up Reconciliation Class.

    The most basic hierarchical reconciliation is performed using a Bottom-Up strategy. It was proposed for
    the first time by Orcutt in 1968.
    The corresponding hierarchical \"projection\" matrix is defined as:
    $$\mathbf{P}_{\\text{BU}} = [\mathbf{0}_{\mathrm{[b],[a]}}\;|\;\mathbf{I}_{\mathrm{[b][b]}}]$$

    **Parameters:**<br>
    None

    **References:**<br>
    - [Orcutt, G.H., Watts, H.W., & Edwards, J.B.(1968). \"Data aggregation and information loss\". The American
    Economic Review, 58 , 773(787)](http://www.jstor.org/stable/1815532).
    """
    insample = False

    def _get_PW_matrices(self, S, idx_bottom):
        n_hiers, n_bottom = S.shape
        P = np.eye(n_bottom, n_hiers, n_hiers - n_bottom, np.float32)
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = np.eye(n_hiers, dtype=np.float32)
        return P, W

    def fit(self,
            S: np.ndarray,
            y_hat: np.ndarray,
            idx_bottom: np.ndarray,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None):
        """Bottom Up Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.intervals_method = intervals_method
        self.P, self.W = self._get_PW_matrices(S=S, idx_bottom=idx_bottom)
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah,
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """BottomUp Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `y_tilde`: Reconciled y_hat using the Bottom Up approach.
        """
        # Fit creates the P, W and sampler attributes
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method,
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               sampler=self.sampler, level=level)

    __call__ = fit_predict
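To make the projection concrete, here is a minimal sketch (assumed toy dimensions, mirroring what `_get_PW_matrices` computes) of \(\mathbf{P}_{\text{BU}}\) for a hierarchy with 7 series, 4 of them at the bottom level:

n_hiers, n_bottom = 7, 4
P_bu = np.eye(n_bottom, n_hiers, n_hiers - n_bottom, np.float32)
# P_bu equals [0_{4x3} | I_{4x4}]: it selects the bottom-level base forecasts,
# and S @ (P_bu @ y_hat) re-aggregates them up the hierarchy.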
class BottomUpSparse(BottomUp):
    """BottomUpSparse Reconciliation Class.

    This is the implementation of a Bottom Up reconciliation using the sparse
    matrix approach. It works much more efficiently on datasets with many time series.
    [makoren: At least I hope so, I only checked up until ~20k time series, and
    there's no real improvement, it would be great to check for smth like 1M time
    series, where the dense S matrix really stops fitting in memory]

    See the parent class for more details.
    """
    is_sparse_method = True

    def _get_PW_matrices(self, S, idx_bottom):
        n_hiers, n_bottom = S.shape
        P = sparse.eye(n_bottom, n_hiers, n_hiers - n_bottom, np.float32, "csr")
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = sparse.eye(n_hiers, dtype=np.float32, format="csr")
        return P, W
show_doc(BottomUp, title_level=3)
show_doc(BottomUp.fit, name='BottomUp.fit', title_level=3)
show_doc(BottomUp.predict, name='BottomUp.predict', title_level=3)
show_doc(BottomUp.fit_predict, name='BottomUp.fit_predict', title_level=3)
show_doc(BottomUp.sample, name='BottomUp.sample', title_level=3)
S = np.array([[1., 1., 1., 1.],
              [1., 1., 0., 0.],
              [0., 0., 1., 1.],
              [1., 0., 0., 0.],
              [0., 1., 0., 0.],
              [0., 0., 1., 0.],
              [0., 0., 0., 1.]])
h = 2
_y = np.array([10., 5., 4., 2., 1.])
y_bottom = np.vstack([i * _y for i in range(1, 5)])
y_hat_bottom_insample = np.roll(y_bottom, 1)
y_hat_bottom_insample[:, 0] = np.nan
y_hat_bottom = np.vstack([i * np.ones(h) for i in range(1, 5)])
idx_bottom = [3, 4, 5, 6]
tags = {'level1': np.array([0]),
        'level2': np.array([1, 2]),
        'level3': idx_bottom}

# sigmah for all levels of the hierarchy,
# sigmah is used for the naive method,
# as computed in:
# https://otexts.com/fpp3/prediction-intervals.html
y_base = S @ y_bottom
y_hat_base = S @ y_hat_bottom
y_hat_base_insample = S @ y_hat_bottom_insample
sigma = np.nansum((y_base - y_hat_base_insample) ** 2, axis=1) / (y_base.shape[1] - 1)
sigma = np.sqrt(sigma)
sigmah = sigma[:, None] * np.sqrt(np.vstack([np.arange(1, h + 1) for _ in range(y_base.shape[0])]))
cls_bottom_up = BottomUp()
test_eq(
    cls_bottom_up(S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom)['mean'],
    S @ y_hat_bottom
)

# test bottom-up forecast recovery
cls_bottom_up = BottomUp()
bu_bootstrap_intervals = cls_bottom_up(
    S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom,
)
test_eq(
    bu_bootstrap_intervals['mean'],
    S @ y_hat_bottom
)

# hide
# test forecast recovery with fit -> predict
cls_bottom_up = BottomUp()
cls_bottom_up.fit(S=S, y_hat=S @ y_hat_bottom, idx_bottom=idx_bottom)
y_tilde = cls_bottom_up.predict(S=S, y_hat=S @ y_hat_bottom)['mean']
test_eq(
    y_tilde,
    S @ y_hat_bottom
)

# test unfitted message, predict without fit
cls_bottom_up = BottomUp()
test_fail(
    cls_bottom_up.predict,
    contains='not fitted yet',
    args=(S, S @ y_hat_bottom),
)
2. Top-Down
assert is_strictly_hierarchical(S, tags)
S_non_hier = np.array([
    [1., 1., 1., 1.],  # total
    [1., 1., 0., 0.],  # city 1
    [0., 0., 1., 1.],  # city 2
    [1., 0., 1., 0.],  # transgender
    [0., 1., 0., 1.],  # no transgender
    [1., 0., 0., 0.],  # city 1 - transgender
    [0., 1., 0., 0.],  # city 1 - no transgender
    [0., 0., 1., 0.],  # city 2 - transgender
    [0., 0., 0., 1.],  # city 2 - no transgender
])
tags_non_hier = {
    'Country': np.array([0]),
    'Country/City': np.array([2, 1]),
    'Country/Transgender': np.array([3, 4]),
    'Country-City-Transgender': np.array([5, 6, 7, 8]),
}
assert not is_strictly_hierarchical(S_non_hier, tags_non_hier)
def _get_child_nodes(
    S: Union[np.ndarray, sparse.csr_matrix], tags: Dict[str, np.ndarray]
):
    if isinstance(S, sparse.spmatrix):
        S = S.toarray()
    level_names = list(tags.keys())
    nodes = OrderedDict()
    for i_level, level in enumerate(level_names[:-1]):
        parent = tags[level]
        child = np.zeros_like(S)
        idx_child = tags[level_names[i_level + 1]]
        child[idx_child] = S[idx_child]
        nodes_level = {}
        for idx_parent_node in parent:
            parent_node = S[idx_parent_node]
            idx_node = child * parent_node.astype(bool)
            (idx_node,) = np.where(idx_node.sum(axis=1) > 0)
            nodes_level[idx_parent_node] = [idx for idx in idx_child if idx in idx_node]
        nodes[level] = nodes_level
    return nodes
def _reconcile_fcst_proportions(S: np.ndarray, y_hat: np.ndarray,
                                tags: Dict[str, np.ndarray],
                                nodes: Dict[str, Dict[int, np.ndarray]],
                                idx_top: int):
    reconciled = np.zeros_like(y_hat)
    reconciled[idx_top] = y_hat[idx_top]
    level_names = list(tags.keys())
    for i_level, level in enumerate(level_names[:-1]):
        nodes_level = nodes[level]
        for idx_parent, idx_childs in nodes_level.items():
            fcst_parent = reconciled[idx_parent]
            childs_sum = y_hat[idx_childs].sum()
            for idx_child in idx_childs:
                reconciled[idx_child] = y_hat[idx_child] * fcst_parent / childs_sum
    return reconciled
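For intuition, a single disaggregation step of `_reconcile_fcst_proportions` looks as follows (a minimal sketch with toy numbers): a parent's reconciled forecast is split among its children in proportion to their own base forecasts.

fcst_parent = 100.0
child_fcsts = np.array([30.0, 50.0])
reconciled_children = child_fcsts * fcst_parent / child_fcsts.sum()
# array([37.5, 62.5]) -- the children now sum exactly to the parent's forecast.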
class TopDown(HReconciler):
    """Top Down Reconciliation Class.

    The Top Down hierarchical reconciliation method distributes the total aggregate predictions and decomposes
    them down the hierarchy using proportions $\mathbf{p}_{\mathrm{[b]}}$ that can be actual historical values
    or estimated.

    $$\mathbf{P}=[\mathbf{p}_{\mathrm{[b]}}\;|\;\mathbf{0}_{\mathrm{[b][a,b\;-1]}}]$$

    **Parameters:**<br>
    `method`: One of `forecast_proportions`, `average_proportions` and `proportion_averages`.<br>

    **References:**<br>
    - [CW. Gross (1990). \"Disaggregation methods to expedite product line forecasting\". Journal of Forecasting, 9 , 233–254.
    doi:10.1002/for.3980090304](https://onlinelibrary.wiley.com/doi/abs/10.1002/for.3980090304).<br>
    - [G. Fliedner (1999). \"An investigation of aggregate variable time series forecast strategies with specific subaggregate
    time series statistical correlation\". Computers and Operations Research, 26 , 1133–1149.
    doi:10.1016/S0305-0548(99)00017-9](https://doi.org/10.1016/S0305-0548(99)00017-9).
    """
    def __init__(self,
                 method: str):
        self.method = method
        self.insample = method in ['average_proportions', 'proportion_averages']

    def _get_PW_matrices(self,
                         S: np.ndarray,
                         y_hat: np.ndarray,
                         y_insample: np.ndarray,
                         tags: Optional[Dict[str, np.ndarray]] = None,
                         ):
        n_hiers, n_bottom = S.shape

        # Check if the data structure is strictly hierarchical.
        if tags is not None:
            if not is_strictly_hierarchical(S, tags):
                raise ValueError(
                    "Top-down reconciliation requires strictly hierarchical structures."
                )
            idx_top = int(S.sum(axis=1).argmax())
            levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
            idx_bottom = levels_[list(levels_)[-1]]
            y_btm = y_insample[idx_bottom]
        else:
            idx_top = 0
            y_btm = y_insample[(n_hiers - n_bottom):]

        y_top = y_insample[idx_top]

        if self.method == 'average_proportions':
            prop = np.mean(y_btm / y_top, axis=1)
        elif self.method == 'proportion_averages':
            prop = np.mean(y_btm, axis=1) / np.mean(y_top)
        elif self.method == 'forecast_proportions':
            raise Exception(f'Fit method not implemented for {self.method} yet')
        else:
            raise Exception(f'Unknown method {self.method}')

        P = np.zeros_like(S, np.float64).T  # float64 in case the proportions are too small, which happens with the wiki2 data
        P[:, idx_top] = prop
        W = np.eye(n_hiers, dtype=np.float32)
        return P, W
    def fit(self,
            S,
            y_hat,
            y_insample: np.ndarray,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """TopDown Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.intervals_method = intervals_method
        self.P, self.W = self._get_PW_matrices(S=S, y_hat=y_hat,
                                               tags=tags, y_insample=y_insample)
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah,
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    tags: Dict[str, np.ndarray],
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None):
        """Top Down Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `y_tilde`: Reconciled y_hat using the Top Down approach.
        """
        if self.method == 'forecast_proportions':
            idx_top = int(S.sum(axis=1).argmax())
            levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
            if level is not None:
                warnings.warn('Prediction intervals not implemented for `forecast_proportions`')
            nodes = _get_child_nodes(S=S, tags=levels_)
            reconciled = [_reconcile_fcst_proportions(S=S, y_hat=y_hat_[:, None],
                                                      tags=levels_,
                                                      nodes=nodes,
                                                      idx_top=idx_top) \
                          for y_hat_ in y_hat.T]
            reconciled = np.hstack(reconciled)
            return {'mean': reconciled}
        else:
            # Fit creates the P, W and sampler attributes
            self.fit(S=S,
                     y_hat=y_hat,
                     y_insample=y_insample,
                     y_hat_insample=y_hat_insample,
                     sigmah=sigmah,
                     intervals_method=intervals_method,
                     num_samples=num_samples,
                     seed=seed,
                     tags=tags, idx_bottom=idx_bottom)
            return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                                   level=level, sampler=self.sampler)

    __call__ = fit_predict
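The two in-sample disaggregation estimators differ only in where the averaging happens. A minimal sketch with toy in-sample values (`y_top_toy`, `y_btm_toy` are hypothetical):

y_top_toy = np.array([10., 20.])             # total series over two in-sample periods
y_btm_toy = np.array([[2., 8.],              # bottom node 1
                      [8., 12.]])            # bottom node 2

# 'average_proportions': average of the per-period shares (Gross-Sohl method A).
prop_a = np.mean(y_btm_toy / y_top_toy, axis=1)           # array([0.3, 0.7])

# 'proportion_averages': ratio of the per-node averages (Gross-Sohl method F).
prop_f = np.mean(y_btm_toy, axis=1) / np.mean(y_top_toy)  # array([0.333..., 0.666...])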
class TopDownSparse(TopDown):
    """TopDownSparse Reconciliation Class.

    This is an implementation of top-down reconciliation using the sparse matrix
    approach. It works much more efficiently on data sets with many time series.

    See the parent class for more details.
    """
    is_sparse_method = True

    def _get_PW_matrices(
        self,
        S: sparse.csr_matrix,
        y_hat: np.ndarray,
        y_insample: np.ndarray,
        tags: Optional[Dict[str, np.ndarray]] = None,
    ):
        # Check if the data structure is strictly hierarchical.
        if tags is not None and not is_strictly_hierarchical(S, tags):
            raise ValueError(
                "Top-down reconciliation requires strictly hierarchical structures."
            )

        # Get the dimensions of the "summing" matrix.
        n_hiers, n_bottom = S.shape

        # Get the in-sample values of the top and bottom nodes.
        y_top = y_insample[0]
        y_btm = y_insample[(n_hiers - n_bottom):]

        # Calculate the disaggregation proportions.
        if self.method == "average_proportions":
            prop = np.mean(y_btm / y_top, 1)
        elif self.method == "proportion_averages":
            prop = np.mean(y_btm, 1) / np.mean(y_top)
        elif self.method == "forecast_proportions":
            raise Exception(f"Fit method not yet implemented for {self.method}.")
        else:
            raise Exception(f"{self.method} is an unknown disaggregation method.")

        # Instantiate and allocate the "projection" matrix to distribute the
        # disaggregated base forecast of the top node to the bottom nodes.
        P = sparse.csr_matrix(
            (
                prop,
                np.zeros_like(prop, np.uint8),
                np.arange(len(prop) + 1, dtype=np.min_scalar_type(n_bottom)),
            ),
            shape=(n_bottom, n_hiers),
            dtype=np.float64,
        )

        # Instantiate and allocate the "weight" matrix.
        if getattr(self, "intervals_method", False) is None:
            W = None
        else:
            W = sparse.eye(n_hiers, dtype=np.float32, format="csr")

        return P, W
show_doc(TopDown, title_level=3)
show_doc(TopDown.fit, name='TopDown.fit', title_level=3)
show_doc(TopDown.predict, name='TopDown.predict', title_level=3)
show_doc(TopDown.fit_predict, name='TopDown.fit_predict', title_level=3)
show_doc(TopDown.sample, name='TopDown.sample', title_level=3)
# we are able to recover the forecasts
# from top down in this example
# because the time series
# share the same proportions
# across time,
# but this is not the general case
for method in ['forecast_proportions', 'average_proportions', 'proportion_averages']:
    cls_top_down = TopDown(method=method)
    if cls_top_down.insample:
        assert method in ['average_proportions', 'proportion_averages']
        test_close(
            cls_top_down(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )
    else:
        test_close(
            cls_top_down(
                S=S,
                y_hat=S @ y_hat_bottom,
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_top_down = TopDownSparse(method=method)
    if cls_top_down.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_top_down(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            S @ y_hat_bottom,
        )
    else:
        test_close(
            cls_top_down(S=sparse.csr_matrix(S), y_hat=S @ y_hat_bottom, tags=tags)[
                "mean"
            ],
            S @ y_hat_bottom,
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_top_down = TopDown(method=method)
    cls_top_down_sparse = TopDownSparse(method=method)
    if cls_top_down.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_top_down(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            cls_top_down_sparse(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
    else:
        test_close(
            cls_top_down(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            cls_top_down_sparse(
                S=sparse.csr_matrix(S),
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
3. Middle-Out
class MiddleOut(HReconciler):
    """Middle Out Reconciliation Class.

    This method is only available for **strictly hierarchical structures**. It anchors the base predictions
    in a middle level. The levels above the base predictions use the Bottom-Up approach, while the levels
    below use a Top-Down.

    **Parameters:**<br>
    `middle_level`: Middle level.<br>
    `top_down_method`: One of `forecast_proportions`, `average_proportions` and `proportion_averages`.<br>

    **References:**<br>
    - [Hyndman, R.J., & Athanasopoulos, G. (2021). \"Forecasting: principles and practice, 3rd edition:
    Chapter 11: Forecasting hierarchical and grouped series.\". OTexts: Melbourne, Australia. OTexts.com/fpp3
    Accessed on July 2022.](https://otexts.com/fpp3/hierarchical.html)
    """
    def __init__(self,
                 middle_level: str,
                 top_down_method: str):
        self.middle_level = middle_level
        self.top_down_method = top_down_method
        self.insample = top_down_method in ['average_proportions', 'proportion_averages']

    def _get_PW_matrices(self, **kwargs):
        raise Exception('Not implemented')

    def fit(self, **kwargs):
        raise Exception('Not implemented')

    def predict(self, **kwargs):
        raise Exception('Not implemented')

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    tags: Dict[str, np.ndarray],
                    y_insample: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None):
        """Middle Out Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Only used for `forecast_proportions`<br>

        **Returns:**<br>
        `y_tilde`: Reconciled y_hat using the Middle Out approach.
        """
        if not is_strictly_hierarchical(S, tags):
            raise ValueError('Middle out reconciliation requires strictly hierarchical structures.')
        if self.middle_level not in tags.keys():
            raise ValueError('You have to provide a `middle_level` in `tags`.')

        levels_ = dict(sorted(tags.items(), key=lambda x: len(x[1])))
        reconciled = np.full_like(y_hat, fill_value=np.nan)
        cut_nodes = levels_[self.middle_level]

        # bottom-up reconciliation
        idxs_bu = []
        for node, idx_node in levels_.items():
            idxs_bu.append(idx_node)
            if node == self.middle_level:
                break
        idxs_bu = np.hstack(idxs_bu)

        # bottom-up forecasts
        bu = BottomUp().fit_predict(
            S=np.fliplr(np.unique(S[idxs_bu], axis=1)),
            y_hat=y_hat[idxs_bu],
            idx_bottom=np.arange(len(idxs_bu))[-len(cut_nodes):]
        )
        reconciled[idxs_bu] = bu['mean']

        # top-down
        child_nodes = _get_child_nodes(S, levels_)
        # parents contains each node in the middle level
        # as a key. The values of each node are the levels
        # connected to that node.
        parents = {node: {self.middle_level: np.array([node])} for node in cut_nodes}
        level_names = list(levels_.keys())
        for lv, lv_child in zip(level_names[:-1], level_names[1:]):
            # if lv is not part of the middle-out
            # structure, we continue
            if lv not in list(parents.values())[0].keys():
                continue
            for idx_middle_out in parents.keys():
                idxs_parents = parents[idx_middle_out].values()
                complete_idxs_child = []
                for idx_parent, idxs_child in child_nodes[lv].items():
                    if any(idx_parent in val for val in idxs_parents):
                        complete_idxs_child.append(idxs_child)
                parents[idx_middle_out][lv_child] = np.hstack(complete_idxs_child)

        for node, levels_node in parents.items():
            idxs_node = np.hstack(list(levels_node.values()))
            S_node = S[idxs_node]
            S_node = S_node[:, ~np.all(S_node == 0, axis=0)]
            counter = 0
            levels_node_ = deepcopy(levels_node)
            for lv_name, idxs_level in levels_node_.items():
                idxs_len = len(idxs_level)
                levels_node_[lv_name] = np.arange(counter, idxs_len + counter)
                counter += idxs_len
            td = TopDown(self.top_down_method).fit_predict(
                S=S_node,
                y_hat=y_hat[idxs_node],
                y_insample=y_insample[idxs_node] if y_insample is not None else None,
                tags=levels_node_,
            )
            reconciled[idxs_node] = td['mean']
        return {'mean': reconciled}

    __call__ = fit_predict
class MiddleOutSparse(MiddleOut):
    """MiddleOutSparse Reconciliation Class.

    This is an implementation of middle-out reconciliation using the sparse matrix
    approach. It works much more efficiently on data sets with many time series.

    See the parent class for more details.
    """
    # Although this is a sparse method, we still need a dense representation of the
    # "summing" matrix for the required transformations in the fit_predict method
    # prior to performing bottom-up and top-down reconciliation, so we can avoid redundant
    # conversions.
    is_sparse_method = False

    def fit_predict(
        self,
        S: np.ndarray,
        y_hat: np.ndarray,
        tags: Dict[str, np.ndarray],
        y_insample: Optional[np.ndarray] = None,
        level: Optional[List[int]] = None,
        intervals_method: Optional[str] = None,
    ) -> Dict[str, np.ndarray]:
        # Check if the data structure is strictly hierarchical.
        if not is_strictly_hierarchical(S, tags):
            raise ValueError(
                "Middle-out reconciliation requires strictly hierarchical structures."
            )
        # Check if the middle level is in the level-to-nodes mapping.
        if self.middle_level not in tags.keys():
            raise KeyError(f"{self.middle_level} is not a key in `tags`.")

        # Sort the levels by the number of nodes.
        levels = dict(sorted(tags.items(), key=lambda x: len(x[1])))
        # Allocate an array to store the reconciled point forecasts.
        y_tilde = np.full_like(y_hat, np.nan)
        # Find the nodes that make up the middle level.
        cut_nodes = levels[self.middle_level]

        # Calculate the cut that separates the middle level from the levels below.
        cut_idx = max(cut_nodes) + 1

        # Perform sparse bottom-up reconciliation from the middle level up.
        y_tilde[:cut_idx, :] = BottomUpSparse().fit_predict(
            S=sparse.csr_matrix(np.fliplr(np.unique(S[:cut_idx, :], axis=1))),
            y_hat=y_hat[:cut_idx, :],
            idx_bottom=None,
        )["mean"]

        # Perform sparse top-down reconciliation from the middle level down.
        for cut_node in cut_nodes:
            # Find the leaf nodes of the cut node's subgraph.
            leaf_idx = np.flatnonzero(S[cut_node, :])
            # Find all the nodes in the subgraph connected to the cut node.
            sub_idx = np.hstack(
                (cut_node, cut_idx + np.flatnonzero((np.any(S[cut_idx:, leaf_idx], 1))))
            )

            # Construct the "tags" argument for the cut node.
            if self.insample:
                # The in-sample disaggregation methods do not require it.
                sub_tags = None
            else:
                # Disaggregating using forecast proportions requires the "tags" for
                # the subgraph.
                sub_tags = {}
                acc = 0
                for level_, nodes in levels.items():
                    # Find all the nodes in the subgraph for this level.
                    nodes = np.intersect1d(nodes, sub_idx, True)
                    # Get the number of nodes in this level.
                    n = len(nodes)
                    # Exclude levels above the cut node or empty levels below it.
                    if len(nodes) > 0:
                        sub_tags[level_] = np.arange(acc, n + acc)
                        acc += n

            # Perform sparse top-down reconciliation from the cut node.
            y_tilde[sub_idx, :] = TopDownSparse(self.top_down_method).fit_predict(
                S=sparse.csr_matrix(S[sub_idx[:, None], leaf_idx]),
                y_hat=y_hat[sub_idx, :],
                y_insample=y_insample[sub_idx, :] if y_insample is not None else None,
                tags=sub_tags,
            )["mean"]

        return {"mean": y_tilde}

    __call__ = fit_predict
show_doc(MiddleOut, title_level=3)
show_doc(MiddleOut.fit_predict, title_level=3)
# we are able to recover the forecasts
# from middle out in this example
# because the time series
# share the same proportions
# across time,
# but this is not the general case
for method in ['forecast_proportions', 'average_proportions', 'proportion_averages']:
    cls_middle_out = MiddleOut(middle_level='level2', top_down_method=method)
    if cls_middle_out.insample:
        assert method in ['average_proportions', 'proportion_averages']
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )
    else:
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                tags=tags
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_middle_out = MiddleOutSparse(middle_level="level2", top_down_method=method)
    if cls_middle_out.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            S @ y_hat_bottom,
        )
    else:
        test_close(
            cls_middle_out(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            S @ y_hat_bottom,
        )

for method in ["forecast_proportions", "average_proportions", "proportion_averages"]:
    cls_middle_out = MiddleOut(middle_level="level2", top_down_method=method)
    cls_middle_out_sparse = MiddleOutSparse(
        middle_level="level2", top_down_method=method
    )
    if cls_middle_out.insample:
        assert method in ["average_proportions", "proportion_averages"]
        test_close(
            cls_middle_out(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            cls_middle_out_sparse(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
    else:
        test_close(
            cls_middle_out(S=S, y_hat=S @ y_hat_bottom, tags=tags)["mean"],
            cls_middle_out_sparse(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                tags=tags,
            )["mean"],
            np.finfo(np.float64).eps,
        )
4. MinTrace
def crossprod(x):
    return x.T @ x

class MinTrace(HReconciler):
    """MinTrace Reconciliation Class.

    This reconciliation algorithm proposed by Wickramasuriya et al. depends on a generalized least squares estimator
    and an estimator of the covariance matrix of the coherency errors $\mathbf{W}_{h}$. The Min Trace algorithm
    minimizes the squared errors for the coherent forecasts under an unbiasedness assumption; the solution has a
    closed form.<br>

    $$
    \mathbf{P}_{\\text{MinT}}=\\left(\mathbf{S}^{\intercal}\mathbf{W}^{-1}_{h}\mathbf{S}\\right)^{-1}
    \mathbf{S}^{\intercal}\mathbf{W}^{-1}_{h}
    $$

    **Parameters:**<br>
    `method`: str, one of `ols`, `wls_struct`, `wls_var`, `mint_shrink`, `mint_cov`.<br>
    `nonnegative`: bool, reconciled forecasts should be nonnegative?<br>
    `mint_shr_ridge`: float=2e-8, ridge numeric protection to MinTrace-shr covariance estimator.<br>
    `num_threads`: int=1, number of threads to use for solving the optimization problems (when nonnegative=True).

    **References:**<br>
    - [Wickramasuriya, S. L., Athanasopoulos, G., & Hyndman, R. J. (2019). \"Optimal forecast reconciliation for
    hierarchical and grouped time series through trace minimization\". Journal of the American Statistical Association,
    114 , 804–819. doi:10.1080/01621459.2018.1448825.](https://robjhyndman.com/publications/mint/).
    - [Wickramasuriya, S.L., Turlach, B.A. & Hyndman, R.J. (2020). \"Optimal non-negative
    forecast reconciliation". Stat Comput 30, 1167–1182,
    https://doi.org/10.1007/s11222-020-09930-0](https://robjhyndman.com/publications/nnmint/).
    """
    def __init__(self,
                 method: str,
                 nonnegative: bool = False,
                 mint_shr_ridge: Optional[float] = 2e-8,
                 num_threads: int = 1):
        self.method = method
        self.nonnegative = nonnegative
        self.insample = method in ['wls_var', 'mint_cov', 'mint_shrink']
        if method == 'mint_shrink':
            self.mint_shr_ridge = mint_shr_ridge
        self.num_threads = num_threads
        if not self.nonnegative and self.num_threads > 1:
            warnings.warn('`num_threads` is only used when `nonnegative=True`')

    def _get_PW_matrices(self,
                         S: np.ndarray,
                         y_hat: np.ndarray,
                         y_insample: Optional[np.ndarray] = None,
                         y_hat_insample: Optional[np.ndarray] = None,
                         idx_bottom: Optional[List[int]] = None,):
        # shape residuals_insample (n_hiers, obs)
        res_methods = ['wls_var', 'mint_cov', 'mint_shrink']
        if self.method in res_methods and y_insample is None and y_hat_insample is None:
            raise ValueError(f"For methods {', '.join(res_methods)} you need to pass residuals")
        n_hiers, n_bottom = S.shape
        n_aggs = n_hiers - n_bottom
        # Construct J and U.T
        J = np.concatenate((np.zeros((n_bottom, n_aggs), dtype=np.float64), S[n_aggs:]), axis=1)
        Ut = np.concatenate((np.eye(n_aggs, dtype=np.float64), -S[:n_aggs]), axis=1)
        if self.method == 'ols':
            W = np.eye(n_hiers)
            UtW = Ut
        elif self.method == 'wls_struct':
            Wdiag = np.sum(S, axis=1, dtype=np.float64)
            UtW = Ut * Wdiag
            W = np.diag(Wdiag)
        elif self.method in res_methods and y_insample is not None and y_hat_insample is not None:
            # Residuals with shape (obs, n_hiers)
            residuals = (y_insample - y_hat_insample).T
            n, _ = residuals.shape

            # Protection: against overfitted model
            residuals_sum = np.sum(residuals, axis=0)
            zero_residual_prc = np.abs(residuals_sum) < 1e-4
            zero_residual_prc = np.mean(zero_residual_prc)
            if zero_residual_prc > .98:
                raise Exception(f'Insample residuals close to 0, zero_residual_prc={zero_residual_prc}. Check `Y_df`')

            if self.method == 'wls_var':
                Wdiag = np.nansum(residuals**2, axis=0, dtype=np.float64) / residuals.shape[0]
                Wdiag += np.full(n_hiers, 2e-8, dtype=np.float64)
                W = np.diag(Wdiag)
                UtW = Ut * Wdiag
            elif self.method == 'mint_cov':
                # Compute nans
                nan_mask = np.isnan(residuals.T)
                if np.any(nan_mask):
                    W = _ma_cov(residuals.T, ~nan_mask)
                else:
                    W = np.cov(residuals.T)
                UtW = Ut @ W
            elif self.method == 'mint_shrink':
                # Compute nans
                nan_mask = np.isnan(residuals.T)
                # Compute the shrunk empirical covariance
                if np.any(nan_mask):
                    W = _shrunk_covariance_schaferstrimmer_with_nans(residuals.T, ~nan_mask, self.mint_shr_ridge)
                else:
                    W = _shrunk_covariance_schaferstrimmer_no_nans(residuals.T, self.mint_shr_ridge)
                UtW = Ut @ W
        else:
            raise ValueError(f'Unknown reconciliation method {self.method}')

        try:
            P = (J - np.linalg.solve(UtW[:, n_aggs:] @ Ut.T[n_aggs:] + UtW[:, :n_aggs], UtW[:, n_aggs:] @ J.T[n_aggs:]).T @ Ut)
        except np.linalg.LinAlgError:
            if self.method == "mint_shrink":
                raise Exception(f"min_trace ({self.method}) is ill-conditioned. Increase the value of parameter 'mint_shr_ridge' or use another reconciliation method.")
            else:
                raise Exception(f'min_trace ({self.method}) is ill-conditioned. Please use another reconciliation method.')
        return P, W
    def fit(self,
            S,
            y_hat,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """MinTrace Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `tags`: Each key is a level and each value its `S` indices.<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Optional for `forecast_proportions` method.<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>
        `**sampler_kwargs`: Coherent sampler instantiation arguments.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.y_hat = y_hat
        self.P, self.W = self._get_PW_matrices(S=S, y_hat=y_hat,
                                               y_insample=y_insample, y_hat_insample=y_hat_insample,
                                               idx_bottom=idx_bottom)

        if self.nonnegative:
            _, n_bottom = S.shape
            W_inv = np.linalg.pinv(self.W)
            negatives = y_hat < 0
            if negatives.any():
                warnings.warn('Replacing negative forecasts with zero.')
                y_hat = np.copy(y_hat)
                y_hat[negatives] = 0.
            # Quadratic programming formulation
            # here we are solving the quadratic programming problem
            # formulated in the original paper
            # https://robjhyndman.com/publications/nnmint/
            # The library quadprog was chosen
            # based on these benchmarks:
            # https://scaron.info/blog/quadratic-programming-in-python.html
            a = S.T @ W_inv
            G = a @ S
            try:
                _ = np.linalg.cholesky(G)
            except np.linalg.LinAlgError:
                raise Exception(f"min_trace ({self.method}) is ill-conditioned. Try setting nonnegative=False or use another reconciliation method.")
            C = np.eye(n_bottom)
            b = np.zeros(n_bottom)
            # the quadratic programming problem
            # returns the forecasts of the bottom series
            if self.num_threads == 1:
                bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0],
                                                  axis=0, arr=y_hat)
            else:
                futures = []
                with ThreadPoolExecutor(self.num_threads) as executor:
                    for j in range(y_hat.shape[1]):
                        future = executor.submit(solve_qp, G=G, a=a @ y_hat[:, j], C=C, b=b)
                        futures.append(future)
                    bottom_fcts = np.hstack([f.result()[0][:, None] for f in futures])
            if not np.all(bottom_fcts > -1e-8):
                raise Exception('nonnegative optimization failed')
            # remove negative values close to zero
            bottom_fcts = np.clip(np.float32(bottom_fcts), a_min=0, a_max=None)
            self.y_hat = S @ bottom_fcts  # Hack

            # Overwrite the P, W and sampler attributes with BottomUp's
            self.P, self.W = BottomUp()._get_PW_matrices(S=S, idx_bottom=idx_bottom)

        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah,
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self

    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """MinTrace Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Insample values of size (`base`, `insample_size`). Only used by `wls_var`, `mint_cov`, `mint_shrink`<br>
        `y_hat_insample`: Insample fitted values of size (`base`, `insample_size`). Only used by `wls_var`, `mint_cov`, `mint_shrink`<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `sampler`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>

        **Returns:**<br>
        `y_tilde`: Reconciled y_hat using the MinTrace approach.
        """
        if self.nonnegative:
            if (level is not None) and intervals_method in ['bootstrap', 'permbu']:
                raise Exception('nonnegative reconciliation is not compatible with bootstrap forecasts')
            if idx_bottom is None:
                raise Exception('idx_bottom needed for nonnegative reconciliation')

        # Fit creates the P, W and sampler attributes
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method,
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=self.y_hat,
                               level=level, sampler=self.sampler)

    __call__ = fit_predict
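For reference, the textbook closed form of the MinTrace projection can be checked directly on the toy hierarchy above. This is a minimal sketch with \(\mathbf{W}_{h}=\mathbf{I}\) (the `ols` case), not the numerically stabilized routine implemented in `_get_PW_matrices`:

W_toy = np.eye(S.shape[0])
W_inv_toy = np.linalg.inv(W_toy)
P_mint = np.linalg.solve(S.T @ W_inv_toy @ S, S.T @ W_inv_toy)  # size (bottom, base)
y_tilde_mint = S @ (P_mint @ (S @ y_hat_bottom))                # coherent forecasts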
@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _ma_cov(residuals: np.ndarray, not_nan_mask: np.ndarray):
    """Masked empirical covariance matrix.

    :meta private:
    """
    n_timeseries = residuals.shape[0]
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    for i in prange(n_timeseries):
        not_nan_mask_i = not_nan_mask[i]
        for j in range(i + 1):
            not_nan_mask_j = not_nan_mask[j]
            not_nan_mask_ij = not_nan_mask_i & not_nan_mask_j
            n_samples = np.sum(not_nan_mask_ij)
            # Only compute if we have a sufficient number of non-NaN samples in the time series pair.
            if n_samples > 1:
                # Masked residuals
                residuals_i = residuals[i][not_nan_mask_ij]
                residuals_j = residuals[j][not_nan_mask_ij]
                residuals_i_mean = np.mean(residuals_i)
                residuals_j_mean = np.mean(residuals_j)
                X_i = (residuals_i - residuals_i_mean)
                X_j = (residuals_j - residuals_j_mean)
                # Empirical covariance
                factor_emp_cov = np.float64(1 / (n_samples - 1))
                W[i, j] = W[j, i] = factor_emp_cov * np.sum(X_i * X_j)

    return W
@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _shrunk_covariance_schaferstrimmer_no_nans(residuals: np.ndarray, mint_shr_ridge: float):
    """Shrink empirical covariance according to the following method:
    Schäfer, Juliane, and Korbinian Strimmer.
    'A Shrinkage Approach to Large-Scale Covariance Matrix Estimation and
    Implications for Functional Genomics'. Statistical Applications in
    Genetics and Molecular Biology 4, no. 1 (14 January 2005).
    https://doi.org/10.2202/1544-6115.1175.

    :meta private:
    """
    n_timeseries = residuals.shape[0]
    n_samples = residuals.shape[1]

    # We need the empirical covariance, the sum of the variances of the off-diagonal
    # elements of the empirical correlation matrix and the sum of the squared
    # off-diagonal elements of the empirical correlation matrix.
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    sum_var_emp_corr = np.float64(0.0)
    sum_sq_emp_corr = np.float64(0.0)
    factor_emp_cov = np.float64(1 / (n_samples - 1))
    factor_shrinkage = np.float64(1 / (n_samples * (n_samples - 1)))
    epsilon = np.float64(2e-8)
    for i in prange(n_timeseries):
        # Mean of the standardized residuals
        X_i = residuals[i] - np.mean(residuals[i])
        Xs_i = X_i / (np.std(residuals[i]) + epsilon)
        Xs_i_mean = np.mean(Xs_i)
        for j in range(i + 1):
            # Empirical covariance
            X_j = residuals[j] - np.mean(residuals[j])
            W[i, j] = factor_emp_cov * np.sum(X_i * X_j)
            # Sum the off-diagonal elements
            if i != j:
                Xs_j = X_j / (np.std(residuals[j]) + epsilon)
                Xs_j_mean = np.mean(Xs_j)
                # Off-diagonal sum of variances of the empirical correlation
                w = (Xs_i - Xs_i_mean) * (Xs_j - Xs_j_mean)
                w_mean = np.mean(w)
                sum_var_emp_corr += np.sum(np.square(w - w_mean))
                # Sum of squared empirical correlations
                sum_sq_emp_corr += w_mean**2

    # Calculate the shrinkage intensity
    shrinkage = 1.0 - max(min((factor_shrinkage * sum_var_emp_corr) / (sum_sq_emp_corr + epsilon), 1.0), 0.0)

    # Shrink the empirical covariance
    for i in prange(n_timeseries):
        for j in range(i + 1):
            if i != j:
                W[i, j] = W[j, i] = shrinkage * W[i, j]
            else:
                W[i, j] = W[j, i] = max(W[i, j], mint_shr_ridge)
    return W
@njit(nogil=True, cache=True, parallel=True, fastmath=True, error_model="numpy")
def _shrunk_covariance_schaferstrimmer_with_nans(residuals: np.ndarray, not_nan_mask: np.ndarray, mint_shr_ridge: float):
    """Shrink empirical covariance according to the following method:
    Schäfer, Juliane, and Korbinian Strimmer.
    'A Shrinkage Approach to Large-Scale Covariance Matrix Estimation and
    Implications for Functional Genomics'. Statistical Applications in
    Genetics and Molecular Biology 4, no. 1 (14 January 2005).
    https://doi.org/10.2202/1544-6115.1175.

    :meta private:
    """
    n_timeseries = residuals.shape[0]

    # We need the empirical covariance, the sum of the variances of the off-diagonal
    # elements of the empirical correlation matrix and the sum of the squared
    # off-diagonal elements of the empirical correlation matrix.
    W = np.zeros((n_timeseries, n_timeseries), dtype=np.float64).T
    sum_var_emp_corr = np.float64(0.0)
    sum_sq_emp_corr = np.float64(0.0)
    epsilon = np.float64(2e-8)
    for i in prange(n_timeseries):
        not_nan_mask_i = not_nan_mask[i]
        for j in range(i + 1):
            not_nan_mask_j = not_nan_mask[j]
            not_nan_mask_ij = not_nan_mask_i & not_nan_mask_j
            n_samples = np.sum(not_nan_mask_ij)
            # Only compute if we have a sufficient number of non-NaN samples in the time series pair.
            if n_samples > 1:
                # Masked residuals
                residuals_i = residuals[i][not_nan_mask_ij]
                residuals_j = residuals[j][not_nan_mask_ij]
                residuals_i_mean = np.mean(residuals_i)
                residuals_j_mean = np.mean(residuals_j)
                X_i = (residuals_i - residuals_i_mean)
                X_j = (residuals_j - residuals_j_mean)
                # Empirical covariance
                factor_emp_cov = np.float64(1 / (n_samples - 1))
                W[i, j] = factor_emp_cov * np.sum(X_i * X_j)
                # Sum the off-diagonal elements
                if i != j:
                    factor_var_emp_cor = np.float64(n_samples / (n_samples - 1)**3)
                    residuals_i_std = np.std(residuals_i) + epsilon
                    residuals_j_std = np.std(residuals_j) + epsilon
                    Xs_i = X_i / (residuals_i_std + epsilon)
                    Xs_j = X_j / (residuals_j_std + epsilon)
                    Xs_im_mean = np.mean(Xs_i)
                    Xs_jm_mean = np.mean(Xs_j)
                    # Off-diagonal sum of variances of the empirical correlation
                    w = (Xs_i - Xs_im_mean) * (Xs_j - Xs_jm_mean)
                    w_mean = np.mean(w)
                    sum_var_emp_corr += factor_var_emp_cor * np.sum(np.square(w - w_mean))
                    # Sum of squared empirical correlations
                    sum_sq_emp_corr += np.square(factor_emp_cov * n_samples * w_mean)

    # Calculate the shrinkage intensity
    shrinkage = 1.0 - max(min((sum_var_emp_corr) / (sum_sq_emp_corr + epsilon), 1.0), 0.0)

    # Shrink the empirical covariance
    for i in prange(n_timeseries):
        for j in range(i + 1):
            if i != j:
                W[i, j] = W[j, i] = shrinkage * W[i, j]
            else:
                W[i, j] = W[j, i] = max(W[i, j], mint_shr_ridge)

    return W
class MinTraceSparse(MinTrace):
    """MinTraceSparse Reconciliation Class.

    This is the implementation of a subset of MinTrace features using the sparse
    matrix approach. It works much more efficiently on datasets with many time series.

    See the parent class for more details.

    Currently supported:
    * Methods using a diagonal W matrix, i.e. "ols", "wls_struct", "wls_var",
    * The standard MinT version (non-negative is not supported).

    Note: due to the numerical instability of the matrix inversion when creating the
    P matrix, the method is NOT guaranteed to give identical results to the non-sparse
    version.
    """
    is_sparse_method = True

    def _get_PW_matrices(
        self,
        S: Union[np.ndarray, sparse.spmatrix],
        y_hat: np.ndarray,
        y_insample: Optional[np.ndarray] = None,
        y_hat_insample: Optional[np.ndarray] = None,
        idx_bottom: Optional[List[int]] = None,
    ):
        # shape residuals_insample (n_hiers, obs)
        res_methods = ["wls_var", "mint_cov", "mint_shrink"]
        diag_only_methods = ["ols", "wls_struct", "wls_var"]

        if self.method not in diag_only_methods:
            raise NotImplementedError(
                "Only the methods with diagonal W are supported as sparse operations"
            )

        S = sparse.csr_matrix(S)

        if self.method in res_methods and y_insample is None and y_hat_insample is None:
            raise ValueError(
                f"For methods {', '.join(res_methods)} you need to pass residuals"
            )
        n_hiers, n_bottom = S.shape

        if self.method == "ols":
            W_diag = np.ones(n_hiers)
        elif self.method == "wls_struct":
            W_diag = S @ np.ones((n_bottom,))
        elif self.method == "wls_var" and y_insample is not None and y_hat_insample is not None:
            # Residuals with shape (obs, n_hiers)
            residuals = (y_insample - y_hat_insample).T
            n, _ = residuals.shape

            # Protection: against overfitted model
            residuals_sum = np.sum(residuals, axis=0)
            zero_residual_prc = np.abs(residuals_sum) < 1e-4
            zero_residual_prc = np.mean(zero_residual_prc)
            if zero_residual_prc > 0.98:
                raise Exception(
                    f"Insample residuals close to 0, zero_residual_prc={zero_residual_prc}. Check `Y_df`"
                )

            # Protection: cases where data is unavailable/nan
            # makoren: I found this masking behaviour causes more harm than good; the results
            # of nan-s can often be rubbish, I'd argue it's better to fail than give rubbish results, here
            # the code will simply fail if it encounters nan-s in the variance vector.
            # masked_res = np.ma.array(residuals, mask=np.isnan(residuals))
            # covm = np.ma.cov(masked_res, rowvar=False, allow_masked=True).data
            W_diag = np.var(residuals, axis=0, ddof=1)
        else:
            raise ValueError(f"Unknown reconciliation method {self.method}")

        if any(W_diag < 1e-8):
            raise Exception(
                f"min_trace ({self.method}) needs covariance matrix to be positive definite."
            )

        if any(np.isnan(W_diag)):
            raise Exception(
                f"min_trace ({self.method}) needs covariance matrix to be positive definite (not nan)."
            )

        M = sparse.spdiags(np.reciprocal(W_diag), 0, W_diag.size, W_diag.size)
        R = sparse.csr_matrix(S.T @ M)

        # The implementation of the action of P on a vector:
        def get_P_action(y):
            b = R @ y

            A = sparse.linalg.LinearOperator(
                (b.size, b.size), matvec=lambda v: R @ (S @ v)
            )

            x_tilde, exit_code = sparse.linalg.bicgstab(A, b, atol=1e-5)

            return x_tilde

        P = sparse.linalg.LinearOperator(
            (S.shape[1], y_hat.shape[0]), matvec=get_P_action
        )
        W = sparse.spdiags(W_diag, 0, W_diag.size, W_diag.size)

        return P, W
    def fit(self,
            S: sparse.csr_matrix,
            y_hat: np.ndarray,
            y_insample: Optional[np.ndarray] = None,
            y_hat_insample: Optional[np.ndarray] = None,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        # Clip the base forecasts if required to align them with their use in practice.
        if self.nonnegative:
            self.y_hat = np.clip(y_hat, 0, None)
        else:
            self.y_hat = y_hat

        # Get the reconciliation matrices.
        self.P, self.W = self._get_PW_matrices(
            S=S,
            y_hat=self.y_hat,
            y_insample=y_insample,
            y_hat_insample=y_hat_insample,
            idx_bottom=idx_bottom,
        )

        if self.nonnegative:
            # Get the number of leaf nodes.
            _, n_bottom = S.shape
            # Even after clipping the base forecasts, P is implemented as a linear
            # operator that iteratively solves a sparse linear system, so we need
            # to reconcile to find out whether any of the coherent bottom point
            # forecasts are negative.
            y_tilde = self._reconcile(
                S=S, P=self.P, y_hat=self.y_hat, level=None, sampler=None
            )["mean"][-n_bottom:]
            # Find out whether any forecasts are negative.
            if np.any(y_tilde < 0):
                # Clip the negative forecasts.
                y_tilde = np.clip(y_tilde, 0, None)
                # Force nonnegative coherence by overwriting the base forecasts with the
                # aggregated, clipped bottom forecasts.
                self.y_hat = S @ y_tilde
                # Overwrite the P and W matrix attributes with those of
                # bottom-up reconciliation to force the projection onto the nonnegative
                # coherent subspace.
                self.P, self.W = BottomUpSparse()._get_PW_matrices(S=S, idx_bottom=None)

        # Get the sampler for probabilistic reconciliation.
        self.sampler = self._get_sampler(
            S=S,
            P=self.P,
            W=self.W,
            y_hat=self.y_hat,
            y_insample=y_insample,
            y_hat_insample=y_hat_insample,
            sigmah=sigmah,
            intervals_method=intervals_method,
            num_samples=num_samples,
            seed=seed,
            tags=tags,
        )
        # Set the instance as fitted.
        self.fitted = True
        return self
show_doc(MinTrace, title_level=3)
show_doc(MinTrace.fit, name='MinTrace.fit', title_level=3)
show_doc(MinTrace.predict, name='MinTrace.predict', title_level=3)
show_doc(MinTrace.fit_predict, name='MinTrace.fit_predict', title_level=3)
show_doc(MinTrace.sample, name='MinTrace.sample', title_level=3)
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for nonnegative in [False, True]:
        # test nonnegative behavior
        # we should be able to recover the same forecasts
        # in this example
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        assert cls_min_trace.nonnegative is nonnegative
        if cls_min_trace.insample:
            assert method in ['wls_var', 'mint_cov', 'mint_shrink']
            test_close(
                cls_min_trace(
                    S=S,
                    y_hat=S @ y_hat_bottom,
                    y_insample=S @ y_bottom,
                    y_hat_insample=S @ y_hat_bottom_insample,
                    idx_bottom=idx_bottom if nonnegative else None
                )['mean'],
                S @ y_hat_bottom
            )
        else:
            test_close(
                cls_min_trace(
                    S=S,
                    y_hat=S @ y_hat_bottom,
                    idx_bottom=idx_bottom if nonnegative else None
                )['mean'],
                S @ y_hat_bottom
            )
mintrace_1thr = MinTrace(method='ols', nonnegative=False, num_threads=1).fit(S=S, y_hat=S @ y_hat_bottom)
mintrace_2thr = MinTrace(method='ols', nonnegative=False, num_threads=2).fit(S=S, y_hat=S @ y_hat_bottom)
np.testing.assert_allclose(mintrace_1thr.y_hat, mintrace_2thr.y_hat)

with ExceptionExpected(regex='min_trace (mint_cov)*'):
    for nonnegative in [False, True]:
        cls_min_trace = MinTrace(method='mint_cov', nonnegative=nonnegative)
        cls_min_trace(
            S=S,
            y_hat=S @ y_hat_bottom,
            y_insample=S @ y_bottom,
            y_hat_insample=S @ y_hat_bottom_insample,
            idx_bottom=idx_bottom if nonnegative else None
        )

# MinTrace-shr covariance's stress test
diff_len_y_insample = S @ y_bottom
diff_len_y_hat_insample = S @ y_hat_bottom_insample
diff_len_y_insample[-1, :-1] = np.nan
diff_len_y_hat_insample[-1, :-1] = np.nan
cls_min_trace = MinTrace(method='mint_shrink')
result_min_trace = cls_min_trace(
    S=S,
    y_hat=S @ y_hat_bottom,
    y_insample=diff_len_y_insample,
    y_hat_insample=diff_len_y_hat_insample,
    idx_bottom=idx_bottom
)

# test levels
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for nonnegative in [False, True]:
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        test_close(
            cls_min_trace(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                y_hat_insample=S @ y_hat_bottom_insample,
                idx_bottom=idx_bottom if nonnegative else None,
            )['mean'],
            S @ y_hat_bottom
        )

for method in ["ols", "wls_struct"]:
    for nonnegative in [False, True]:
        cls_min_trace = MinTraceSparse(method=method, nonnegative=nonnegative)
        test_close(
            cls_min_trace(
                S=S,
                y_hat=S @ y_hat_bottom,
                y_insample=S @ y_bottom,
                y_hat_insample=S @ y_hat_bottom_insample,
                idx_bottom=idx_bottom if nonnegative else None,
            )["mean"],
            S @ y_hat_bottom,
        )
5. Optimal Combination
class OptimalCombination(MinTrace):
    """Optimal Combination Reconciliation Class.

    This reconciliation algorithm was proposed by Hyndman et al. (2011). The method uses a generalized least squares
    estimator with the coherency errors covariance matrix. Consider the covariance of the base forecast
    $\\textrm{Var}(\epsilon_{h}) = \Sigma_{h}$, the $\mathbf{P}$ matrix of this method is defined by:
    $$ \mathbf{P} = \\left(\mathbf{S}^{\intercal}\Sigma_{h}^{\dagger}\mathbf{S}\\right)^{-1}\mathbf{S}^{\intercal}\Sigma^{\dagger}_{h}$$
    where $\Sigma_{h}^{\dagger}$ denotes the variance pseudo-inverse. The method was later proven equivalent to
    `MinTrace` variants.

    **Parameters:**<br>
    `method`: str, allowed optimal combination methods: 'ols', 'wls_struct'.<br>
    `nonnegative`: bool, reconciled forecasts should be nonnegative?<br>

    **References:**<br>
    - [Rob J. Hyndman, Roman A. Ahmed, George Athanasopoulos, Han Lin Shang (2010). \"Optimal Combination Forecasts for
    Hierarchical Time Series\".](https://robjhyndman.com/papers/Hierarchical6.pdf).<br>
    - [Shanika L. Wickramasuriya, George Athanasopoulos and Rob J. Hyndman (2010). \"Optimal Combination Forecasts for
    Hierarchical Time Series\".](https://robjhyndman.com/papers/MinT.pdf).
    - [Wickramasuriya, S.L., Turlach, B.A. & Hyndman, R.J. (2020). \"Optimal non-negative
    forecast reconciliation". Stat Comput 30, 1167–1182,
    https://doi.org/10.1007/s11222-020-09930-0](https://robjhyndman.com/publications/nnmint/).
    """
    def __init__(self,
                 method: str,
                 nonnegative: bool = False,
                 num_threads: int = 1):
        comb_methods = ['ols', 'wls_struct']
        if method not in comb_methods:
            raise ValueError(f"Optimal Combination class does not support method: \"{method}\"")

        super().__init__(method=method, nonnegative=nonnegative, num_threads=num_threads)
        self.insample = False
show_doc(OptimalCombination, title_level=3)
show_doc(OptimalCombination.fit, name='OptimalCombination.fit', title_level=3)
show_doc(OptimalCombination.predict, name='OptimalCombination.predict', title_level=3)
show_doc(OptimalCombination.fit_predict, name='OptimalCombination.fit_predict', title_level=3)
show_doc(OptimalCombination.sample, name='OptimalCombination.sample', title_level=3)
for method in ['ols', 'wls_struct']:
    for nonnegative in [False, True]:
        # test nonnegative behavior
        # we should be able to recover the same forecasts
        # in this example
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        test_close(
            cls_optimal_combination(
                S=S,
                y_hat=S @ y_hat_bottom,
                idx_bottom=idx_bottom if nonnegative else None
            )['mean'],
            S @ y_hat_bottom
        )

# test intervals
for method in ['ols', 'wls_struct']:
    for nonnegative in [False, True]:
        # test nonnegative behavior
        # we should be able to recover the same forecasts
        # in this example
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        test_close(
            cls_optimal_combination(
                S=S,
                y_hat=S @ y_hat_bottom,
                idx_bottom=idx_bottom if nonnegative else None,
            )['mean'],
            S @ y_hat_bottom
        )
6. Empirical Risk Minimization
@njit
def lasso(X: np.ndarray, y: np.ndarray,
          lambda_reg: float, max_iters: int = 1_000,
          tol: float = 1e-4):
    # lasso cyclic coordinate descent
    n, feats = X.shape
    norms = (X ** 2).sum(axis=0)
    beta = np.zeros(feats, dtype=np.float32)
    beta_changes = np.zeros(feats, dtype=np.float32)
    residuals = y.copy()

    for it in range(max_iters):
        for i, betai in enumerate(beta):
            # if the feature norm is close to zero, we
            # continue to the next one;
            # in this case the optimal beta_i = 0
            if abs(norms[i]) < 1e-8:
                continue
            xi = X[:, i]
            # we compute the normalized derivative
            rho = betai + xi.flatten().dot(residuals) / norms[i]  # (norms[i] + 1e-3)
            # soft thresholding
            beta[i] = np.sign(rho) * max(np.abs(rho) - lambda_reg * n / norms[i], 0.)  # (norms[i] + 1e-3), 0.)
            beta_changes[i] = np.abs(betai - beta[i])
            if beta[i] != betai:
                residuals += (betai - beta[i]) * xi
        if max(beta_changes) < tol:
            break
    # print(it)
    return beta
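A quick usage sketch of the coordinate-descent routine on synthetic data (hypothetical variables, not part of the test suite):

rng = np.random.default_rng(0)
X_toy = rng.normal(size=(100, 5)).astype(np.float32)
beta_true = np.array([2., 0., 0., -1., 0.], dtype=np.float32)
y_toy = X_toy @ beta_true
beta_est = lasso(X_toy, y_toy, lambda_reg=1e-3)
# beta_est approximately recovers beta_true; inactive features shrink toward zero.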
class ERM(HReconciler):
    """ERM Reconciliation Class.

    The Empirical Risk Minimization reconciliation strategy relaxes the unbiasedness assumptions of
    previous reconciliation methods like MinT and optimizes the square errors between the reconciled predictions
    and the validation data to obtain an optimal reconciliation matrix P.

    The exact solution for $\mathbf{P}$ (`method='closed'`) follows the expression:
    $$\mathbf{P}^{*} = \\left(\mathbf{S}^{\intercal}\mathbf{S}\\right)^{-1}\mathbf{Y}^{\intercal}\hat{\mathbf{Y}}\\left(\hat{\mathbf{Y}}^{\intercal}\hat{\mathbf{Y}}\\right)^{-1}$$

    The alternative Lasso regularized $\mathbf{P}$ solution (`method='reg_bu'`) is useful when the observations
    of validation data are limited or the exact solution has low numerical stability.
    $$\mathbf{P}^{*} = \\text{argmin}_{\mathbf{P}} ||\mathbf{Y}-\mathbf{S} \mathbf{P} \hat{\mathbf{Y}} ||^{2}_{2} + \lambda ||\mathbf{P}-\mathbf{P}_{\\text{BU}}||_{1}$$

    **Parameters:**<br>
    `method`: str, one of `closed`, `reg` and `reg_bu`.<br>
    `lambda_reg`: float, l1 regularizer for `reg` and `reg_bu`.<br>

    **References:**<br>
    - [Ben Taieb, S., & Koo, B. (2019). Regularized regression for hierarchical forecasting without
    unbiasedness conditions. In Proceedings of the 25th ACM SIGKDD International Conference on Knowledge
    Discovery & Data Mining KDD '19 (p. 1337-1347). New York, NY, USA: Association for Computing Machinery.](https://doi.org/10.1145/3292500.3330976).<br>
    """
    def __init__(self,
                 method: str,
                 lambda_reg: float = 1e-2):
        self.method = method
        self.lambda_reg = lambda_reg
        self.insample = True

    def _get_PW_matrices(self,
                         S: np.ndarray,
                         y_hat: np.ndarray,
                         y_insample: np.ndarray,
                         y_hat_insample: np.ndarray,
                         idx_bottom: np.ndarray):
        n_hiers, n_bottom = S.shape
        # y_hat_insample shape (n_hiers, obs)
        # remove obs with nan values
        nan_idx = np.isnan(y_hat_insample).any(axis=0)
        y_insample = y_insample[:, ~nan_idx]
        y_hat_insample = y_hat_insample[:, ~nan_idx]
        # only using h validation steps to avoid
        # computational burden
        # print(y_hat.shape)
        h = min(y_hat.shape[1], y_hat_insample.shape[1])
        y_hat_insample = y_hat_insample[:, -h:]  # shape (h, n_hiers)
        y_insample = y_insample[:, -h:]
        if self.method == 'closed':
            B = np.linalg.inv(S.T @ S) @ S.T @ y_insample
            B = B.T
            P = np.linalg.pinv(y_hat_insample.T) @ B
            P = P.T
        elif self.method in ['reg', 'reg_bu']:
            X = np.kron(np.array(S, order='F'), np.array(y_hat_insample.T, order='F'))
            Pbu = np.zeros_like(S)
            if self.method == 'reg_bu':
                Pbu[idx_bottom] = S[idx_bottom]
            Pbu = Pbu.T
            Y = y_insample.T.flatten(order='F') - X @ Pbu.T.flatten(order='F')
            if self.lambda_reg is None:
                lambda_reg = np.max(np.abs(X.T.dot(Y)))
            else:
                lambda_reg = self.lambda_reg
            P = lasso(X, Y, lambda_reg)
            P = P + Pbu.T.flatten(order='F')
            P = P.reshape(-1, n_bottom, order='F').T
        else:
            raise ValueError(f'Unknown reconciliation method {self.method}')

        W = np.eye(n_hiers, dtype=np.float32)

        return P, W
    def fit(self,
            S,
            y_hat,
            y_insample,
            y_hat_insample,
            sigmah: Optional[np.ndarray] = None,
            intervals_method: Optional[str] = None,
            num_samples: Optional[int] = None,
            seed: Optional[int] = None,
            tags: Optional[Dict[str, np.ndarray]] = None,
            idx_bottom: Optional[np.ndarray] = None):
        """ERM Fit Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Train values of size (`base`, `insample_size`).<br>
        `y_hat_insample`: Insample train predictions of size (`base`, `insample_size`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>

        **Returns:**<br>
        `self`: object, fitted reconciler.
        """
        self.P, self.W = self._get_PW_matrices(S=S,
                                               y_hat=y_hat,
                                               y_insample=y_insample,
                                               y_hat_insample=y_hat_insample,
                                               idx_bottom=idx_bottom)
        self.sampler = self._get_sampler(S=S,
                                         P=self.P,
                                         W=self.W,
                                         y_hat=y_hat,
                                         y_insample=y_insample,
                                         y_hat_insample=y_hat_insample,
                                         sigmah=sigmah,
                                         intervals_method=intervals_method,
                                         num_samples=num_samples,
                                         seed=seed,
                                         tags=tags)
        self.fitted = True
        return self
    def fit_predict(self,
                    S: np.ndarray,
                    y_hat: np.ndarray,
                    idx_bottom: np.ndarray = None,
                    y_insample: Optional[np.ndarray] = None,
                    y_hat_insample: Optional[np.ndarray] = None,
                    sigmah: Optional[np.ndarray] = None,
                    level: Optional[List[int]] = None,
                    intervals_method: Optional[str] = None,
                    num_samples: Optional[int] = None,
                    seed: Optional[int] = None,
                    tags: Optional[Dict[str, np.ndarray]] = None):
        """ERM Reconciliation Method.

        **Parameters:**<br>
        `S`: Summing matrix of size (`base`, `bottom`).<br>
        `y_hat`: Forecast values of size (`base`, `horizon`).<br>
        `y_insample`: Train values of size (`base`, `insample_size`).<br>
        `y_hat_insample`: Insample train predictions of size (`base`, `insample_size`).<br>
        `idx_bottom`: Indices corresponding to the bottom level of `S`, size (`bottom`).<br>
        `level`: float list 0-100, confidence levels for prediction intervals.<br>
        `intervals_method`: Sampler for prediction intervals, one of `normality`, `bootstrap`, `permbu`.<br>

        **Returns:**<br>
        `y_tilde`: Reconciled y_hat using the ERM approach.
        """
        # fit creates the P, W and sampler attributes
        self.fit(S=S,
                 y_hat=y_hat,
                 y_insample=y_insample,
                 y_hat_insample=y_hat_insample,
                 sigmah=sigmah,
                 intervals_method=intervals_method,
                 num_samples=num_samples,
                 seed=seed,
                 tags=tags, idx_bottom=idx_bottom)

        return self._reconcile(S=S, P=self.P, y_hat=y_hat,
                               level=level, sampler=self.sampler)

    __call__ = fit_predict
show_doc(ERM, title_level=3)
show_doc(ERM.fit, name='ERM.fit', title_level=3)
show_doc(ERM.predict, name='ERM.predict', title_level=3)
show_doc(ERM.fit_predict, name='ERM.fit_predict', title_level=3)
show_doc(ERM.sample, name='ERM.sample', title_level=3)
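As a sanity check on the closed-form expression in the docstring, the following hypothetical sketch (small random matrices; `S_demo`, `Y_demo`, `Yhat_demo` and `yhat_demo` are illustrative names, not library fixtures) compares the `P` fitted by `ERM(method='closed')` against a direct evaluation of $(\mathbf{S}^{\intercal}\mathbf{S})^{-1}\mathbf{S}^{\intercal}\mathbf{Y}\hat{\mathbf{Y}}^{\intercal}(\hat{\mathbf{Y}}\hat{\mathbf{Y}}^{\intercal})^{-1}$ on the same validation window; the two agree whenever $\hat{\mathbf{Y}}^{\intercal}$ has full column rank, since then the pseudo-inverse used in the code reduces to that expression.

# Hypothetical sanity check (not part of the test suite): ERM's closed-form
# P should match the docstring formula on the last h validation steps.
rng = np.random.default_rng(1)
S_demo = np.array([[1., 1.],
                   [1., 0.],
                   [0., 1.]])
Y_demo = rng.standard_normal((3, 10))     # validation values
Yhat_demo = rng.standard_normal((3, 10))  # validation base forecasts
yhat_demo = rng.standard_normal((3, 4))   # out-of-sample base forecasts, h=4

erm_closed = ERM(method='closed').fit(S=S_demo, y_hat=yhat_demo,
                                      y_insample=Y_demo,
                                      y_hat_insample=Yhat_demo)
Yh, Yhh = Y_demo[:, -4:], Yhat_demo[:, -4:]
P_direct = (np.linalg.inv(S_demo.T @ S_demo) @ S_demo.T
            @ Yh @ Yhh.T @ np.linalg.inv(Yhh @ Yhh.T))
np.testing.assert_allclose(erm_closed.P, P_direct, atol=1e-6)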
for method in ['reg_bu']:
    cls_erm = ERM(method=method, lambda_reg=None)

    test_close(
        cls_erm(
            S=S,
            y_hat=S @ y_hat_bottom,
            y_insample=S @ y_bottom,
            y_hat_insample=S @ y_hat_bottom_insample,
            idx_bottom=idx_bottom
        )['mean'],
        S @ y_hat_bottom
    )
# test intervals
reconciler_args = dict(S=S,
                       y_hat=y_hat_base,
                       y_insample=y_base,
                       y_hat_insample=y_hat_base_insample,
                       sigmah=sigmah,
                       level=[80, 90],
                       intervals_method='normality',
                       num_samples=200,
                       seed=0,
                       tags=tags,
                       idx_bottom=idx_bottom
                       )
# test normality prediction intervals
# we should recover the original sigmah
# for the bottom time series
cls_bottom_up = BottomUp()

test_eq(
    cls_bottom_up(**reconciler_args)['sigmah'][idx_bottom],
    sigmah[idx_bottom]
)
# test normality interval's names
cls_bottom_up = BottomUp()
bu_bootstrap_intervals = cls_bottom_up(**reconciler_args)

test_eq(
    ['mean', 'sigmah', 'quantiles'],
    list(bu_bootstrap_intervals.keys())
)
# test PERMBU interval's names
reconciler_args['intervals_method'] = 'permbu'
bu_permbu_intervals = cls_bottom_up(**reconciler_args)

test_eq(
    ['mean', 'quantiles'],
    list(bu_permbu_intervals.keys())
)
# test TopDown + intervals
for method in ['average_proportions', 'proportion_averages', 'forecast_proportions']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_top_down = TopDown(method=method)
        reconciler_args['intervals_method'] = intervals_method
        cls_top_down(**reconciler_args)
# test MinTrace + intervals
nonnegative = False
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_min_trace = MinTrace(method=method, nonnegative=nonnegative)
        reconciler_args['intervals_method'] = intervals_method
        cls_min_trace(**reconciler_args)
# test OptimalCombination + intervals
nonnegative = False
for method in ['ols', 'wls_struct']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_optimal_combination = OptimalCombination(method=method, nonnegative=nonnegative)
        reconciler_args['intervals_method'] = intervals_method
        if not nonnegative:
            cls_optimal_combination(**reconciler_args)
# test ERM + intervals
for method in ['reg_bu']:
    for intervals_method in ['normality', 'bootstrap', 'permbu']:
        cls_erm = ERM(method=method, lambda_reg=None)
        reconciler_args['intervals_method'] = intervals_method
        cls_erm(**reconciler_args)
# test coherent sample's shape
reconciler_args = dict(S=S,
                       y_hat=y_hat_base,
                       y_insample=y_base,
                       y_hat_insample=y_hat_base_insample,
                       sigmah=sigmah,
                       intervals_method='bootstrap',
                       tags=tags,
                       idx_bottom=idx_bottom)

cls_bottom_up = BottomUp()
shapes = []
for intervals_method in ['normality', 'bootstrap', 'permbu']:
    reconciler_args['intervals_method'] = intervals_method
    cls_bottom_up.fit(**reconciler_args)
    coherent_samples = cls_bottom_up.sample(num_samples=100)
    shapes.append(coherent_samples.shape)
test_eq(shapes[0], shapes[1])
test_eq(shapes[0], shapes[2])
# test covariance equivalence
n_samples = 100
n_hiers = 10
y_insample = np.random.rand(n_samples, n_hiers)
y_hat_insample = np.random.rand(n_samples, n_hiers)
residuals = (y_insample - y_hat_insample).astype(np.float32)
nan_mask = np.isnan(residuals)

# check equivalence of the covariance functions when there are no missing values
W_nb = _ma_cov(residuals, ~nan_mask)
W_np = np.cov(residuals)
np.testing.assert_allclose(W_nb, W_np, atol=1e-6)

# check equivalence of the shrunk covariance functions when there are no missing values
W_ss_nonan = _shrunk_covariance_schaferstrimmer_no_nans(residuals, 2e-8)
W_ss_nan = _shrunk_covariance_schaferstrimmer_with_nans(residuals, ~nan_mask, 2e-8)
np.testing.assert_allclose(W_ss_nan, W_ss_nonan, atol=1e-6)

# check that the diagonal of the shrunk W matches the diagonal of the
# non-shrunk W when there are no missing values
np.testing.assert_allclose(np.diag(W_np), np.diag(W_ss_nan), atol=1e-6)
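For intuition about what the Schäfer-Strimmer estimator does, the sketch below implements the shrinkage idea in plain numpy: shrink the off-diagonal correlations toward zero while keeping the empirical variances, which is why the diagonal comparison above holds. The helper name and the fixed intensity `lam` are illustrative assumptions; the library estimates the intensity from the residuals:

# Minimal numpy sketch of the shrinkage idea (illustrative, fixed intensity):
# shrink off-diagonal correlations toward zero, keep empirical variances.
def shrunk_cov_sketch(resid, lam=0.5):
    W = np.cov(resid)                      # empirical covariance
    std = np.sqrt(np.diag(W))
    R = W / np.outer(std, std)             # empirical correlation
    R_shrunk = (1.0 - lam) * R             # shrink off-diagonals ...
    np.fill_diagonal(R_shrunk, 1.0)        # ... but keep a unit diagonal
    return R_shrunk * np.outer(std, std)   # back to the covariance scale

# with lam=0 there is no shrinkage and we recover the empirical covariance
np.testing.assert_allclose(shrunk_cov_sketch(residuals, lam=0.0),
                           np.cov(residuals), atol=1e-6)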
References

General Reconciliation

- Orcutt, G.H., Watts, H.W., & Edwards, J.B. (1968). Data aggregation and information loss. The American Economic Review, 58, 773-787.
- Gross, C.W., & Sohl, J.E. (1990). Disaggregation methods to expedite product line forecasting. Journal of Forecasting, 9, 233-254. doi:10.1002/for.3980090304.
- Fliedner, G. (1999). An investigation of aggregate variable time series forecast strategies with specific subaggregate time series statistical correlation. Computers & Operations Research, 26, 1133-1149. doi:10.1016/S0305-0548(99)00017-9.
- Hyndman, R.J., & Athanasopoulos, G. (2021). "Forecasting: principles and practice, 3rd edition: Chapter 11: Forecasting hierarchical and grouped series." OTexts: Melbourne, Australia. OTexts.com/fpp3, accessed July 2022.

Optimal Reconciliation

- Hyndman, R.J., Ahmed, R.A., Athanasopoulos, G., & Shang, H.L. (2011). "Optimal combination forecasts for hierarchical time series." Computational Statistics & Data Analysis, 55(9), 2579-2589.
- Wickramasuriya, S.L., Athanasopoulos, G., & Hyndman, R.J. (2019). "Optimal forecast reconciliation for hierarchical and grouped time series through trace minimization." Journal of the American Statistical Association, 114(526), 804-819.
- Ben Taieb, S., & Koo, B. (2019). Regularized regression for hierarchical forecasting without unbiasedness conditions. In Proceedings of the 25th ACM SIGKDD International Conference on Knowledge Discovery & Data Mining KDD '19 (p. 1337-1347). New York, NY, USA: Association for Computing Machinery.

Hierarchical Probabilistic Coherent Prediction
If you find the code useful, please ⭐ us on Github