目标转换

# 目标变换
%load_ext autoreload
%autoreload 2

可以在计算特征之前应用于目标的转换,并在计算预测后恢复。

import abc
import copy
from typing import Iterable, List, Optional, Sequence

import coreforecast.scalers as core_scalers
import numpy as np
import utilsforecast.processing as ufp
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from sklearn.base import TransformerMixin, clone
from utilsforecast.compat import DataFrame

from mlforecast.grouped_array import GroupedArray
from mlforecast.utils import _ShortSeriesException
import pandas as pd
from fastcore.test import test_fail
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PowerTransformer
from utilsforecast.processing import counts_by_id

from mlforecast import MLForecast
from mlforecast.utils import generate_daily_series
class BaseTargetTransform(abc.ABC):
    """Base class for target transformations that operate on dataframes.

    Subclasses must implement `fit_transform` and `inverse_transform`;
    `update` and `stack` are optional extensions."""
    def set_column_names(self, id_col: str, time_col: str, target_col: str):
        # Remember which columns hold the serie id, the timestamps and the target.
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col

    def update(self, df: DataFrame) -> DataFrame:
        # Transforming new (unseen) observations is optional for subclasses.
        raise NotImplementedError

    @staticmethod
    def stack(transforms: Sequence["BaseTargetTransform"]) -> "BaseTargetTransform":
        # Combining the state of several fitted transforms is optional for subclasses.
        raise NotImplementedError

    @abc.abstractmethod
    def fit_transform(self, df: DataFrame) -> DataFrame:
        ...
        
    @abc.abstractmethod
    def inverse_transform(self, df: DataFrame) -> DataFrame:
        ...
class _BaseGroupedArrayTargetTransform(abc.ABC):
    """Base class for target transformations that operate on grouped arrays."""
    # Number of threads forwarded to the coreforecast structures.
    num_threads: int = 1
    scaler_: core_scalers._BaseLocalScaler

    def set_num_threads(self, num_threads: int) -> None:
        self.num_threads = num_threads

    @abc.abstractmethod
    def update(self, ga: GroupedArray) -> GroupedArray:
        ...

    @abc.abstractmethod
    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        ...

    @abc.abstractmethod
    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        ...

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        # By default fitted values are inverted the same way as predictions.
        return self.inverse_transform(ga)

    @abc.abstractmethod
    def take(self, idxs: np.ndarray) -> '_BaseGroupedArrayTargetTransform':
        ...

    @staticmethod
    def stack(scalers: Sequence["_BaseGroupedArrayTargetTransform"]) -> "_BaseGroupedArrayTargetTransform":
        """Combine the fitted state of several transforms into a single one."""
        template = scalers[0]
        inner = template.scaler_
        out = copy.deepcopy(template)
        # Delegate the actual concatenation to the coreforecast scaler.
        out.scaler_ = inner.stack([sc.scaler_ for sc in scalers])
        return out
