# Target transforms
# Transformations that can be applied to the target before computing the
# features and restored after computing the predictions.
%load_ext autoreload
%autoreload 2
import abc
import copy
from typing import Iterable, List, Optional, Sequence
import coreforecast.scalers as core_scalers
import numpy as np
import utilsforecast.processing as ufp
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from sklearn.base import TransformerMixin, clone
from utilsforecast.compat import DataFrame
from mlforecast.grouped_array import GroupedArray
from mlforecast.utils import _ShortSeriesException
import pandas as pd
from fastcore.test import test_fail
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PowerTransformer
from utilsforecast.processing import counts_by_id
from mlforecast import MLForecast
from mlforecast.utils import generate_daily_series
class BaseTargetTransform(abc.ABC):
    """Base class used for target transformations.

    Subclasses must implement `fit_transform` and `inverse_transform`;
    `update` and `stack` are optional hooks that raise by default."""
    def set_column_names(self, id_col: str, time_col: str, target_col: str):
        # Stores the column names so that the transform methods know which
        # columns of the dataframe hold the series id, timestamps and target.
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
    def update(self, df: DataFrame) -> DataFrame:
        # Transform new observations; optional, hence not abstract.
        raise NotImplementedError
    @staticmethod
    def stack(transforms: Sequence["BaseTargetTransform"]) -> "BaseTargetTransform":
        # Combine several fitted transforms into one; optional, hence not abstract.
        raise NotImplementedError
    @abc.abstractmethod
    def fit_transform(self, df: DataFrame) -> DataFrame:
        ...
    @abc.abstractmethod
    def inverse_transform(self, df: DataFrame) -> DataFrame:
        ...
class _BaseGroupedArrayTargetTransform(abc.ABC):
    """Base class for target transformations that operate on grouped arrays."""

    # Number of threads used by the core scalers.
    num_threads: int = 1
    # Fitted core scaler; set by subclasses during fit_transform.
    scaler_: core_scalers._BaseLocalScaler

    def set_num_threads(self, num_threads: int) -> None:
        """Set the number of threads used by the core computations."""
        self.num_threads = num_threads

    @abc.abstractmethod
    def update(self, ga: GroupedArray) -> GroupedArray:
        """Transform new observations."""
        ...

    @abc.abstractmethod
    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Fit the transformation and apply it to the target."""
        ...

    @abc.abstractmethod
    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Restore the target to its original scale."""
        ...

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        """Restore fitted values. Defaults to the regular inverse transform."""
        return self.inverse_transform(ga)

    @abc.abstractmethod
    def take(self, idxs: np.ndarray) -> "_BaseGroupedArrayTargetTransform":
        """Return a copy of this transform restricted to the groups in `idxs`."""
        ...

    @staticmethod
    def stack(
        scalers: Sequence["_BaseGroupedArrayTargetTransform"],
    ) -> "_BaseGroupedArrayTargetTransform":
        """Combine the fitted scalers of several transforms into a single one."""
        first_scaler = scalers[0]
        core_scaler = first_scaler.scaler_
        out = copy.deepcopy(first_scaler)
        out.scaler_ = core_scaler.stack([sc.scaler_ for sc in scalers])
        return out
