滞后变换

%load_ext autoreload
%autoreload 2

内置滞后变换

import copy
import inspect
import re
from typing import Callable, Optional, Sequence

import numpy as np
import coreforecast.lag_transforms as core_tfms
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from sklearn.base import BaseEstimator, clone
def _pascal2camel(pascal_str: str) -> str:
    return re.sub(r'(?<!^)(?=[A-Z])', '_', pascal_str).lower()
class _BaseLagTransform(BaseEstimator):
    """Base class for lag transformations that delegate to a core transform.

    Subclasses store their constructor arguments as attributes of the same
    name. `_set_core_tfm` then builds the object in `core_tfms` whose class
    name matches the subclass name and stores it in `self._core_tfm`, which
    `transform`, `update`, `take` and `stack` forward to.
    """

    def _get_init_signature(self):
        """Return the constructor parameters of this class, without `self`.

        Returns
        -------
        dict
            Maps parameter name to `inspect.Parameter`, in declaration order.
        """
        params = dict(inspect.signature(self.__class__.__init__).parameters)
        params.pop('self', None)
        return params

    def _set_core_tfm(self, lag: int) -> '_BaseLagTransform':
        """Build the core transform for `lag` and store it in `self._core_tfm`.

        The constructor arguments are read back from the attributes named
        after the `__init__` parameters.
        """
        kwargs = {}
        for name in self._get_init_signature():
            kwargs[name] = getattr(self, name)
        # The core implementation is looked up by this class' own name.
        core_cls = getattr(core_tfms, self.__class__.__name__)
        self._core_tfm = core_cls(lag=lag, **kwargs)
        return self

    def _get_name(self, lag: int) -> str:
        """Build the name of the feature produced by this transform.

        The name is the snake-cased class name followed by `_lag{lag}`, plus
        one `{param}{value}` chunk for every constructor argument whose
        current value differs from its declared default.
        """
        pieces = [f'{_pascal2camel(self.__class__.__name__)}_lag{lag}']
        for name, param in self._get_init_signature().items():
            value = getattr(self, name)
            if param.default != value:
                pieces.append(f"{name}{value}")
        return "_".join(pieces)

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Forward `ga` to the core transform's `transform` and return its result."""
        core = self._core_tfm
        return core.transform(ga)

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Forward `ga` to the core transform's `update` and return its result."""
        core = self._core_tfm
        return core.update(ga)

    def take(self, idxs: np.ndarray) -> '_BaseLagTransform':
        """Return a deep copy whose core transform is `self._core_tfm.take(idxs)`."""
        subset = copy.deepcopy(self)
        subset._core_tfm = self._core_tfm.take(idxs)
        return subset

    @staticmethod
    def stack(transforms: Sequence['_BaseLagTransform']) -> '_BaseLagTransform':
        """Return a deep copy of the first transform holding the stacked core transforms.

        The stacking itself is delegated to the `stack` of the first
        transform's `_core_tfm`, which receives every `_core_tfm` in order.
        """
        first = transforms[0]
        core_tfms_to_stack = [tfm._core_tfm for tfm in transforms]
        stacked = copy.deepcopy(first)
        stacked._core_tfm = first._core_tfm.stack(core_tfms_to_stack)
        return stacked
class Lag(_BaseLagTransform):
    """Plain lag transformation.

    Unlike the other transforms, the lag is a constructor argument, so the
    core transform is created eagerly in `__init__`.

    Parameters
    ----------
    lag : int
        Lag forwarded to `core_tfms.Lag`.
    """

    def __init__(self, lag: int):
        self._core_tfm = core_tfms.Lag(lag=lag)
        self.lag = lag

    def _set_core_tfm(self, _lag: int) -> 'Lag':
        """No-op: the core transform already exists; the argument is ignored."""
        return self

    def _get_name(self, lag: int) -> str:
        """Return `lag{lag}` using the `lag` argument (not `self.lag`)."""
        return f'lag{lag}'

    def __eq__(self, other):
        """Two `Lag` instances are equal when their `lag` values are equal."""
        if not isinstance(other, Lag):
            return False
        return self.lag == other.lag
