import warnings
from typing import Dict, Optional
import numpy as np
from scipy.stats import norm
from sklearn.preprocessing import OneHotEncoder
from hierarchicalforecast.utils import is_strictly_hierarchical, cov2corr
Probabilistic Methods

Here we provide a collection of methods designed to produce hierarchically coherent probability distributions, meaning they generate multivariate time series samples that satisfy the hierarchy's linear aggregation constraints.
We designed these methods to extend the capabilities of the core.HierarchicalForecast class. See their usage examples below.
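As a quick illustration of the coherence constraint these samplers enforce, here is a minimal standalone sketch (a toy two-level hierarchy with made-up values, independent of the classes below): a sample is coherent when every aggregate equals the sum of its children, which the summing matrix S guarantees by construction.

import numpy as np

# Toy two-level hierarchy: total = bottom_1 + bottom_2.
S = np.array([[1., 1.],   # total
              [1., 0.],   # bottom_1
              [0., 1.]])  # bottom_2

# A coherent sample is fully determined by its bottom rows:
bottom_sample = np.array([3.0, 4.0])
coherent_sample = S @ bottom_sample   # -> array([7., 3., 4.])

# Coherence check: the aggregate equals the sum of its children.
assert coherent_sample[0] == coherent_sample[1] + coherent_sample[2]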
from nbdev.showdoc import add_docs, show_doc
from fastcore.test import ExceptionExpected, test_close, test_eq, test_fail
from hierarchicalforecast.methods import BottomUp, TopDown, MiddleOut, MinTrace, OptimalCombination, ERM
1. Normality
class Normality:
""" Normality Probabilistic Reconciliation Class.
The Normality method leverages the Gaussian Distribution linearity, to
generate hierarchically coherent prediction distributions. This class is
    meant to be used as the `sampler` input to other `HierarchicalForecast` [reconciliation classes](https://nixtla.github.io/hierarchicalforecast/methods.html).
Given base forecasts under a normal distribution:
$$\hat{y}_{h} \sim \mathrm{N}(\hat{\\boldsymbol{\\mu}}, \hat{\mathbf{W}}_{h})$$
The reconciled forecasts are also normally distributed:
$$
\\tilde{y}_{h} \sim \mathrm{N}(\mathbf{S}\mathbf{P}\hat{\\boldsymbol{\\mu}},
\mathbf{S}\mathbf{P}\hat{\mathbf{W}}_{h} \mathbf{P}^{\intercal} \mathbf{S}^{\intercal})
$$
**Parameters:**<br>
`S`: np.array, summing matrix of size (`base`, `bottom`).<br>
`P`: np.array, reconciliation matrix of size (`bottom`, `base`).<br>
    `y_hat`: Point forecast values of size (`base`, `horizon`).<br>
`W`: np.array, hierarchical covariance matrix of size (`base`, `base`).<br>
`sigmah`: np.array, forecast standard dev. of size (`base`, `horizon`).<br>
    `num_samples`: int, number of bootstrapped samples generated.<br>
`seed`: int, random seed for numpy generator's replicability.<br>
**References:**<br>
- [Panagiotelis A., Gamakumara P. Athanasopoulos G., and Hyndman R. J. (2022).
"Probabilistic forecast reconciliation: Properties, evaluation and score optimisation". European Journal of Operational Research.](https://www.sciencedirect.com/science/article/pii/S0377221722006087)
"""
    def __init__(self,
                 S: np.ndarray,
                 P: np.ndarray,
                 y_hat: np.ndarray,
                 sigmah: np.ndarray,
                 W: np.ndarray,
                 seed: int = 0):
        self.S = S
        self.P = P
        self.y_hat = y_hat
        self.SP = self.S @ self.P
        self.W = W
        self.sigmah = sigmah
        self.seed = seed

        # Base Normality Errors assume independence/diagonal covariance
        # TODO: replace bilinearity with elementwise row multiplication
        R1 = cov2corr(self.W)
        Wh = [np.diag(sigma) @ R1 @ np.diag(sigma).T for sigma in self.sigmah.T]

        # Reconciled covariances across forecast horizon
        self.cov_rec = [(self.SP @ W @ self.SP.T) for W in Wh]
        self.sigmah_rec = np.hstack([np.sqrt(np.diag(cov))[:, None] for cov in self.cov_rec])
def get_samples(self, num_samples: int):
"""Normality Coherent Samples.
Obtains coherent samples under the Normality assumptions.
**Parameters:**<br>
`num_samples`: int, number of samples generated from coherent distribution.<br>
**Returns:**<br>
`samples`: Coherent samples of size (`base`, `horizon`, `num_samples`).
"""
        state = np.random.RandomState(self.seed)
        n_series, n_horizon = self.y_hat.shape
        samples = np.empty(shape=(num_samples, n_series, n_horizon))
        for t in range(n_horizon):
            with warnings.catch_warnings():
                # Avoid 'RuntimeWarning: covariance is not positive-semidefinite.'
                # By definition the multivariate distribution is not full-rank
                warnings.simplefilter("ignore")
                partial_samples = state.multivariate_normal(mean=self.SP @ self.y_hat[:,t],
                                                            cov=self.cov_rec[t], size=num_samples)
                samples[:,:,t] = partial_samples

        # [samples, N, H] -> [N, H, samples]
        samples = samples.transpose((1, 2, 0))
        return samples
    def get_prediction_levels(self, res, level):
        """ Adds reconciled forecast levels to the results dictionary """
        res['sigmah'] = self.sigmah_rec
        level = np.asarray(level)
        z = norm.ppf(0.5 + level / 200)
        for zs, lv in zip(z, level):
            res[f'lo-{lv}'] = res['mean'] - zs * self.sigmah_rec
            res[f'hi-{lv}'] = res['mean'] + zs * self.sigmah_rec
        return res

    def get_prediction_quantiles(self, res, quantiles):
        """ Adds reconciled forecast quantiles to the results dictionary """
        # [N,H,None] + [None,None,Q] * [N,H,None] -> [N,H,Q]
        z = norm.ppf(quantiles)
        res['sigmah'] = self.sigmah_rec
        res['quantiles'] = res['mean'][:,:,None] + z[None,None,:] * self.sigmah_rec[:,:,None]
        return res
show_doc(Normality, title_level=3)
show_doc(Normality.get_samples, title_level=3)
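To make the reconciled moments above concrete, here is a small hand-check (toy S, P, and base moments chosen for illustration, not taken from the library): the reconciled mean is $\mathbf{S}\mathbf{P}\hat{\boldsymbol{\mu}}$ and the reconciled covariance is $\mathbf{S}\mathbf{P}\hat{\mathbf{W}}_{h}\mathbf{P}^{\intercal}\mathbf{S}^{\intercal}$, matching what `Normality.__init__` stores in `cov_rec`.

import numpy as np

# Toy hierarchy: one total and 2 bottom series, with a BottomUp projection.
S = np.array([[1., 1.],
              [1., 0.],
              [0., 1.]])
P = np.array([[0., 1., 0.],   # picks bottom_1
              [0., 0., 1.]])  # picks bottom_2
SP = S @ P

mu_hat = np.array([8., 3., 4.])   # (possibly incoherent) base means
W_h = np.diag([1.0, 0.5, 0.5])    # illustrative base covariance

mean_rec = SP @ mu_hat            # coherent mean: [7., 3., 4.]
cov_rec = SP @ W_h @ SP.T         # coherent covariance

# With a diagonal W_h, the reconciled total variance is the
# sum of the bottom variances: 0.5 + 0.5 = 1.0
assert np.isclose(cov_rec[0, 0], 1.0)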
2. Bootstrap
class Bootstrap:
""" Bootstrap Probabilistic Reconciliation Class.
    This method goes beyond the normality assumption for the base forecasts;
    it simulates future sample paths and uses them to generate
    base sample paths that are later reconciled. This clever idea and its
    simplicity allow the generation of coherent bootstrapped prediction intervals
    for any reconciliation strategy. This class is meant to be used as the `sampler`
    input to other `HierarchicalForecast` [reconciliation classes](https://nixtla.github.io/hierarchicalforecast/methods.html).
    Given a bootstrapped set of simulated sample paths:
$$(\hat{\mathbf{y}}^{[1]}_{\\tau}, \dots ,\hat{\mathbf{y}}^{[B]}_{\\tau})$$
The reconciled sample paths allow for reconciled distributional forecasts:
$$(\mathbf{S}\mathbf{P}\hat{\mathbf{y}}^{[1]}_{\\tau}, \dots ,\mathbf{S}\mathbf{P}\hat{\mathbf{y}}^{[B]}_{\\tau})$$
**Parameters:**<br>
`S`: np.array, summing matrix of size (`base`, `bottom`).<br>
`P`: np.array, reconciliation matrix of size (`bottom`, `base`).<br>
    `y_hat`: Point forecast values of size (`base`, `horizon`).<br>
`y_insample`: Insample values of size (`base`, `insample_size`).<br>
`y_hat_insample`: Insample point forecasts of size (`base`, `insample_size`).<br>
    `num_samples`: int, number of bootstrapped samples generated.<br>
`seed`: int, random seed for numpy generator's replicability.<br>
**References:**<br>
- [Puwasala Gamakumara Ph. D. dissertation. Monash University, Econometrics and Business Statistics (2020).
"Probabilistic Forecast Reconciliation"](https://bridges.monash.edu/articles/thesis/Probabilistic_Forecast_Reconciliation_Theory_and_Applications/11869533)
- [Panagiotelis A., Gamakumara P. Athanasopoulos G., and Hyndman R. J. (2022).
"Probabilistic forecast reconciliation: Properties, evaluation and score optimisation". European Journal of Operational Research.](https://www.sciencedirect.com/science/article/pii/S0377221722006087)
"""
    def __init__(self,
                 S: np.ndarray,
                 P: np.ndarray,
                 y_hat: np.ndarray,
                 y_insample: np.ndarray,
                 y_hat_insample: np.ndarray,
                 num_samples: int = 100,
                 seed: int = 0,
                 W: np.ndarray = None):
        self.S = S
        self.P = P
        self.W = W
        self.y_hat = y_hat
        self.y_insample = y_insample
        self.y_hat_insample = y_hat_insample
        self.num_samples = num_samples
        self.seed = seed
def get_samples(self, num_samples: int):
"""Bootstrap Sample Reconciliation Method.
        Applies the Bootstrap sample reconciliation method as defined by Gamakumara (2020),
        generating independent sample paths and reconciling them with Bootstrap.
**Parameters:**<br>
`num_samples`: int, number of samples generated from coherent distribution.<br>
**Returns:**<br>
`samples`: Coherent samples of size (`base`, `horizon`, `num_samples`).
"""
        residuals = self.y_insample - self.y_hat_insample
        h = self.y_hat.shape[1]

        # removing NaNs from residuals
        residuals = residuals[:, np.isnan(residuals).sum(axis=0) == 0]
        sample_idx = np.arange(residuals.shape[1] - h)
        state = np.random.RandomState(self.seed)
        samples_idx = state.choice(sample_idx, size=num_samples)
        samples = [self.y_hat + residuals[:, idx:(idx + h)] for idx in samples_idx]
        SP = self.S @ self.P
        samples = np.apply_along_axis(lambda path: np.matmul(SP, path),
                                      axis=1, arr=samples)
        samples_np = np.stack(samples)

        # [samples, N, H] -> [N, H, samples]
        samples_np = samples_np.transpose((1, 2, 0))
        return samples_np
    def get_prediction_levels(self, res, level):
        """ Adds reconciled forecast levels to the results dictionary """
        samples = self.get_samples(num_samples=self.num_samples)
        for lv in level:
            min_q = (100 - lv) / 200
            max_q = min_q + lv / 100
            res[f'lo-{lv}'] = np.quantile(samples, min_q, axis=2)
            res[f'hi-{lv}'] = np.quantile(samples, max_q, axis=2)
        return res

    def get_prediction_quantiles(self, res, quantiles):
        """ Adds reconciled forecast quantiles to the results dictionary """
        samples = self.get_samples(num_samples=self.num_samples)

        # [Q, N, H] -> [N, H, Q]
        sample_quantiles = np.quantile(samples, quantiles, axis=2)
        res['quantiles'] = sample_quantiles.transpose((1, 2, 0))
        return res
show_doc(Bootstrap, title_level=3)
show_doc(Bootstrap.get_samples, title_level=3)
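The essence of `Bootstrap.get_samples` is: draw random length-`h` windows of in-sample residuals, add them to the point forecast to form simulated paths, and map each path through $\mathbf{S}\mathbf{P}$. A minimal standalone sketch of that loop follows; the shapes and values are made up for illustration, and an identity matrix stands in for the reconciliation map.

import numpy as np

rng = np.random.RandomState(0)
n_series, T, h, B = 3, 50, 2, 4               # series, insample length, horizon, paths

residuals = rng.normal(size=(n_series, T))    # stands in for y_insample - y_hat_insample
y_hat = np.ones((n_series, h))                # base point forecasts
SP = np.eye(n_series)                         # stands in for S @ P

# Draw B random windows of length h from the residuals and reconcile each path.
starts = rng.choice(T - h, size=B)
paths = np.stack([SP @ (y_hat + residuals[:, s:s + h]) for s in starts])
print(paths.shape)   # (B, n_series, h); the class transposes this to (N, H, samples)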
3. PERMBU
class PERMBU:
""" PERMBU Probabilistic Reconciliation Class.
The PERMBU method leverages empirical bottom-level marginal distributions
with empirical copula functions (describing bottom-level dependencies) to
generate the distribution of aggregate-level distributions using BottomUp
reconciliation. The sample reordering technique in the PERMBU method reinjects
multivariate dependencies into independent bottom-level samples.
Algorithm:
    1. For all series compute conditional marginal distributions.
    2. Compute residuals $\hat{\epsilon}_{i,t}$ and obtain rank permutations.
    3. Obtain a K-sample from the bottom-level series predictions.
    4. Apply recursively through the hierarchical structure (see the sketch after this class):<br>
        4.1. For a given aggregate series $i$ and its children series:<br>
        4.2. Obtain the children's empirical joint using the sample reordering copula.<br>
        4.3. From the children's joint obtain the aggregate series' samples.
**Parameters:**<br>
`S`: np.array, summing matrix of size (`base`, `bottom`).<br>
`tags`: Each key is a level and each value its `S` indices.<br>
`y_insample`: Insample values of size (`base`, `insample_size`).<br>
`y_hat_insample`: Insample point forecasts of size (`base`, `insample_size`).<br>
`sigmah`: np.array, forecast standard dev. of size (`base`, `horizon`).<br>
`num_samples`: int, number of normal prediction samples generated.<br>
`seed`: int, random seed for numpy generator's replicability.<br>
**References:**<br>
- [Taieb, Souhaib Ben and Taylor, James W and Hyndman, Rob J. (2017).
Coherent probabilistic forecasts for hierarchical time series.
International conference on machine learning ICML.](https://proceedings.mlr.press/v70/taieb17a.html)
"""
    def __init__(self,
                 S: np.ndarray,
                 tags: Dict[str, np.ndarray],
                 y_hat: np.ndarray,
                 y_insample: np.ndarray,
                 y_hat_insample: np.ndarray,
                 sigmah: np.ndarray,
                 num_samples: Optional[int] = None,
                 seed: int = 0,
                 P: np.ndarray = None):
        # PERMBU only works for strictly hierarchical structures
        if not is_strictly_hierarchical(S, tags):
            raise ValueError('PERMBU probabilistic reconciliation requires strictly hierarchical structures.')
self.S = S
self.P = P
self.y_hat = y_hat
self.y_insample = y_insample
self.y_hat_insample = y_hat_insample
self.sigmah = sigmah
self.num_samples = num_samples
self.seed = seed
def _obtain_ranks(self, array):
""" Vector ranks
Efficiently obtain vector ranks.
Example `array=[4,2,7,1]` -> `ranks=[2, 1, 3, 0]`.
**Parameters**<br>
        `array`: np.array, matrix of floats or integers whose ranks
        will be computed along the second dimension.<br>
**Returns**<br>
`ranks`: np.array, matrix with ranks along the second dimension.<br>
"""
        temp = array.argsort(axis=1)
        ranks = np.empty_like(temp)
        a_range = np.arange(temp.shape[1])
        for i_row in range(temp.shape[0]):
            ranks[i_row, temp[i_row,:]] = a_range
        return ranks
def _permutate_samples(self, samples, permutations):
""" Permutate Samples
Applies efficient vectorized permutation on the samples.
**Parameters**<br>
`samples`: np.array [series,samples], independent base samples.<br>
        `permutations`: np.array [series,samples], permutation ranks with which
        `samples` dependence will be restored; see `_obtain_ranks`.<br>
**Returns**<br>
`permutated_samples`: np.array.<br>
"""
        # generate auxiliary and flat permutation indexes
        n_rows, n_cols = permutations.shape
        aux_row_idx = np.arange(n_rows)[:,None] * n_cols
        aux_row_idx = np.repeat(aux_row_idx, repeats=n_cols, axis=1)
        permutate_idxs = permutations.flatten() + aux_row_idx.flatten()

        # apply flat permutation indexes and recover the original shape
        permutated_samples = samples.flatten()
        permutated_samples = permutated_samples[permutate_idxs]
        permutated_samples = permutated_samples.reshape(n_rows, n_cols)
        return permutated_samples
def _permutate_predictions(self, prediction_samples, permutations):
""" Permutate Prediction Samples
Applies permutations to prediction_samples across the horizon.
**Parameters**<br>
`prediction_samples`: np.array [series,horizon,samples], independent
base prediction samples.<br>
        `permutations`: np.array [series, samples], permutation ranks with which
        `samples` dependence will be restored; see `_obtain_ranks`.
        It can also apply a random permutation.<br>
**Returns**<br>
`permutated_prediction_samples`: np.array.<br>
"""
        # apply permutations throughout the forecast horizon
        permutated_prediction_samples = prediction_samples.copy()

        _, n_horizon, _ = prediction_samples.shape
        for t in range(n_horizon):
            permutated_prediction_samples[:,t,:] = \
                self._permutate_samples(prediction_samples[:,t,:],
                                        permutations)
        return permutated_prediction_samples
def _nonzero_indexes_by_row(self, M):
return [np.nonzero(M[row,:])[0] for row in range(len(M))]
def get_samples(self, num_samples: Optional[int] = None):
"""PERMBU Sample Reconciliation Method.
        Applies the PERMBU reconciliation method as defined by Taieb et al. (2017),
        generating independent base prediction samples, restoring their multivariate
        dependence using an estimated copula with reordering, and applying the BottomUp
        aggregation to the new samples.
**Parameters:**<br>
`num_samples`: int, number of samples generated from coherent distribution.<br>
**Returns:**<br>
`samples`: Coherent samples of size (`base`, `horizon`, `num_samples`).
"""
        # compute residuals and rank permutations
        residuals = self.y_insample - self.y_hat_insample
        residuals = residuals[:, np.isnan(residuals).sum(axis=0) == 0]

        # sample h-step-ahead base marginal distributions
        if num_samples is None:
            num_samples = residuals.shape[1]

        # expand residuals to match num_samples [(a,b),T] -> [(a,b),num_samples]
        if num_samples > residuals.shape[1]:
            residuals_idxs = np.random.choice(residuals.shape[1], size=num_samples)
        else:
            residuals_idxs = np.random.choice(residuals.shape[1], size=num_samples,
                                              replace=False)
        residuals = residuals[:,residuals_idxs]
        rank_permutations = self._obtain_ranks(residuals)

        state = np.random.RandomState(self.seed)
        n_series, n_horizon = self.y_hat.shape

        base_samples = np.array([
            state.normal(loc=m, scale=s, size=num_samples) for m, s in \
            zip(self.y_hat.flatten(), self.sigmah.flatten())
        ])
        base_samples = base_samples.reshape(n_series, n_horizon, num_samples)

        # initialize PERMBU utility
        rec_samples = base_samples.copy()
        try:
            encoder = OneHotEncoder(sparse_output=False, dtype=np.float32)
        except TypeError:
            encoder = OneHotEncoder(sparse=False, dtype=np.float32)
        hier_links = np.vstack(self._nonzero_indexes_by_row(self.S.T))

        # BottomUp hierarchy traversal
        hier_levels = hier_links.shape[1] - 1
        for level_idx in reversed(range(hier_levels)):
            # obtain the aggregation matrix from parent/children links
            children_links = np.unique(hier_links[:,level_idx:level_idx+2],
                                       axis=0)
            children_idxs = np.unique(children_links[:,1])
            parent_idxs = np.unique(children_links[:,0])
            Agg = encoder.fit_transform(children_links).T
            Agg = Agg[:len(parent_idxs),:]

            # permute the children's samples for each prediction step
            children_permutations = rank_permutations[children_idxs, :]
            children_samples = rec_samples[children_idxs,:,:]
            children_samples = self._permutate_predictions(
                prediction_samples=children_samples,
                permutations=children_permutations
            )

            # overwrite rec_samples with the BottomUp aggregation
            # and randomly shuffle the parent predictions after aggregation
            parent_samples = np.einsum('ab,bhs->ahs', Agg, children_samples)
            random_permutation = np.array([
                np.random.permutation(np.arange(num_samples)) \
                for serie in range(len(parent_samples))
            ])
            parent_samples = self._permutate_predictions(
                prediction_samples=parent_samples,
                permutations=random_permutation
            )

            rec_samples[parent_idxs,:,:] = parent_samples
        return rec_samples
    def get_prediction_levels(self, res, level):
        """ Adds reconciled forecast levels to the results dictionary """
        samples = self.get_samples(num_samples=self.num_samples)
        for lv in level:
            min_q = (100 - lv) / 200
            max_q = min_q + lv / 100
            res[f'lo-{lv}'] = np.quantile(samples, min_q, axis=2)
            res[f'hi-{lv}'] = np.quantile(samples, max_q, axis=2)
        return res

    def get_prediction_quantiles(self, res, quantiles):
        """ Adds reconciled forecast quantiles to the results dictionary """
        samples = self.get_samples(num_samples=self.num_samples)

        # [Q, N, H] -> [N, H, Q]
        sample_quantiles = np.quantile(samples, quantiles, axis=2)
        res['quantiles'] = sample_quantiles.transpose((1, 2, 0))
        return res
show_doc(PERMBU, title_level=3)
show_doc(PERMBU.get_samples, title_level=3)
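The key step in PERMBU is the copula-style reordering: independent samples of each child series are sorted and then re-indexed by the ranks of the historical residuals, so the samples inherit the residuals' dependence structure before the BottomUp aggregation. Below is a self-contained sketch of this reordering for two correlated series, mirroring what `_obtain_ranks` and `_permutate_samples` do; the data is illustrative.

import numpy as np

rng = np.random.RandomState(0)

# Historical residuals of two children with strong positive dependence.
base = rng.normal(size=10)
residuals = np.vstack([base, base + 0.1 * rng.normal(size=10)])

# Rank permutations of the residuals (as in PERMBU._obtain_ranks).
ranks = residuals.argsort(axis=1).argsort(axis=1)

# Independent samples for each child, sorted and then reordered by the ranks:
independent = rng.normal(size=(2, 10))
reordered = np.sort(independent, axis=1)[np.arange(2)[:, None], ranks]

# The reordered samples inherit the residuals' (high) rank dependence.
print(np.corrcoef(reordered)[0, 1])   # close to np.corrcoef(residuals)[0, 1]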
from hierarchicalforecast.evaluation import (
rel_mse,
msse,
energy_score,
scaled_crps,
    log_score,
)
S = np.array([[1., 1., 1., 1.],
              [1., 1., 0., 0.],
              [0., 0., 1., 1.],
              [0., 1., 0., 0.],
              [1., 0., 0., 0.],
              [0., 0., 1., 0.],
              [0., 0., 0., 1.]])
h = 2
_y = np.array([10., 5., 4., 2., 1.])
y_bottom = np.vstack([i * _y for i in range(1, 5)])
y_hat_bottom_insample = np.roll(y_bottom, 1)
y_hat_bottom_insample[:, 0] = np.nan
y_hat_bottom = np.vstack([i * np.ones(h) for i in range(1, 5)])
idx_bottom = [4, 3, 5, 6]
tags = {'level1': np.array([0]),
        'level2': np.array([1, 2]),
        'level3': idx_bottom}
# sigmah for all levels in the hierarchy
# sigmah is computed for the naive method
# as described here:
# https://otexts.com/fpp3/prediction-intervals.html
y_base = S @ y_bottom
y_hat_base = S @ y_hat_bottom
y_hat_base_insample = S @ y_hat_bottom_insample
sigma = np.nansum((y_base - y_hat_base_insample) ** 2, axis=1) / (y_base.shape[1] - 1)
sigma = np.sqrt(sigma)
sigmah = sigma[:, None] * np.sqrt(np.vstack([np.arange(1, h + 1) for _ in range(y_base.shape[0])]))
noise = np.random.normal(scale=sigmah)
y_test = y_hat_base + noise
# test samplers
cls_bottom_up = BottomUp()
P, W = cls_bottom_up._get_PW_matrices(S=S, idx_bottom=idx_bottom)

normality_sampler = Normality(S=S, P=P, W=W,
                              y_hat=y_hat_base,
                              sigmah=sigmah)
bootstrap_sampler = Bootstrap(S=S, P=P, W=W,
                              y_hat=y_hat_base,
                              y_insample=y_base,
                              y_hat_insample=y_hat_base_insample,
                              num_samples=1_000)
empty_bootstrap_sampler = Bootstrap(S=S, P=P, W=W,
                                    y_hat=y_hat_base,
                                    y_insample=y_base,
                                    y_hat_insample=y_base,
                                    num_samples=1_000)
permbu_sampler = PERMBU(S=S, P=P,
                        tags=tags,
                        y_hat=y_hat_base,
                        y_insample=y_base,
                        y_hat_insample=y_hat_base_insample,
                        sigmah=sigmah)
empty_permbu_sampler = PERMBU(S=S, P=P,
                              tags=tags,
                              y_hat=y_hat_base,
                              y_insample=y_base,
                              y_hat_insample=y_base,
                              sigmah=sigmah)
# test shapes of coherent samples
normality_samples = normality_sampler.get_samples(num_samples=100)
bootstrap_samples = bootstrap_sampler.get_samples(num_samples=100)
permbu_samples = permbu_sampler.get_samples(num_samples=100)

test_eq(bootstrap_samples.shape, normality_samples.shape)
test_eq(bootstrap_samples.shape, permbu_samples.shape)
# test RelMSE's execution
rel_mse(y=y_test, y_hat=y_hat_base, y_train=y_base)

# test MSSE's execution
msse(y=y_test, y_hat=y_hat_base, y_train=y_base)

# test energy score's execution
energy_score(y=y_test,
             y_sample1=bootstrap_samples, y_sample2=permbu_samples)

# test scaled CRPS' execution
quantiles = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
bootstrap_quantiles = np.quantile(bootstrap_samples, q=quantiles, axis=2)
bootstrap_quantiles = bootstrap_quantiles.transpose((1,2,0)) # [Q,N,H] -> [N,H,Q]
scaled_crps(y=y_test, y_hat=bootstrap_quantiles, quantiles=quantiles)

# test log score's execution
cov = np.concatenate([cov[:,:,None] for cov in normality_sampler.cov_rec], axis=2)
log_score(y=y_test, y_hat=y_hat_base, cov=cov, allow_singular=True)
# test quantile loss protection
quantiles = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.2])

test_fail(
    scaled_crps,
    contains='between 0 and 1',
    args=(y_test, bootstrap_quantiles, quantiles),
)
References
- Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting: Principles and Practice, Reconciled distributional forecasts".
- Puwasala Gamakumara, Ph.D. dissertation. Monash University, Econometrics and Business Statistics (2020). "Probabilistic Forecast Reconciliation".
- Panagiotelis A., Gamakumara P., Athanasopoulos G., and Hyndman R. J. (2022). "Probabilistic forecast reconciliation: Properties, evaluation and score optimisation". European Journal of Operational Research.
- Taieb, Souhaib Ben, Taylor, James W., and Hyndman, Rob J. (2017). "Coherent probabilistic forecasts for hierarchical time series". International Conference on Machine Learning (ICML).
If you find the code useful, please ⭐ us on GitHub