class Differences(_BaseGroupedArrayTargetTransform):
    """Subtracts previous values of the serie. Can be used to remove trend or seasonalities.

    Parameters
    ----------
    differences : Iterable[int]
        Lags to subtract, applied sequentially (e.g. [1, 7])."""

    # When True, fit_transform keeps copies of the intermediate series so that
    # fitted values can be inverse transformed later.
    store_fitted = False

    def __init__(self, differences: Iterable[int]):
        self.differences = list(differences)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Apply each difference in sequence, keeping the fitted scalers."""
        self.fitted_: List[GroupedArray] = []
        original_sizes = np.diff(ga.indptr)
        total_diffs = sum(self.differences)
        # every serie needs at least `total_diffs` samples for the diffs to be defined
        small_series = original_sizes < total_diffs
        if small_series.any():
            raise _ShortSeriesException(np.arange(ga.n_groups)[small_series])
        self.scalers_ = []
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for d in self.differences:
            if self.store_fitted:
                # these are saved in order to be able to perform a correct
                # inverse transform when trying to retrieve the fitted values.
                self.fitted_.append(
                    GroupedArray(core_ga.data.copy(), core_ga.indptr.copy())
                )
            scaler = core_scalers.Difference(d)
            transformed = scaler.fit_transform(core_ga)
            self.scalers_.append(scaler)
            core_ga = core_ga._with_data(transformed)
        return GroupedArray(core_ga.data, ga.indptr)

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Apply the fitted differences to new observations."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for scaler in self.scalers_:
            transformed = scaler.update(core_ga)
            core_ga = core_ga._with_data(transformed)
        return GroupedArray(transformed, ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Add back the differenced values, in reverse order of fitting."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        for scaler in self.scalers_[::-1]:
            transformed = scaler.inverse_transform(core_ga)
            core_ga = core_ga._with_data(transformed)
        return GroupedArray(transformed, ga.indptr)

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        """Restore fitted values using the series saved during fit (requires store_fitted)."""
        ga = copy.copy(ga)
        for d, fitted in zip(reversed(self.differences), reversed(self.fitted_)):
            fitted.restore_fitted_difference(ga.data, ga.indptr, d)
        return ga

    def take(self, idxs: np.ndarray) -> "Differences":
        """Return a copy of this transform restricted to the groups in `idxs`."""
        out = Differences(self.differences)
        out.fitted_ = [ga.take(idxs) for ga in self.fitted_]
        out.scalers_ = [scaler.take(idxs) for scaler in self.scalers_]
        return out

    @staticmethod
    def stack(scalers: Sequence["Differences"]) -> "Differences":  # type: ignore[override]
        """Concatenate the fitted state of several Differences transforms."""
        first_scaler = scalers[0]
        core_scaler = first_scaler.scalers_[0]
        diffs = first_scaler.differences
        out = Differences(diffs)
        out.fitted_ = []
        for i in range(len(scalers[0].fitted_)):
            data = np.hstack([sc.fitted_[i].data for sc in scalers])
            sizes = np.hstack([np.diff(sc.fitted_[i].indptr) for sc in scalers])
            out.fitted_.append(GroupedArray(data, np.append(0, sizes.cumsum())))
        out.scalers_ = [
            core_scaler.stack([sc.scalers_[i] for sc in scalers])
            for i in range(len(diffs))
        ]
        return out
# --- Tests for Differences ---
series = generate_daily_series(10, min_length=50, max_length=100)
diffs = Differences([1, 2, 5])
id_counts = counts_by_id(series, 'unique_id')
indptr = np.append(0, id_counts['counts'].cumsum())
ga = GroupedArray(series['y'].values, indptr)

# differences are applied correctly
transformed = diffs.fit_transform(ga)
assert diffs.fitted_ == []
expected = series.copy()
for d in diffs.differences:
    expected['y'] -= expected.groupby('unique_id', observed=True)['y'].shift(d)
np.testing.assert_allclose(transformed.data, expected['y'].values)

# fitted differences are restored correctly
diffs.store_fitted = True
transformed = diffs.fit_transform(ga)
keep_mask = ~np.isnan(transformed.data)
restored = diffs.inverse_transform_fitted(transformed)
np.testing.assert_allclose(ga.data[keep_mask], restored.data[keep_mask])
restored_subs = diffs.inverse_transform_fitted(
    transformed.take_from_groups(slice(8, None))
)
np.testing.assert_allclose(ga.data[keep_mask], restored_subs.data)

# test update
new_ga = GroupedArray(np.random.rand(10), np.arange(11))
prev_orig = [
    diffs.scalers_[i].tails_[::d].copy() for i, d in enumerate(diffs.differences)
]
expected = new_ga.data - np.add.reduce(prev_orig)
updates = diffs.update(new_ga)
np.testing.assert_allclose(expected, updates.data)
np.testing.assert_allclose(diffs.scalers_[0].tails_, new_ga.data)
np.testing.assert_allclose(diffs.scalers_[1].tails_[1::2], new_ga.data - prev_orig[0])
np.testing.assert_allclose(
    diffs.scalers_[2].tails_[4::5], new_ga.data - np.add.reduce(prev_orig[:2])
)

# variable sizes
diff1 = Differences([1])
ga = GroupedArray(np.arange(10), np.array([0, 3, 10]))
diff1.fit_transform(ga)
new_ga = GroupedArray(np.arange(4), np.array([0, 1, 4]))
updates = diff1.update(new_ga)
np.testing.assert_allclose(updates.data, np.array([0 - 2, 1 - 9, 2 - 1, 3 - 2]))
np.testing.assert_allclose(diff1.scalers_[0].tails_, np.array([0, 3]))

# short series
ga = GroupedArray(np.arange(20), np.array([0, 2, 20]))
test_fail(lambda: diffs.fit_transform(ga), contains="[0]")

# stack
diffs = Differences([1, 2, 5])
ga = GroupedArray(series['y'].values, indptr)
diffs.fit_transform(ga)
stacked = Differences.stack([diffs, diffs])
for i in range(len(diffs.differences)):
    np.testing.assert_allclose(
        stacked.scalers_[i].tails_,
        np.tile(diffs.scalers_[i].tails_, 2),
    )
class AutoDifferences(_BaseGroupedArrayTargetTransform):
    """Find and apply the optimal number of differences to each serie.

    Parameters
    ----------
    max_diffs : int
        Maximum number of differences to apply."""

    def __init__(self, max_diffs: int):
        self.scaler_ = core_scalers.AutoDifferences(max_diffs)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Estimate the number of differences per serie and apply them."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        return GroupedArray(self.scaler_.fit_transform(core_ga), ga.indptr)

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Apply the fitted differences to new observations."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        return GroupedArray(self.scaler_.update(core_ga), ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Add back the differenced values."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        return GroupedArray(self.scaler_.inverse_transform(core_ga), ga.indptr)

    def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
        # Restoring fitted values is not supported for automatic differences.
        raise NotImplementedError

    def take(self, idxs: np.ndarray) -> 'AutoDifferences':
        """Return a copy of this transform restricted to the groups in `idxs`."""
        out = AutoDifferences(self.scaler_.max_diffs)
        out.scaler_ = self.scaler_.take(idxs)
        return out
# --- Tests for AutoDifferences ---
sc = AutoDifferences(1)
ga = GroupedArray(np.arange(10), np.array([0, 10]))
transformed = sc.fit_transform(ga)
np.testing.assert_allclose(transformed.data, np.append(np.nan, np.ones(9)))
np.testing.assert_equal(sc.scaler_.diffs_, np.array([1.0], dtype=np.float32))
inv = sc.inverse_transform(ga).data
np.testing.assert_allclose(9 + np.arange(10).cumsum(), inv)
upd = sc.update(GroupedArray(np.array([10]), np.array([0, 1])))
np.testing.assert_equal(np.array([1.0]), upd.data)
np.testing.assert_equal(sc.scaler_.tails_[0], np.array([10]))

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class AutoSeasonalDifferences(AutoDifferences):
    """Find and apply the optimal number of seasonal differences to each group.

    Parameters
    ----------
    season_length : int
        Length of the seasonal period.
    max_diffs : int
        Maximum number of differences to apply.
    n_seasons : int, optional (default=10)
        Number of seasons to use to determine the number of differences. Defaults to 10.
        If `None` will use all samples, otherwise `season_length` * `n_seasons` samples will be used for the test.
        Smaller values will be faster but could be less accurate."""

    def __init__(
        self, season_length: int, max_diffs: int, n_seasons: Optional[int] = 10
    ):
        self.scaler_ = core_scalers.AutoSeasonalDifferences(
            season_length=season_length,
            max_diffs=max_diffs,
            n_seasons=n_seasons,
        )
# --- Tests for AutoSeasonalDifferences ---
sc = AutoSeasonalDifferences(season_length=5, max_diffs=1)
ga = GroupedArray(np.arange(5)[np.arange(10) % 5], np.array([0, 10]))
transformed = sc.fit_transform(ga)
sc.inverse_transform(ga)
sc.update(ga)

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class AutoSeasonalityAndDifferences(AutoDifferences):
    """Find the length of the seasonal period and apply the optimal number of differences to each group.

    Parameters
    ----------
    max_season_length : int
        Maximum length of the seasonal period.
    max_diffs : int
        Maximum number of differences to apply.
    n_seasons : int, optional (default=10)
        Number of seasons to use to determine the number of differences. Defaults to 10.
        If `None` will use all samples, otherwise `max_season_length` * `n_seasons` samples will be used for the test.
        Smaller values will be faster but could be less accurate."""

    def __init__(
        self, max_season_length: int, max_diffs: int, n_seasons: Optional[int] = 10
    ):
        self.scaler_ = core_scalers.AutoSeasonalityAndDifferences(
            max_season_length=max_season_length,
            max_diffs=max_diffs,
            n_seasons=n_seasons,
        )
# --- Tests for AutoSeasonalityAndDifferences ---
sc = AutoSeasonalityAndDifferences(max_season_length=5, max_diffs=1)
ga = GroupedArray(np.arange(5)[np.arange(10) % 5], np.array([0, 10]))
transformed = sc.fit_transform(ga)
sc.inverse_transform(ga)
sc.update(ga)

# stack
stacked = AutoDifferences.stack([sc, sc])
np.testing.assert_allclose(
    stacked.scaler_.diffs_,
    np.tile(sc.scaler_.diffs_, 2),
)
np.testing.assert_allclose(
    stacked.scaler_.tails_,
    np.tile(sc.scaler_.tails_, 2),
)
class _BaseLocalScaler(_BaseGroupedArrayTargetTransform):
    """Base class for per-serie scalers backed by a core scaler factory."""

    # Callable that builds a fresh core scaler; set by subclasses.
    scaler_factory: type

    def update(self, ga: GroupedArray) -> GroupedArray:
        """Scale new observations with the already fitted statistics."""
        ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        return GroupedArray(self.scaler_.transform(ga), ga.indptr)

    def fit_transform(self, ga: GroupedArray) -> GroupedArray:
        """Compute the per-serie statistics and scale the target."""
        self.scaler_ = self.scaler_factory()
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        transformed = self.scaler_.fit_transform(core_ga)
        return GroupedArray(transformed, ga.indptr)

    def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
        """Restore the target to its original scale."""
        core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
        transformed = self.scaler_.inverse_transform(core_ga)
        return GroupedArray(transformed, ga.indptr)

    def take(self, idxs: np.ndarray) -> '_BaseLocalScaler':
        """Return a copy of this scaler restricted to the groups in `idxs`."""
        out = copy.deepcopy(self)
        out.scaler_ = self.scaler_.take(idxs)
        return out
def test_scaler(sc, series):
    """Check round-trip, update, take and stack behavior of a local scaler."""
    id_counts = counts_by_id(series, 'unique_id')
    indptr = np.append(0, id_counts['counts'].cumsum())
    ga = GroupedArray(series['y'].values, indptr)
    transformed = sc.fit_transform(ga)
    # inverse transform restores the original values
    np.testing.assert_allclose(
        sc.inverse_transform(transformed).data,
        ga.data,
    )
    # update on the same data matches fit_transform's output
    transformed2 = sc.update(ga)
    np.testing.assert_allclose(transformed.data, transformed2.data)

    # taking a subset of groups keeps the round-trip consistent
    idxs = [0, 7]
    subset = ga.take(idxs)
    transformed_subset = transformed.take(idxs)
    subsc = sc.take(idxs)
    np.testing.assert_allclose(
        subsc.inverse_transform(transformed_subset).data,
        subset.data,
    )

    # stacking duplicates the fitted statistics
    stacked = sc.stack([sc, sc])
    stacked_stats = stacked.scaler_.stats_
    np.testing.assert_allclose(
        stacked_stats,
        np.tile(sc.scaler_.stats_, (2, 1)),
    )
class LocalStandardScaler(_BaseLocalScaler):
    """Standardizes each serie by subtracting its mean and dividing by its standard deviation."""

    scaler_factory = core_scalers.LocalStandardScaler

test_scaler(LocalStandardScaler(), series)
class LocalMinMaxScaler(_BaseLocalScaler):
    """Scales each serie to be in the [0, 1] interval."""

    scaler_factory = core_scalers.LocalMinMaxScaler

test_scaler(LocalMinMaxScaler(), series)
class LocalRobustScaler(_BaseLocalScaler):
    """Scaler robust to outliers.

    Parameters
    ----------
    scale : str (default='iqr')
        Statistic to use for scaling. Can be either 'iqr' (Inter Quartile Range) or 'mad' (Median Absolute Deviation)
    """

    def __init__(self, scale: str):
        # lambda so that fit_transform builds a fresh core scaler per fit
        self.scaler_factory = lambda: core_scalers.LocalRobustScaler(scale)  # type: ignore

test_scaler(LocalRobustScaler(scale='iqr'), series)
test_scaler(LocalRobustScaler(scale='mad'), series)
class LocalBoxCox(_BaseLocalScaler):
    """Finds the optimum lambda for each serie and applies the Box-Cox transformation."""

    def __init__(self):
        self.scaler_factory = lambda: core_scalers.LocalBoxCoxScaler(
            method='loglik', lower=0.0
        )

test_scaler(LocalBoxCox(), series)
class GlobalSklearnTransformer(BaseTargetTransform):
    """Applies the same scikit-learn transformer to all series.

    Parameters
    ----------
    transformer : TransformerMixin
        Unfitted scikit-learn transformer; a clone is fitted on the target."""

    def __init__(self, transformer: TransformerMixin):
        self.transformer = transformer

    def fit_transform(self, df: DataFrame) -> DataFrame:
        """Fit a clone of the transformer on the target column and transform it."""
        df = ufp.copy_if_pandas(df, deep=False)
        self.transformer_ = clone(self.transformer)
        transformed = self.transformer_.fit_transform(df[[self.target_col]].to_numpy())
        return ufp.assign_columns(df, self.target_col, transformed[:, 0])

    def inverse_transform(self, df: DataFrame) -> DataFrame:
        """Restore all prediction columns (everything except id and time)."""
        df = ufp.copy_if_pandas(df, deep=False)
        cols_to_transform = [
            c for c in df.columns if c not in (self.id_col, self.time_col)
        ]
        transformed = self.transformer_.inverse_transform(
            df[cols_to_transform].to_numpy()
        )
        return ufp.assign_columns(df, cols_to_transform, transformed)

    def update(self, df: DataFrame) -> DataFrame:
        """Transform new observations with the already fitted transformer."""
        df = ufp.copy_if_pandas(df, deep=False)
        transformed = self.transformer_.transform(df[[self.target_col]].to_numpy())
        return ufp.assign_columns(df, self.target_col, transformed[:, 0])

    @staticmethod
    def stack(transforms: Sequence["GlobalSklearnTransformer"]) -> "GlobalSklearnTransformer":  # type: ignore[override]
        # The transformer is global (shared across series), so any instance works.
        return transforms[0]
# need this import in order for isinstance to work
from mlforecast.target_transforms import Differences as ExportedDifferences

sk_boxcox = PowerTransformer(method='box-cox', standardize=False)
boxcox_global = GlobalSklearnTransformer(sk_boxcox)
single_difference = ExportedDifferences([1])
series = generate_daily_series(10)
fcst = MLForecast(
    models=[LinearRegression()],
    freq='D',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference]
)
prep = fcst.preprocess(series, dropna=False)
expected = (
    pd.Series(
        sk_boxcox.fit_transform(series[['y']])[:, 0], index=series['unique_id']
    )
    .groupby('unique_id', observed=True)
    .diff()
    .values
)
np.testing.assert_allclose(prep['y'].values, expected)

# polars
series_pl = generate_daily_series(10, engine='polars')
fcst_pl = MLForecast(
    models=[LinearRegression()],
    freq='1d',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference]
)
prep_pl = fcst_pl.preprocess(series_pl, dropna=False)
pd.testing.assert_frame_equal(prep, prep_pl.to_pandas())
Give us a ⭐ on GitHub