class Differences(_BaseGroupedArrayTargetTransform):
    """Subtracts previous values of each serie. Can be used to remove trend or seasonality."""
    # When True, `fit_transform` keeps a copy of the series before each
    # difference so that `inverse_transform_fitted` can restore them.
    store_fitted = False

    def __init__(self, differences: Iterable[int]):
        self.differences = list(differences)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Apply each difference in order.

        Raises `_ShortSeriesException` with the indices of the series that are
        too short to be differenced."""
        self.fitted_: List[GroupedArray] = []
        original_sizes = np.diff(ga.indptr)
        total_diffs = sum(self.differences)
        small_series = original_sizes < total_diffs
        if small_series.any():
            raise _ShortSeriesException(np.arange(ga.n_groups)[small_series])
        self.scalers_ = []
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for d in self.differences:
            if self.store_fitted:
                # these are saved in order to be able to correctly
                # inverse transform when retrieving the fitted values
                self.fitted_.append(
                    GroupedArray(core_ga.data.copy(), core_ga.indptr.copy())
                )
            scaler = core_scalers.Difference(d)
            transformed = scaler.fit_transform(core_ga)
            self.scalers_.append(scaler)
            core_ga = core_ga._with_data(transformed)
        return GroupedArray(core_ga.data, ga.indptr)

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Apply the fitted differences to new values."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for scaler in self.scalers_:
            core_ga = core_ga._with_data(scaler.update(core_ga))
        # return core_ga.data rather than the loop variable so that an empty
        # `differences` list doesn't raise UnboundLocalError
        return GroupedArray(core_ga.data, ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Undo the differences (in reverse order) on predictions."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for scaler in self.scalers_[::-1]:
            core_ga = core_ga._with_data(scaler.inverse_transform(core_ga))
        return GroupedArray(core_ga.data, ga.indptr)

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        """Restore the training values using the series stored during fit."""
        ga = copy.copy(ga)
        for d, fitted in zip(reversed(self.differences), reversed(self.fitted_)):
            fitted.restore_fitted_difference(ga.data, ga.indptr, d)
        return ga

    def take(self, idxs: np.ndarray) -> "Differences":
        """Return a copy of this transform restricted to the series at `idxs`."""
        out = Differences(self.differences)
        out.fitted_ = [ga.take(idxs) for ga in self.fitted_]
        out.scalers_ = [scaler.take(idxs) for scaler in self.scalers_]
        return out

    @staticmethod
    def stack(scalers: Sequence["Differences"]) -> "Differences":  # type: ignore[override]
        """Concatenate the fitted state of several transforms into one.

        Assumes all transforms were created with the same `differences`."""
        first_scaler = scalers[0]
        core_scaler = first_scaler.scalers_[0]
        diffs = first_scaler.differences
        out = Differences(diffs)
        out.fitted_ = []
        for i in range(len(first_scaler.fitted_)):
            data = np.hstack([sc.fitted_[i].data for sc in scalers])
            sizes = np.hstack([np.diff(sc.fitted_[i].indptr) for sc in scalers])
            out.fitted_.append(GroupedArray(data, np.append(0, sizes.cumsum())))
        out.scalers_ = [
            core_scaler.stack([sc.scalers_[i] for sc in scalers])
            for i in range(len(diffs))
        ]
        return out
series = generate_daily_series(10, min_length=50, max_length=100)
diffs = Differences([1, 2, 5])
id_counts = counts_by_id(series, 'unique_id')
indptr = np.append(0, id_counts['counts'].cumsum())
ga = GroupedArray(series['y'].values, indptr)

# the differences are applied correctly
transformed = diffs.fit_transform(ga)
assert diffs.fitted_ == []
expected = series.copy()
for d in diffs.differences:
    expected['y'] -= expected.groupby('unique_id', observed=True)['y'].shift(d)
np.testing.assert_allclose(transformed.data, expected['y'].values)

# the fitted differences are restored correctly
diffs.store_fitted = True
transformed = diffs.fit_transform(ga)
keep_mask = ~np.isnan(transformed.data)
restored = diffs.inverse_transform_fitted(transformed)
np.testing.assert_allclose(ga.data[keep_mask], restored.data[keep_mask])
restored_subs = diffs.inverse_transform_fitted(transformed.take_from_groups(slice(8, None)))
np.testing.assert_allclose(ga.data[keep_mask], restored_subs.data)

# test update
new_ga = GroupedArray(np.random.rand(10), np.arange(11))
prev_orig = [diffs.scalers_[i].tails_[::d].copy() for i, d in enumerate(diffs.differences)]
expected = new_ga.data - np.add.reduce(prev_orig)
updates = diffs.update(new_ga)
np.testing.assert_allclose(expected, updates.data)
np.testing.assert_allclose(diffs.scalers_[0].tails_, new_ga.data)
np.testing.assert_allclose(diffs.scalers_[1].tails_[1::2], new_ga.data - prev_orig[0])
np.testing.assert_allclose(diffs.scalers_[2].tails_[4::5], new_ga.data - np.add.reduce(prev_orig[:2]))
# variable sizes
diff1 = Differences([1])
ga = GroupedArray(np.arange(10), np.array([0, 3, 10]))
diff1.fit_transform(ga)
new_ga = GroupedArray(np.arange(4), np.array([0, 1, 4]))
updates = diff1.update(new_ga)
np.testing.assert_allclose(updates.data, np.array([0 - 2, 1 - 9, 2 - 1, 3 - 2]))
np.testing.assert_allclose(diff1.scalers_[0].tails_, np.array([0, 3]))

# short series
ga = GroupedArray(np.arange(20), np.array([0, 2, 20]))
test_fail(lambda: diffs.fit_transform(ga), contains="[0]")

# stack
diffs = Differences([1, 2, 5])
ga = GroupedArray(series['y'].values, indptr)
diffs.fit_transform(ga)
stacked = Differences.stack([diffs, diffs])
for i in range(len(diffs.differences)):
    np.testing.assert_allclose(
        stacked.scalers_[i].tails_,
        np.tile(diffs.scalers_[i].tails_, 2)
    )
class AutoDifferences(_BaseGroupedArrayTargetTransform):
    """Find and apply the optimal number of differences to each serie.

    Parameters
    ----------
    max_diffs : int
        Maximum number of differences to apply."""
    def __init__(self, max_diffs: int):
        # All of the logic lives in the coreforecast scaler.
        self.scaler_ = core_scalers.AutoDifferences(max_diffs)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Select the number of differences per serie and apply them."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        transformed = self.scaler_.fit_transform(core_ga)
        return GroupedArray(transformed, ga.indptr)

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Apply the previously selected differences to new values."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        updated = self.scaler_.update(core_ga)
        return GroupedArray(updated, ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Undo the differences on predictions."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        restored = self.scaler_.inverse_transform(core_ga)
        return GroupedArray(restored, ga.indptr)

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        # Restoring fitted values isn't supported for automatic differences.
        raise NotImplementedError

    def take(self, idxs: np.ndarray) -> 'AutoDifferences':
        """Return a copy of this transform restricted to the series at `idxs`."""
        out = AutoDifferences(self.scaler_.max_diffs)
        out.scaler_ = self.scaler_.take(idxs)
        return out
# hide
sc = AutoDifferences(1)
ga = GroupedArray(np.arange(10), np.array([0, 10]))
transformed = sc.fit_transform(ga)
np.testing.assert_allclose(transformed.data, np.append(np.nan, np.ones(9)))
np.testing.assert_equal(sc.scaler_.diffs_, np.array([1.0], dtype=np.float32))
inv = sc.inverse_transform(ga).data
np.testing.assert_allclose(9 + np.arange(10).cumsum(), inv)
upd = sc.update(GroupedArray(np.array([10]), np.array([0, 1])))
np.testing.assert_equal(np.array([1.0]), upd.data)
np.testing.assert_equal(sc.scaler_.tails_[0], np.array([10]))

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class AutoSeasonalDifferences(AutoDifferences):
    """Find and apply the optimal number of seasonal differences to each group.

    Parameters
    ----------
    season_length : int
        Length of the seasonal period.
    max_diffs : int
        Maximum number of differences to apply.
    n_seasons : int, optional (default=10)
        Number of seasons used to determine the number of differences.
        If `None` all samples are used, otherwise `season_length * n_seasons`
        samples are used for the test.
        Smaller values will be faster but could be less accurate."""
    def __init__(self, season_length: int, max_diffs: int, n_seasons: Optional[int] = 10):
        # The fitting logic is delegated entirely to the coreforecast scaler.
        params = {
            'season_length': season_length,
            'max_diffs': max_diffs,
            'n_seasons': n_seasons,
        }
        self.scaler_ = core_scalers.AutoSeasonalDifferences(**params)
sc = AutoSeasonalDifferences(season_length=5, max_diffs=1)
ga = GroupedArray(np.arange(5)[np.arange(10) % 5], np.array([0, 10]))
transformed = sc.fit_transform(ga)
sc.inverse_transform(ga)
sc.update(ga)

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class AutoSeasonalityAndDifferences(AutoDifferences):
    """Find the length of the seasonal period and apply the optimal number of differences to each group.

    Parameters
    ----------
    max_season_length : int
        Maximum length of the seasonal period.
    max_diffs : int
        Maximum number of differences to apply.
    n_seasons : int, optional (default=10)
        Number of seasons used to determine the number of differences.
        If `None` all samples are used, otherwise `max_season_length * n_seasons`
        samples are used for the test.
        Smaller values will be faster but could be less accurate."""
    def __init__(self, max_season_length: int, max_diffs: int, n_seasons: Optional[int] = 10):
        # The fitting logic is delegated entirely to the coreforecast scaler.
        params = {
            'max_season_length': max_season_length,
            'max_diffs': max_diffs,
            'n_seasons': n_seasons,
        }
        self.scaler_ = core_scalers.AutoSeasonalityAndDifferences(**params)
sc = AutoSeasonalityAndDifferences(max_season_length=5, max_diffs=1)
ga = GroupedArray(np.arange(5)[np.arange(10) % 5], np.array([0, 10]))
transformed = sc.fit_transform(ga)
sc.inverse_transform(ga)
sc.update(ga)

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class _BaseLocalScaler(_BaseGroupedArrayTargetTransform):
    """Base class for per-serie scalers backed by a coreforecast scaler."""
    # Subclasses set this to the coreforecast scaler class (or a factory callable).
    scaler_factory: type

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Scale new values using the previously fitted statistics."""
        # use a distinct name instead of shadowing the `ga` parameter,
        # consistent with the other methods of this class
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        return GroupedArray(self.scaler_.transform(core_ga), ga.indptr)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Fit the scaler on each serie and return the scaled values."""
        self.scaler_ = self.scaler_factory()
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        transformed = self.scaler_.fit_transform(core_ga)
        return GroupedArray(transformed, ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Restore values to the original scale."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        transformed = self.scaler_.inverse_transform(core_ga)
        return GroupedArray(transformed, ga.indptr)

    def take(self, idxs: np.ndarray) -> '_BaseLocalScaler':
        """Return a copy of this scaler restricted to the series at `idxs`."""
        out = copy.deepcopy(self)
        out.scaler_ = self.scaler_.take(idxs)
        return out
def test_scaler(sc, series):
    """Check the round trip, update, take and stack behavior of a local scaler."""
    sizes = counts_by_id(series, 'unique_id')
    bounds = np.append(0, sizes['counts'].cumsum())
    ga = GroupedArray(series['y'].values, bounds)

    # inverse(fit_transform(x)) recovers x
    transformed = sc.fit_transform(ga)
    np.testing.assert_allclose(sc.inverse_transform(transformed).data, ga.data)

    # transforming the training data again matches the fitted output
    np.testing.assert_allclose(transformed.data, sc.update(ga).data)

    # the round trip stays exact on a subset of the groups
    idxs = [0, 7]
    np.testing.assert_allclose(
        sc.take(idxs).inverse_transform(transformed.take(idxs)).data,
        ga.take(idxs).data,
    )

    # stacking tiles the per-serie statistics
    stacked = sc.stack([sc, sc])
    np.testing.assert_allclose(
        stacked.scaler_.stats_,
        np.tile(sc.scaler_.stats_, (2, 1)),
    )
class LocalStandardScaler(_BaseLocalScaler):
    """Standardizes each serie by subtracting its mean and dividing by its standard deviation."""
    scaler_factory = core_scalers.LocalStandardScaler
test_scaler(LocalStandardScaler(), series)
class LocalMinMaxScaler(_BaseLocalScaler):
    """Scales each serie to be in the [0, 1] interval."""
    scaler_factory = core_scalers.LocalMinMaxScaler
test_scaler(LocalMinMaxScaler(), series)
class LocalRobustScaler(_BaseLocalScaler):
    """Scaler robust to outliers.

    Parameters
    ----------
    scale : str (default='iqr')
        Statistic to use for scaling. Can be either 'iqr' (Inter Quartile Range) or 'mad' (Median Absolute Deviation)
    """

    def __init__(self, scale: str):
        # a factory lambda is used so each fit gets a fresh scaler with the chosen statistic
        self.scaler_factory = lambda: core_scalers.LocalRobustScaler(scale)  # type: ignore
test_scaler(LocalRobustScaler(scale='iqr'), series)
test_scaler(LocalRobustScaler(scale='mad'), series)
class LocalBoxCox(_BaseLocalScaler):
    """Finds the optimum lambda for each serie and applies the Box-Cox transformation."""
    def __init__(self):
        self.scaler_factory = lambda: core_scalers.LocalBoxCoxScaler(
            method='loglik', lower=0.0
        )
test_scaler(LocalBoxCox(), series)
class GlobalSklearnTransformer(BaseTargetTransform):
    """Applies the same scikit-learn transformer to all series."""
    def __init__(self, transformer: TransformerMixin):
        self.transformer = transformer

    def fit_transform(self, df: DataFrame) -> DataFrame:
        """Fit a fresh clone of the transformer on the target and transform it."""
        df = ufp.copy_if_pandas(df, deep=False)
        # keep the user's transformer untouched by fitting a clone
        self.transformer_ = clone(self.transformer)
        values = df[[self.target_col]].to_numpy()
        transformed = self.transformer_.fit_transform(values)
        return ufp.assign_columns(df, self.target_col, transformed[:, 0])

    def update(self, df: DataFrame) -> DataFrame:
        """Transform new observations with the already-fitted transformer."""
        df = ufp.copy_if_pandas(df, deep=False)
        values = df[[self.target_col]].to_numpy()
        transformed = self.transformer_.transform(values)
        return ufp.assign_columns(df, self.target_col, transformed[:, 0])

    def inverse_transform(self, df: DataFrame) -> DataFrame:
        """Restore the original scale of every prediction column."""
        df = ufp.copy_if_pandas(df, deep=False)
        # invert all columns except the id and time ones (point forecasts and intervals)
        targets = [
            c for c in df.columns if c not in (self.id_col, self.time_col)
        ]
        restored = self.transformer_.inverse_transform(df[targets].to_numpy())
        return ufp.assign_columns(df, targets, restored)

    @staticmethod
    def stack(transforms: Sequence["GlobalSklearnTransformer"]) -> "GlobalSklearnTransformer":  # type: ignore[override]
        # the transformer is global (shared across all series), so any one works
        return transforms[0]
# we need this import in order for isinstance to work properly
from mlforecast.target_transforms import Differences as ExportedDifferences
sk_boxcox = PowerTransformer(method='box-cox', standardize=False)
boxcox_global = GlobalSklearnTransformer(sk_boxcox)
single_difference = ExportedDifferences([1])
series = generate_daily_series(10)
fcst = MLForecast(
    models=[LinearRegression()],
    freq='D',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference]
)
prep = fcst.preprocess(series, dropna=False)
expected = (
    pd.Series(
        sk_boxcox.fit_transform(series[['y']])[:, 0], index=series['unique_id']
    ).groupby('unique_id', observed=True)
    .diff()
    .values
)
np.testing.assert_allclose(prep['y'].values, expected)
#| polars
series_pl = generate_daily_series(10, engine='polars')
fcst_pl = MLForecast(
    models=[LinearRegression()],
    freq='1d',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference]
)
prep_pl = fcst_pl.preprocess(series_pl, dropna=False)
pd.testing.assert_frame_equal(prep, prep_pl.to_pandas())

Give us a ⭐ on Github