%load_ext autoreload
%autoreload 2
import copy
import inspect
import re
from typing import Callable, Optional, Sequence
import numpy as np
import coreforecast.lag_transforms as core_tfms
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from sklearn.base import BaseEstimator, clone
def _pascal2camel(pascal_str: str) -> str:
return re.sub(r'(?<!^)(?=[A-Z])', '_', pascal_str).lower()
class _BaseLagTransform(BaseEstimator):
    """Shared plumbing for the lag transformations defined in this module.

    Subclasses declare their hyper-parameters in ``__init__`` and store each
    one under an attribute of the same name. The computation itself is
    delegated to ``self._core_tfm``, which ``_set_core_tfm`` creates once the
    lag is known.
    """

    def _get_init_signature(self):
        """Return the ``__init__`` parameters of this class, excluding ``self``.

        The result maps each parameter name to its ``inspect.Parameter``.
        """
        init_params = inspect.signature(self.__class__.__init__).parameters
        return {k: v for k, v in init_params.items() if k != 'self'}

    def _set_core_tfm(self, lag: int) -> '_BaseLagTransform':
        """Instantiate the core transform and store it in ``self._core_tfm``.

        The class is looked up in ``core_tfms`` by this class's own name and
        is called with ``lag`` plus the current value of every ``__init__``
        parameter. Returns ``self`` so calls can be chained.
        """
        init_args = {k: getattr(self, k) for k in self._get_init_signature()}
        core_cls = getattr(core_tfms, self.__class__.__name__)
        self._core_tfm = core_cls(lag=lag, **init_args)
        return self

    def _get_name(self, lag: int) -> str:
        """Build the feature name for this transform at the given lag.

        The name is ``<snake_case class name>_lag<lag>`` followed by
        ``_<param><value>`` for every ``__init__`` parameter whose current
        value differs from its declared default.
        """
        init_params = self._get_init_signature()
        result = f'{_pascal2camel(self.__class__.__name__)}_lag{lag}'
        changed_params = [
            f"{name}{getattr(self, name)}"
            for name, arg in init_params.items()
            if arg.default != getattr(self, name)
        ]
        if changed_params:
            result += "_" + "_".join(changed_params)
        return result

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Delegate to the core transform's ``transform``; requires ``_set_core_tfm`` first."""
        return self._core_tfm.transform(ga)

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Delegate to the core transform's ``update``; requires ``_set_core_tfm`` first."""
        return self._core_tfm.update(ga)

    def take(self, idxs: np.ndarray) -> '_BaseLagTransform':
        """Return a deep copy whose core transform is ``self._core_tfm.take(idxs)``."""
        out = copy.deepcopy(self)
        out._core_tfm = self._core_tfm.take(idxs)
        return out

    # No ``self`` parameter: this must be a static method, otherwise calling it
    # through an instance would bind the instance to ``transforms``.
    @staticmethod
    def stack(transforms: Sequence['_BaseLagTransform']) -> '_BaseLagTransform':
        """Merge several transforms into one.

        The result is a deep copy of the first transform whose core transform
        is produced by stacking the core transforms of all the inputs.
        """
        out = copy.deepcopy(transforms[0])
        out._core_tfm = transforms[0]._core_tfm.stack(
            [tfm._core_tfm for tfm in transforms]
        )
        return out
class Lag(_BaseLagTransform):
    """Plain lag; its core transform is created eagerly in ``__init__``."""

    def __init__(self, lag: int):
        self.lag = lag
        self._core_tfm = core_tfms.Lag(lag=lag)

    def _set_core_tfm(self, _lag: int) -> 'Lag':
        # The core transform already exists, so the argument is ignored.
        return self

    def _get_name(self, lag: int) -> str:
        # Named after the lag passed in, not after ``self.lag``.
        return f'lag{lag}'

    def __eq__(self, other):
        # Only another ``Lag`` can compare equal; then the lags decide.
        if not isinstance(other, Lag):
            return False
        return self.lag == other.lag
class _RollingBase(_BaseLagTransform):
    "Rolling statistic"

    def __init__(self, window_size: int, min_samples: Optional[int] = None):
        """
        Parameters
        ----------
        window_size : int
            Number of samples in the window.
        min_samples : int, optional (default=None)
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.window_size = window_size
        self.min_samples = min_samples
# The classes below add no logic of their own: the inherited `_set_core_tfm`
# looks up the class with the same name in `core_tfms`.
class RollingMean(_RollingBase):
    """Rolling mean."""


class RollingStd(_RollingBase):
    """Rolling standard deviation."""


class RollingMin(_RollingBase):
    """Rolling minimum."""


class RollingMax(_RollingBase):
    """Rolling maximum."""
class RollingQuantile(_RollingBase):
    """Rolling quantile."""

    def __init__(self, p: float, window_size: int, min_samples: Optional[int] = None):
        """
        Parameters
        ----------
        p : float
            Quantile to compute; forwarded unchanged to the core transform.
        window_size : int
            Number of samples in the window.
        min_samples : int, optional (default=None)
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        super().__init__(window_size=window_size, min_samples=min_samples)
        self.p = p

    def _set_core_tfm(self, lag: int) -> 'RollingQuantile':
        """Create the core ``RollingQuantile`` for ``lag`` and return ``self``."""
        self._core_tfm = core_tfms.RollingQuantile(
            lag=lag, p=self.p, window_size=self.window_size, min_samples=self.min_samples
        )
        return self