class _RollingBase(_BaseLagTransform):
    """Base class for statistics computed over a rolling window."""

    def __init__(self, window_size: int, min_samples: Optional[int] = None):
        """Store the window configuration.

        Parameters
        ----------
        window_size : int
            Number of samples in the window.
        min_samples : int, optional
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.window_size, self.min_samples = window_size, min_samples
class RollingMean(_RollingBase):
    """Rolling mean; computed by the `core_tfms` class of the same name."""

class RollingStd(_RollingBase):
    """Rolling standard deviation; computed by the `core_tfms` class of the same name."""

class RollingMin(_RollingBase):
    """Rolling minimum; computed by the `core_tfms` class of the same name."""

class RollingMax(_RollingBase):
    """Rolling maximum; computed by the `core_tfms` class of the same name."""

class RollingQuantile(_RollingBase):
    """Rolling quantile.

    Parameters
    ----------
    p : float
        Quantile argument forwarded to `core_tfms.RollingQuantile`.
    window_size : int
        Number of samples in the window.
    min_samples : int, optional
        Minimum samples required to output the statistic.
    """

    def __init__(self, p: float, window_size: int, min_samples: Optional[int] = None):
        super().__init__(window_size=window_size, min_samples=min_samples)
        self.p = p

    def _set_core_tfm(self, lag: int):
        """Build `core_tfms.RollingQuantile` for `lag` and store it in `self._core_tfm`."""
        core_kwargs = dict(
            lag=lag,
            p=self.p,
            window_size=self.window_size,
            min_samples=self.min_samples,
        )
        self._core_tfm = core_tfms.RollingQuantile(**core_kwargs)
        return self
# Build synthetic data shared by the example cells below: 20 groups whose
# lengths are drawn with low=50, high=100, filled with uniform random values.
rng = np.random.default_rng(seed=0)
lengths = rng.integers(low=50, high=100, size=20)
data = rng.random(lengths.sum())
# Second argument: cumulative lengths with a leading 0
# (presumably the group boundaries expected by CoreGroupedArray -- confirm).
ga = CoreGroupedArray(data, np.append(0, lengths.cumsum()))
# Example: rolling mean with window_size=7 at lag 1.
RollingMean(7)._set_core_tfm(1).transform(ga)
array([       nan,        nan,        nan, ..., 0.32114229, 0.3672723 ,
       0.39137066])
class _Seasonal_RollingBase(_BaseLagTransform):
    """Base class for rolling statistics computed over seasonal periods."""

    def __init__(
        self, season_length: int, window_size: int, min_samples: Optional[int] = None
    ):
        """Store the seasonal window configuration.

        Parameters
        ----------
        season_length : int
            Periodicity of the seasonal period.
        window_size : int
            Number of samples in the window.
        min_samples : int, optional
            Minimum samples required to output the statistic.
            If `None`, will be set to `window_size`.
        """
        self.season_length, self.window_size, self.min_samples = (
            season_length,
            window_size,
            min_samples,
        )

::: {#40cf5b8b-d079-40ee-89ca-36a1ea5dfb90 .cell 0='ex' 1='port'}

class SeasonalRollingMean(_Seasonal_RollingBase):
    """Seasonal rolling mean; computed by the `core_tfms` class of the same name."""

class SeasonalRollingStd(_Seasonal_RollingBase):
    """Seasonal rolling standard deviation; computed by the `core_tfms` class of the same name."""

class SeasonalRollingMin(_Seasonal_RollingBase):
    """Seasonal rolling minimum; computed by the `core_tfms` class of the same name."""

class SeasonalRollingMax(_Seasonal_RollingBase):
    """Seasonal rolling maximum; computed by the `core_tfms` class of the same name."""

class SeasonalRollingQuantile(_Seasonal_RollingBase):
    """Seasonal rolling quantile.

    Parameters
    ----------
    p : float
        Quantile argument; passed to the core transform together with the
        other constructor arguments.
    season_length : int
        Periodicity of the seasonal period.
    window_size : int
        Number of samples in the window.
    min_samples : int, optional
        Minimum samples required to output the statistic.
    """

    def __init__(
        self,
        p: float,
        season_length: int,
        window_size: int,
        min_samples: Optional[int] = None,
    ):
        super().__init__(season_length, window_size, min_samples)
        self.p = p

:::

# Example: seasonal rolling std with season_length=7, window_size=4 at lag 2.
SeasonalRollingStd(7, 4)._set_core_tfm(2).transform(ga)
array([       nan,        nan,        nan, ..., 0.35518094, 0.25199008,
       0.40335074])
class _ExpandingBase(_BaseLagTransform):
    """Base class for expanding statistics."""

    def __init__(self):
        """Take no configuration; the lag is supplied later via `_set_core_tfm`."""
class ExpandingMean(_ExpandingBase):
    """Expanding mean; computed by the `core_tfms` class of the same name."""

class ExpandingStd(_ExpandingBase):
    """Expanding standard deviation; computed by the `core_tfms` class of the same name."""

class ExpandingMin(_ExpandingBase):
    """Expanding minimum; computed by the `core_tfms` class of the same name."""

class ExpandingMax(_ExpandingBase):
    """Expanding maximum; computed by the `core_tfms` class of the same name."""

class ExpandingQuantile(_ExpandingBase):
    """Expanding quantile.

    Parameters
    ----------
    p : float
        Quantile argument; passed to the core transform of the same name
        when `_set_core_tfm` is called.
    """
    def __init__(self, p: float):
        self.p = p
# Example: expanding minimum at lag 3.
ExpandingMin()._set_core_tfm(3).transform(ga)
array([       nan,        nan,        nan, ..., 0.00297614, 0.00297614,
       0.00297614])
class ExponentiallyWeightedMean(_BaseLagTransform):
    """Exponentially weighted average

    Parameters
    ----------
    alpha : float
        Smoothing factor."""
    def __init__(self, alpha: float):
        self.alpha = alpha
# Example: exponentially weighted mean with alpha=0.7 at lag 4.
ExponentiallyWeightedMean(0.7)._set_core_tfm(4).transform(ga)
array([       nan,        nan,        nan, ..., 0.3074053 , 0.5567787 ,
       0.31390901])
class Offset(_BaseLagTransform):
    """Shift series before computing transformation

    The shift is implemented by adding `n` to the lag handed to the wrapped
    transformation.

    Parameters
    ----------
    tfm : LagTransform
        Transformation to be applied
    n : int
        Number of positions to shift (lag) series before applying the transformation"""

    def __init__(self, tfm: _BaseLagTransform, n: int):
        self.tfm, self.n = tfm, n

    def _get_name(self, lag: int) -> str:
        """Return the wrapped transformation's name for the shifted lag `lag + n`."""
        shifted_lag = lag + self.n
        return self.tfm._get_name(shifted_lag)

    def _set_core_tfm(self, lag: int) -> 'Offset':
        """Bind a clone of the wrapped transformation to lag `lag + n`.

        The configured clone is stored in `self._core_tfm`; `self.tfm` itself
        is left untouched.
        """
        bound = clone(self.tfm)
        self._core_tfm = bound._set_core_tfm(lag + self.n)
        return self
