%load_ext autoreload
%autoreload 2
滞后变换
内置滞后变换
import copy
import inspect
import re
from typing import Callable, Optional, Sequence
import numpy as np
import coreforecast.lag_transforms as core_tfms
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from sklearn.base import BaseEstimator, clone
def _pascal2camel(pascal_str: str) -> str:
return re.sub(r'(?<!^)(?=[A-Z])', '_', pascal_str).lower()
class _BaseLagTransform(BaseEstimator):
    """Base class for lag transformations that wrap a coreforecast transform.

    Subclasses declare their hyperparameters as ``__init__`` arguments and
    store them as attributes of the same name. The wrapped coreforecast
    object is looked up in ``coreforecast.lag_transforms`` by class name and
    kept in ``self._core_tfm`` once `_set_core_tfm` has been called.
    """

    def _get_init_signature(self) -> dict:
        """Return the ``__init__`` parameters of this class, without ``self``.

        Returns
        -------
        dict
            Mapping from parameter name to its `inspect.Parameter`.
        """
        return {
            k: v
            for k, v in inspect.signature(self.__class__.__init__).parameters.items()
            if k != 'self'
        }

    def _set_core_tfm(self, lag: int) -> '_BaseLagTransform':
        """Instantiate the wrapped coreforecast transform for the given lag.

        The constructor arguments are read back from the attributes that
        share their name with the ``__init__`` parameters.
        """
        init_args = {
            k: getattr(self, k) for k in self._get_init_signature()
        }
        # the core class is resolved by name, so subclasses must be named
        # exactly like their coreforecast counterpart
        self._core_tfm = getattr(core_tfms, self.__class__.__name__)(lag=lag, **init_args)
        return self

    def _get_name(self, lag: int) -> str:
        """Build the feature name produced by this transform at `lag`.

        The name is ``<snake_case class name>_lag<lag>``, followed by
        ``_<param><value>`` for every ``__init__`` parameter whose current
        value differs from its declared default.
        """
        init_params = self._get_init_signature()
        result = f'{_pascal2camel(self.__class__.__name__)}_lag{lag}'
        changed_params = [
            f"{name}{getattr(self, name)}"
            for name, arg in init_params.items()
            if arg.default != getattr(self, name)
        ]
        if changed_params:
            result += "_" + "_".join(changed_params)
        return result

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Delegate to the wrapped transform's ``transform``."""
        return self._core_tfm.transform(ga)

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Delegate to the wrapped transform's ``update``."""
        return self._core_tfm.update(ga)

    def take(self, idxs: np.ndarray) -> '_BaseLagTransform':
        """Return a deep copy whose wrapped transform is ``_core_tfm.take(idxs)``.

        NOTE(review): presumably this keeps only the series at `idxs`;
        confirm against the coreforecast documentation.
        """
        out = copy.deepcopy(self)
        out._core_tfm = self._core_tfm.take(idxs)
        return out

    @staticmethod
    def stack(transforms: Sequence['_BaseLagTransform']) -> '_BaseLagTransform':
        """Combine several transforms into one by stacking their wrapped transforms.

        The result is a deep copy of the first element whose wrapped
        transform is replaced by the stacked one.
        """
        out = copy.deepcopy(transforms[0])
        out._core_tfm = transforms[0]._core_tfm.stack(
            [tfm._core_tfm for tfm in transforms]
        )
        return out
class Lag(_BaseLagTransform):
    """Lag transform whose lag is fixed at construction time."""

    def __init__(self, lag: int):
        # the core transform is built right away since the lag is already known
        self._core_tfm = core_tfms.Lag(lag=lag)
        self.lag = lag

    def _set_core_tfm(self, _lag: int) -> 'Lag':
        # the core transform was created in __init__; the argument is ignored
        return self

    def _get_name(self, lag: int) -> str:
        return f'lag{lag}'

    def __eq__(self, other):
        # two Lag instances are equal when they share the same lag
        if not isinstance(other, Lag):
            return False
        return self.lag == other.lag