# Example data: 20 series with random lengths in [50, 100), stored back to back.
rng = np.random.default_rng(seed=0)
lengths = rng.integers(low=50, high=100, size=20)
data = rng.random(lengths.sum())
# Second argument: cumulative lengths with a leading 0 (presumably the
# start/end offsets of each series inside `data`).
ga = CoreGroupedArray(data, np.append(0, lengths.cumsum()))
RollingMean(7)._set_core_tfm(1).transform(ga)
# Output (truncated in the source):
# array([ nan, nan, nan, ..., 0.32114229, 0.3672723 ,
class _Seasonal_RollingBase(_BaseLagTransform):
    """Base class for the seasonal rolling statistics."""

    def __init__(
        self, season_length: int, window_size: int, min_samples: Optional[int] = None
    ):
        """
        Parameters
        ----------
        season_length : int
            Periodicity of the seasonal period.
        window_size : int
            Number of samples in the window.
        min_samples : int, optional (default=None)
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.season_length = season_length
        self.window_size = window_size
        self.min_samples = min_samples
# %% [export] cell 40cf5b8b-d079-40ee-89ca-36a1ea5dfb90
# The classes below add no logic of their own: the inherited `_set_core_tfm`
# looks up the class with the same name in `core_tfms`.
class SeasonalRollingMean(_Seasonal_RollingBase):
    """Seasonal rolling mean."""


class SeasonalRollingStd(_Seasonal_RollingBase):
    """Seasonal rolling standard deviation."""


class SeasonalRollingMin(_Seasonal_RollingBase):
    """Seasonal rolling minimum."""


class SeasonalRollingMax(_Seasonal_RollingBase):
    """Seasonal rolling maximum."""
class SeasonalRollingQuantile(_Seasonal_RollingBase):
    """Seasonal rolling statistic with an additional `p` hyper-parameter."""

    def __init__(
        self,
        p: float,
        season_length: int,
        window_size: int,
        min_samples: Optional[int] = None,
    ):
        # The parent stores the seasonal/window settings; only `p` is new here.
        super().__init__(
            season_length=season_length,
            window_size=window_size,
            min_samples=min_samples,
        )
        self.p = p
SeasonalRollingStd(7, 4)._set_core_tfm(2).transform(ga)
# Output (truncated in the source):
# array([ nan, nan, nan, ..., 0.35518094, 0.25199008,
class _ExpandingBase(_BaseLagTransform):
    """Base class for the expanding statistics."""

    def __init__(self):
        # No hyper-parameters: the lag is supplied later via `_set_core_tfm`.
        ...
# The classes below add no logic of their own: the inherited `_set_core_tfm`
# looks up the class with the same name in `core_tfms`.
class ExpandingMean(_ExpandingBase):
    """Expanding mean."""


class ExpandingStd(_ExpandingBase):
    """Expanding standard deviation."""


class ExpandingMin(_ExpandingBase):
    """Expanding minimum."""


class ExpandingMax(_ExpandingBase):
    """Expanding maximum."""
class ExpandingQuantile(_ExpandingBase):
    """Expanding statistic with a single hyper-parameter, `p`."""

    def __init__(self, p: float):
        # Stored under the same name as the parameter, as the base class's
        # signature-driven helpers expect.
        self.p = p
ExpandingMin()._set_core_tfm(3).transform(ga)
# Output (truncated in the source):
# array([ nan, nan, nan, ..., 0.00297614, 0.00297614,
class ExponentiallyWeightedMean(_BaseLagTransform):
    """Exponentially weighted mean

    Parameters
    ----------
    alpha : float
        Weighting parameter, forwarded unchanged to the core transform
        (presumably the smoothing factor -- confirm against coreforecast)."""

    def __init__(self, alpha: float):
        self.alpha = alpha
ExponentiallyWeightedMean(0.7)._set_core_tfm(4).transform(ga)
# Output (truncated in the source):
# array([ nan, nan, nan, ..., 0.3074053 , 0.5567787 ,
class Offset(_BaseLagTransform):
    """Shift series before computing transformation

    Parameters
    ----------
    tfm : LagTransform
        Transformation to be applied
    n : int
        Number of positions to shift (lag) series before applying the transformation"""

    def __init__(self, tfm: _BaseLagTransform, n: int):
        self.tfm = tfm
        self.n = n

    def _get_name(self, lag: int) -> str:
        # The name is the wrapped transform's name at the shifted lag.
        shifted_lag = lag + self.n
        return self.tfm._get_name(shifted_lag)

    def _set_core_tfm(self, lag: int) -> 'Offset':
        # Configure a clone so the transform supplied by the user stays untouched.
        wrapped = clone(self.tfm)
        self._core_tfm = wrapped._set_core_tfm(lag + self.n)
        return self
# Offset(tfm, n) at lag 5 must match tfm at lag 5 applied to the series lagged by n.
offset = Offset(RollingMean(window_size=10), 2)._set_core_tfm(5)
assert offset._get_name(5) == "rolling_mean_lag7_window_size10"
transformed = offset.transform(ga)
expected = RollingMean(window_size=10)._set_core_tfm(5).transform(ga._with_data(Lag(2).transform(ga)))
np.testing.assert_allclose(transformed, expected)
class Combine(_BaseLagTransform):
    """Combine two lag transformations using an operator

    Parameters
    ----------
    tfm1 : LagTransform
        First transformation.
    tfm2 : LagTransform
        Second transformation.
    operator : callable
        Binary operator that defines how to combine the two transformations."""

    def __init__(
        self, tfm1: _BaseLagTransform, tfm2: _BaseLagTransform, operator: Callable
    ):
        self.tfm1 = tfm1
        self.tfm2 = tfm2
        self.operator = operator

    def _set_core_tfm(self, lag: int) -> 'Combine':
        """Replace both operands with clones configured for ``lag``; returns ``self``."""
        self.tfm1 = clone(self.tfm1)._set_core_tfm(lag)
        self.tfm2 = clone(self.tfm2)._set_core_tfm(lag)
        return self

    def _get_name(self, lag: int) -> str:
        """Return ``<name of tfm1>_<operator name>_<name of tfm2>``.

        An operand that carries its own ``lag`` attribute is named with that
        lag; any other operand uses the ``lag`` argument.
        """
        lag1 = getattr(self.tfm1, 'lag', lag)
        lag2 = getattr(self.tfm2, 'lag', lag)
        return f'{self.tfm1._get_name(lag1)}_{self.operator.__name__}_{self.tfm2._get_name(lag2)}'

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply ``operator`` to the ``transform`` outputs of both operands."""
        return self.operator(self.tfm1.transform(ga), self.tfm2.transform(ga))

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply ``operator`` to the ``update`` outputs of both operands."""
        return self.operator(self.tfm1.update(ga), self.tfm2.update(ga))

    # `Combine` never sets `_core_tfm`, so the inherited `take`/`stack` (which
    # read it) would raise AttributeError. Delegate to the operands instead.
    def take(self, idxs: np.ndarray) -> 'Combine':
        """Return a copy whose operands are ``tfm1.take(idxs)`` and ``tfm2.take(idxs)``."""
        out = copy.deepcopy(self)
        out.tfm1 = self.tfm1.take(idxs)
        out.tfm2 = self.tfm2.take(idxs)
        return out

    @staticmethod
    def stack(transforms: Sequence['Combine']) -> 'Combine':
        """Return a copy of the first transform with each operand stacked across ``transforms``."""
        first = transforms[0]
        out = copy.deepcopy(first)
        # Look `stack` up on the operand's class so the call receives only the
        # list, regardless of how that class declares the method.
        out.tfm1 = type(first.tfm1).stack([tfm.tfm1 for tfm in transforms])
        out.tfm2 = type(first.tfm2).stack([tfm.tfm2 for tfm in transforms])
        return out
import operator
# Combining two lags with division must equal dividing their outputs directly.
comb = Combine(Lag(1), Lag(2), operator.truediv)
assert comb._get_name(1) == 'lag1_truediv_lag2'
transformed = comb.transform(ga)
expected = Lag(1).transform(ga) / Lag(2).transform(ga)
np.testing.assert_allclose(transformed, expected)
# Check that all transforms can be used correctly.
# NOTE(review): the source list was scrambled and appears truncated at the top;
# the entries up to RollingStd were reconstructed from the classes defined above.
tfms = [
    ExpandingMax(),
    ExpandingMean(),
    ExpandingMin(),
    ExpandingStd(),
    ExpandingQuantile(0.5),
    ExponentiallyWeightedMean(0.1),
    RollingMax(7),
    RollingMean(7),
    RollingMin(7),
    RollingStd(7),
    RollingQuantile(0.5, 7),
    SeasonalRollingMax(7, 2),
    SeasonalRollingMean(7, 2),
    SeasonalRollingMin(7, 2),
    SeasonalRollingStd(7, 2),
    SeasonalRollingQuantile(0.5, 7, 7),
    Offset(RollingMax(7), 2),
    Combine(RollingMean(5), Offset(RollingMean(5), 2), operator.truediv),
    Combine(Offset(RollingMean(5), 2), RollingMean(5), operator.truediv),
]
for tfm in tfms:
    # The core transform must exist before `transform`/`update` can delegate to it.
    tfm._set_core_tfm(1)
    tfm._get_name(1)
    tfm.transform(ga)
    tfm.update(ga)
# Give us a ⭐ on GitHub