# Offset of 2 on top of lag 5: the wrapped transform is bound to lag 7,
# which is what the generated name reports.
offset = Offset(RollingMean(window_size=10), 2)._set_core_tfm(5)
assert offset._get_name(5) == "rolling_mean_lag7_window_size10"
transformed = offset.transform(ga)
# Reference result: the same rolling mean at lag 5, applied to the data
# returned by Lag(2) (placed back into the grouped array via `_with_data`).
expected = RollingMean(window_size=10)._set_core_tfm(5).transform(ga._with_data(Lag(2).transform(ga)))
np.testing.assert_allclose(transformed, expected)
class Combine(_BaseLagTransform):
    """Combine two lag transformations using an operator

    The class never sets `_core_tfm`; every operation is delegated to the two
    wrapped transformations and, where applicable, merged with `operator`.

    Parameters
    ----------
    tfm1 : LagTransform
        First transformation.
    tfm2 : LagTransform
        Second transformation.
    operator : callable
        Binary operator that defines how to combine the two transformations."""
    def __init__(
        self, tfm1: _BaseLagTransform, tfm2: _BaseLagTransform, operator: Callable
    ):
        self.tfm1 = tfm1
        self.tfm2 = tfm2
        self.operator = operator

    def _set_core_tfm(self, lag: int) -> 'Combine':
        """Replace both wrapped transformations with clones bound to `lag`."""
        self.tfm1 = clone(self.tfm1)._set_core_tfm(lag)
        self.tfm2 = clone(self.tfm2)._set_core_tfm(lag)
        return self

    def _get_name(self, lag: int) -> str:
        """Return `{name of tfm1}_{operator.__name__}_{name of tfm2}`.

        A wrapped transformation that has its own `lag` attribute (e.g. `Lag`)
        is named with that value instead of the `lag` argument.
        """
        lag1 = getattr(self.tfm1, 'lag', lag)
        lag2 = getattr(self.tfm2, 'lag', lag)
        return f'{self.tfm1._get_name(lag1)}_{self.operator.__name__}_{self.tfm2._get_name(lag2)}'

    def transform(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply `operator` to the `transform` results of both transformations."""
        return self.operator(self.tfm1.transform(ga), self.tfm2.transform(ga))

    def update(self, ga: CoreGroupedArray) -> np.ndarray:
        """Apply `operator` to the `update` results of both transformations."""
        return self.operator(self.tfm1.update(ga), self.tfm2.update(ga))

    def take(self, idxs: np.ndarray) -> 'Combine':
        """Return a copy whose wrapped transformations are restricted to `idxs`.

        The inherited implementation reads `self._core_tfm`, which this class
        never sets, so it would raise `AttributeError`. Delegate to the
        wrapped transformations instead.
        """
        # Shallow copy is enough: both children are replaced right below.
        out = copy.copy(self)
        out.tfm1 = self.tfm1.take(idxs)
        out.tfm2 = self.tfm2.take(idxs)
        return out

    @staticmethod
    def stack(transforms: Sequence['Combine']) -> 'Combine':
        """Return a copy of the first element holding the stacked children.

        The inherited implementation reads `_core_tfm`, which this class
        never sets. Here each side (`tfm1`, `tfm2`) is stacked separately
        through the `stack` of the first element's corresponding child.
        """
        first = transforms[0]
        out = copy.copy(first)
        out.tfm1 = first.tfm1.stack([tfm.tfm1 for tfm in transforms])
        out.tfm2 = first.tfm2.stack([tfm.tfm2 for tfm in transforms])
        return out
import operator
# Dividing lag 1 by lag 2 through Combine must match doing it by hand.
comb = Combine(Lag(1), Lag(2), operator.truediv)
# Each Lag child is named with its own `lag` attribute, not the argument.
assert comb._get_name(1) == 'lag1_truediv_lag2'
transformed = comb.transform(ga)
expected = Lag(1).transform(ga) / Lag(2).transform(ga)
np.testing.assert_allclose(transformed, expected)
# Check that all transforms can be used correctly
tfms = [
    ExpandingMax(),
    ExpandingMean(),
    ExpandingMin(),
    ExpandingStd(),
    ExpandingQuantile(0.5),
    ExponentiallyWeightedMean(0.1),
    RollingMax(7),
    RollingMean(7),
    RollingMin(7),
    RollingStd(7),
    RollingQuantile(0.5, 7),
    SeasonalRollingMax(7, 2),
    SeasonalRollingMean(7, 2),
    SeasonalRollingMin(7, 2),
    SeasonalRollingStd(7, 2),
    SeasonalRollingQuantile(0.5, 7, 7),
    Offset(RollingMax(7), 2),
    Combine(RollingMean(5), Offset(RollingMean(5), 2), operator.truediv),
    Combine(Offset(RollingMean(5), 2), RollingMean(5), operator.truediv),
]
# Smoke test: binding, naming, transform and update must all run without error.
for tfm in tfms:
    tfm._set_core_tfm(1)
    tfm._get_name(1)
    tfm.transform(ga)
    tfm.update(ga)

Give us a ⭐ on GitHub