class _RollingBase(_BaseLagTransform):
    """Rolling statistic"""

    def __init__(self, window_size: int, min_samples: Optional[int] = None):
        """Store the rolling window configuration.

        Parameters
        ----------
        window_size : int
            Number of samples in the window.
        min_samples : int, optional
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.min_samples = min_samples
        self.window_size = window_size
class RollingMean(_RollingBase):
    """Rolling mean; computation is delegated to the coreforecast transform of the same name."""
class RollingStd(_RollingBase):
    """Rolling standard deviation; computation is delegated to the coreforecast transform of the same name."""
class RollingMin(_RollingBase):
    """Rolling minimum; computation is delegated to the coreforecast transform of the same name."""
class RollingMax(_RollingBase):
    """Rolling maximum; computation is delegated to the coreforecast transform of the same name."""
class RollingQuantile(_RollingBase):
    """Rolling quantile.

    Parameters
    ----------
    p : float
        Quantile to compute; passed through to the coreforecast transform.
    window_size : int
        Number of samples in the window.
    min_samples : int, optional
        Minimum samples required to output the statistic.
    """

    def __init__(self, p: float, window_size: int, min_samples: Optional[int] = None):
        super().__init__(window_size=window_size, min_samples=min_samples)
        self.p = p

    def _set_core_tfm(self, lag: int) -> 'RollingQuantile':
        """Instantiate the coreforecast ``RollingQuantile`` for the given lag."""
        self._core_tfm = core_tfms.RollingQuantile(
            lag=lag, p=self.p, window_size=self.window_size, min_samples=self.min_samples
        )
        return self
# Build a grouped array of 20 random series (lengths between 50 and 99)
# that the example cells use as input.
rng = np.random.default_rng(seed=0)
lengths = rng.integers(low=50, high=100, size=20)
data = rng.random(lengths.sum())
# the second argument holds the cumulative series boundaries, starting at 0
ga = CoreGroupedArray(data, np.append(0, lengths.cumsum()))
RollingMean(7)._set_core_tfm(1).transform(ga)
array([ nan, nan, nan, ..., 0.32114229, 0.3672723 ,
0.39137066])
class _Seasonal_RollingBase(_BaseLagTransform):
    """Rolling statistic over seasonal periods"""

    def __init__(
        self, season_length: int, window_size: int, min_samples: Optional[int] = None
    ):
        """
        Parameters
        ----------
        season_length : int
            Periodicity of the seasonal period.
        window_size : int
            Number of samples in the window.
        min_samples : int, optional
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.season_length = season_length
        self.window_size = window_size
        self.min_samples = min_samples
::: {#40cf5b8b-d079-40ee-89ca-36a1ea5dfb90 .cell 0=‘出’ 1=‘口’}
class SeasonalRollingMean(_Seasonal_RollingBase):
    """Seasonal rolling mean; computation is delegated to the coreforecast transform of the same name."""
class SeasonalRollingStd(_Seasonal_RollingBase):
    """Seasonal rolling standard deviation; computation is delegated to the coreforecast transform of the same name."""
class SeasonalRollingMin(_Seasonal_RollingBase):
    """Seasonal rolling minimum; computation is delegated to the coreforecast transform of the same name."""
class SeasonalRollingMax(_Seasonal_RollingBase):
    """Seasonal rolling maximum; computation is delegated to the coreforecast transform of the same name."""
class SeasonalRollingQuantile(_Seasonal_RollingBase):
    """Seasonal rolling quantile.

    Takes the quantile `p` in addition to the seasonal rolling window
    arguments accepted by the base class.
    """

    def __init__(
        self,
        p: float,
        season_length: int,
        window_size: int,
        min_samples: Optional[int] = None,
    ):
        self.p = p
        super().__init__(
            season_length=season_length,
            window_size=window_size,
            min_samples=min_samples,
        )
:::
# seasonal rolling std with season_length=7 and window_size=4, computed at lag 2
SeasonalRollingStd(7, 4)._set_core_tfm(2).transform(ga)
array([ nan, nan, nan, ..., 0.35518094, 0.25199008,
0.40335074])
class _ExpandingBase(_BaseLagTransform):
    """Expanding statistic"""

    def __init__(self):
        # no hyperparameters to store
        pass
class ExpandingMean(_ExpandingBase):
    """Expanding mean; computation is delegated to the coreforecast transform of the same name."""
class ExpandingStd(_ExpandingBase):
    """Expanding standard deviation; computation is delegated to the coreforecast transform of the same name."""
class ExpandingMin(_ExpandingBase):
    """Expanding minimum; computation is delegated to the coreforecast transform of the same name."""
class ExpandingMax(_ExpandingBase):
    """Expanding maximum; computation is delegated to the coreforecast transform of the same name."""
class ExpandingQuantile(_ExpandingBase):
    """Expanding quantile.

    Parameters
    ----------
    p : float
        Quantile to compute; stored and later forwarded to the coreforecast
        transform of the same name.
    """

    def __init__(self, p: float):
        self.p = p
# expanding minimum computed at lag 3
ExpandingMin()._set_core_tfm(3).transform(ga)
array([ nan, nan, nan, ..., 0.00297614, 0.00297614,
0.00297614])
class ExponentiallyWeightedMean(_BaseLagTransform):
    """Exponentially weighted average

    Parameters
    ----------
    alpha : float
        Smoothing factor."""

    def __init__(self, alpha: float):
        self.alpha = alpha
# exponentially weighted mean with alpha=0.7, computed at lag 4
ExponentiallyWeightedMean(0.7)._set_core_tfm(4).transform(ga)
array([ nan, nan, nan, ..., 0.3074053 , 0.5567787 ,
0.31390901])
class Offset(_BaseLagTransform):
    """Shift series before computing transformation

    Parameters
    ----------
    tfm : LagTransform
        Transformation to be applied
    n : int
        Number of positions to shift (lag) series before applying the transformation"""

    def __init__(self, tfm: _BaseLagTransform, n: int):
        self.n = n
        self.tfm = tfm

    def _get_name(self, lag: int) -> str:
        # the name is the wrapped transform's name at the shifted lag
        shifted_lag = lag + self.n
        return self.tfm._get_name(shifted_lag)

    def _set_core_tfm(self, lag: int) -> 'Offset':
        # configure a fresh clone so the transform given by the user is left untouched;
        # here `_core_tfm` holds the configured wrapper transform itself
        shifted = clone(self.tfm)
        self._core_tfm = shifted._set_core_tfm(lag + self.n)
        return self
# An offset of 2 on top of lag 5 must be named after the effective lag (7) ...
offset = Offset(RollingMean(window_size=10), 2)._set_core_tfm(5)
assert offset._get_name(5) == "rolling_mean_lag7_window_size10"
# ... and must match applying the transform at lag 5 to the series lagged by 2.
transformed = offset.transform(ga)
expected = RollingMean(window_size=10)._set_core_tfm(5).transform(ga._with_data(Lag(2).transform(ga)))
np.testing.assert_allclose(transformed, expected)
class Combine(_BaseLagTransform):
    """Combine two lag transformations using an operator

    Parameters
    ----------
    tfm1 : LagTransform
        First transformation.
    tfm2 : LagTransform
        Second transformation.
    operator : callable
        Binary operator that defines how to combine the two transformations."""

    def __init__(
        self, tfm1: _BaseLagTransform, tfm2: _BaseLagTransform, operator: Callable
    ):
        self.tfm1 = tfm1
        self.tfm2 = tfm2
        self.operator = operator

    def _set_core_tfm(self, lag: int) -> 'Combine':
        """Replace both operands with clones configured for `lag`."""
        self.tfm1 = clone(self.tfm1)._set_core_tfm(lag)
        self.tfm2 = clone(self.tfm2)._set_core_tfm(lag)
        return self

    def _get_name(self, lag: int) -> str:
        """Return ``<name of tfm1>_<operator name>_<name of tfm2>``.

        An operand that carries its own ``lag`` attribute (e.g. `Lag`) is
        named after that lag instead of the one provided here.
        """
        lag1 = getattr(self.tfm1, 'lag', lag)
        lag2 = getattr(self.tfm2, 'lag', lag)
        return f'{self.tfm1._get_name(lag1)}_{self.operator.__name__}_{self.tfm2._get_name(lag2)}'

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply the operator to the outputs of both operands' ``transform``."""
        return self.operator(self.tfm1.transform(ga), self.tfm2.transform(ga))

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply the operator to the outputs of both operands' ``update``."""
        return self.operator(self.tfm1.update(ga), self.tfm2.update(ga))

    def take(self, idxs: np.ndarray) -> 'Combine':
        """Return a deep copy whose operands are ``tfm.take(idxs)``.

        The inherited implementation reads ``self._core_tfm``, which this
        class never sets, so the call is forwarded to both operands instead.
        """
        out = copy.deepcopy(self)
        out.tfm1 = self.tfm1.take(idxs)
        out.tfm2 = self.tfm2.take(idxs)
        return out

    @staticmethod
    def stack(transforms: Sequence['Combine']) -> 'Combine':
        """Stack several `Combine` instances operand by operand.

        Overridden for the same reason as `take`: there is no ``_core_tfm``
        on this class, the state lives in ``tfm1`` and ``tfm2``.
        """
        out = copy.deepcopy(transforms[0])
        out.tfm1 = transforms[0].tfm1.stack([tfm.tfm1 for tfm in transforms])
        out.tfm2 = transforms[0].tfm2.stack([tfm.tfm2 for tfm in transforms])
        return out
import operator
# Dividing lag 1 by lag 2 through Combine must match doing it by hand.
comb = Combine(Lag(1), Lag(2), operator.truediv)
assert comb._get_name(1) == 'lag1_truediv_lag2'
transformed = comb.transform(ga)
expected = Lag(1).transform(ga) / Lag(2).transform(ga)
np.testing.assert_allclose(transformed, expected)
# check that every transform can be configured, named, applied and updated
tfms = [
    ExpandingMax(),
    ExpandingMean(),
    ExpandingMin(),
    ExpandingStd(),
    ExpandingQuantile(0.5),
    ExponentiallyWeightedMean(0.1),
    RollingMax(7),
    RollingMean(7),
    RollingMin(7),
    RollingStd(7),
    RollingQuantile(0.5, 7),
    SeasonalRollingMax(7, 2),
    SeasonalRollingMean(7, 2),
    SeasonalRollingMin(7, 2),
    SeasonalRollingStd(7, 2),
    SeasonalRollingQuantile(0.5, 7, 7),
    Offset(RollingMax(7), 2),
    Combine(RollingMean(5), Offset(RollingMean(5), 2), operator.truediv),
    Combine(Offset(RollingMean(5), 2), RollingMean(5), operator.truediv),
]
for tfm in tfms:
    tfm._set_core_tfm(1)
    tfm._get_name(1)
    tfm.transform(ga)
    tfm.update(ga)
Give us a ⭐ on GitHub